diff --git a/src/client/src/client.rs b/src/client/src/client.rs index 429944d329..a604bf5124 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -167,9 +167,7 @@ impl Client { let client = FlightServiceClient::new(channel) .max_decoding_message_size(self.max_grpc_recv_message_size()) - .max_encoding_message_size(self.max_grpc_send_message_size()) - .accept_compressed(CompressionEncoding::Zstd) - .send_compressed(CompressionEncoding::Zstd); + .max_encoding_message_size(self.max_grpc_send_message_size()); Ok(FlightClient { addr, client }) } @@ -178,9 +176,7 @@ impl Client { let (addr, channel) = self.find_channel()?; let client = PbRegionClient::new(channel) .max_decoding_message_size(self.max_grpc_recv_message_size()) - .max_encoding_message_size(self.max_grpc_send_message_size()) - .accept_compressed(CompressionEncoding::Zstd) - .send_compressed(CompressionEncoding::Zstd); + .max_encoding_message_size(self.max_grpc_send_message_size()); Ok((addr, client)) } diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 3d4d94dc60..0b83db9cfd 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -64,6 +64,18 @@ impl Default for FlightEncoder { } impl FlightEncoder { + pub fn with_compression_disabled() -> Self { + let write_options = writer::IpcWriteOptions::default() + .try_with_compression(None) + .unwrap(); + + Self { + write_options, + data_gen: writer::IpcDataGenerator::default(), + dictionary_tracker: writer::DictionaryTracker::new(false), + } + } + pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData { match flight_message { FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(), diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index bd9ebe1762..fa3aaf5061 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -52,7 +52,7 @@ impl FlightRecordBatchStream { rx, join_handle, done: false, - encoder: FlightEncoder::default(), + encoder: FlightEncoder::with_compression_disabled(), } } diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index 3d16466f28..ddef4a79f3 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -139,7 +139,8 @@ mod test { let schema = record_batches[0].schema.arrow_schema().clone(); let stream = futures::stream::once(async move { - let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema)); + let mut schema_data = + FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema)); let metadata = DoPutMetadata::new(0); schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into(); // first message in "DoPut" stream should carry table name in flight descriptor @@ -154,7 +155,7 @@ mod test { tokio_stream::iter(record_batches) .enumerate() .map(|(i, x)| { - let mut encoder = FlightEncoder::default(); + let mut encoder = FlightEncoder::with_compression_disabled(); let message = FlightMessage::RecordBatch(x.into_df_record_batch()); let mut data = encoder.encode(message); let metadata = DoPutMetadata::new((i + 1) as i64);