From 1d53dd26ae1c4f91f80a114c67878dcf77c5811c Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 11 Jun 2025 14:54:10 +0800 Subject: [PATCH] chore: add option for arrow flight compression mode (#6283) * chore/enable-flight-encoder: ### Add Flight Compression Support - **Configuration Updates**: - Added `grpc.flight_compression` option to `config/config.md`, `config/datanode.example.toml`, and `config/frontend.example.toml` to specify compression modes for Arrow IPC service. - **Code Enhancements**: - Updated `FlightEncoder` in `src/common/grpc/src/flight.rs` to support compression modes. - Modified `RegionServer` and `DatanodeBuilder` in `src/datanode/src/datanode.rs` and `src/datanode/src/region_server.rs` to handle `FlightCompression`. - Integrated `FlightCompression` in `src/servers/src/grpc.rs` and `src/servers/src/grpc/flight.rs` to manage compression settings. - **Testing and Integration**: - Updated test utilities and integration tests in `tests-integration/src/grpc/flight.rs` and `tests-integration/src/test_util.rs` to include `FlightCompression`. Signed-off-by: Lei, HUANG * chore/enable-flight-encoder: ### Enable Compression in FlightClient - **`client.rs`**: Updated `make_flight_client` to accept `send_compression` and `accept_compression` parameters, enabling Zstd compression for sending and receiving messages. - **`client_manager.rs`**: Modified `datanode` method to pass compression settings from `ChannelConfig` to `RegionRequester`. - **`database.rs`**: Adjusted calls to `make_flight_client` to include compression parameters. - **`region.rs`**: Updated `RegionRequester` to store and utilize compression settings. - **`frontend.rs`**: Configured `ChannelConfig` to enable compression based on options. - **`channel_manager.rs`**: Added `send_compression` and `accept_compression` fields to `ChannelConfig` with default values and updated tests accordingly. Signed-off-by: Lei, HUANG * chore/enable-flight-encoder: ### Update Compression Defaults and Documentation - **Configuration Files**: Updated `datanode.example.toml` and `frontend.example.toml` to include a default setting comment for `flight_compression`, specifying it defaults to `none`. - **gRPC Server Code**: Modified `grpc.rs` to set `None` as the default for `FlightCompression` instead of `ArrowIpc`. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG Signed-off-by: Lei, HUANG --- config/config.md | 2 ++ config/datanode.example.toml | 7 ++++++ config/frontend.example.toml | 7 ++++++ src/client/src/client.rs | 15 +++++++++++-- src/client/src/client_manager.rs | 11 +++++++++- src/client/src/database.rs | 4 ++-- src/client/src/region.rs | 14 +++++++++--- src/cmd/src/frontend.rs | 6 +++++- src/common/grpc/src/channel_manager.rs | 8 +++++++ src/common/grpc/src/flight.rs | 1 + src/datanode/src/datanode.rs | 1 + src/datanode/src/region_server.rs | 12 ++++++++++- src/datanode/src/tests.rs | 2 ++ src/frontend/src/server.rs | 1 + src/servers/src/grpc.rs | 27 ++++++++++++++++++++++++ src/servers/src/grpc/flight.rs | 15 +++++++++---- src/servers/src/grpc/flight/stream.rs | 20 +++++++++++++++--- src/servers/src/grpc/greptime_handler.rs | 5 ++++- tests-integration/src/grpc/flight.rs | 8 +++---- tests-integration/src/test_util.rs | 3 ++- tests-integration/tests/http.rs | 1 + 21 files changed, 147 insertions(+), 23 deletions(-) diff --git a/config/config.md b/config/config.md index 1bb645229f..608c76fe44 100644 --- a/config/config.md +++ b/config/config.md @@ -232,6 +232,7 @@ | `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | | `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.
If left empty or unset, the server will automatically use the IP address of the first network interface
on the host, with the same port number as the one specified in `grpc.bind_addr`. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | +| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:
- `none`: disable all compression
- `transport`: only enable gRPC transport compression (zstd)
- `arrow_ipc`: only enable Arrow IPC compression (lz4)
- `all`: enable all compression. | | `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. | | `grpc.tls.mode` | String | `disable` | TLS mode. | | `grpc.tls.cert_path` | String | Unset | Certificate file path. | @@ -404,6 +405,7 @@ | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | +| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:
- `none`: disable all compression
- `transport`: only enable gRPC transport compression (zstd)
- `arrow_ipc`: only enable Arrow IPC compression (lz4)
- `all`: enable all compression. | | `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. | | `grpc.tls.mode` | String | `disable` | TLS mode. | | `grpc.tls.cert_path` | String | Unset | Certificate file path. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 0f86c6b3f5..507858383a 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -44,6 +44,13 @@ runtime_size = 8 max_recv_message_size = "512MB" ## The maximum send message size for gRPC server. max_send_message_size = "512MB" +## Compression mode for datanode side Arrow IPC service. Available options: +## - `none`: disable all compression +## - `transport`: only enable gRPC transport compression (zstd) +## - `arrow_ipc`: only enable Arrow IPC compression (lz4) +## - `all`: enable all compression. +## Default to `none` +flight_compression = "arrow_ipc" ## gRPC server TLS options, see `mysql.tls` section. [grpc.tls] diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 88bac9cdcd..a6a818bcc0 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -54,6 +54,13 @@ bind_addr = "127.0.0.1:4001" server_addr = "127.0.0.1:4001" ## The number of server worker threads. runtime_size = 8 +## Compression mode for frontend side Arrow IPC service. Available options: +## - `none`: disable all compression +## - `transport`: only enable gRPC transport compression (zstd) +## - `arrow_ipc`: only enable Arrow IPC compression (lz4) +## - `all`: enable all compression. +## Default to `none` +flight_compression = "arrow_ipc" ## gRPC server TLS options, see `mysql.tls` section. [grpc.tls] diff --git a/src/client/src/client.rs b/src/client/src/client.rs index a604bf5124..81c0810ba2 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -162,12 +162,23 @@ impl Client { .as_bytes() as usize } - pub fn make_flight_client(&self) -> Result { + pub fn make_flight_client( + &self, + send_compression: bool, + accept_compression: bool, + ) -> Result { let (addr, channel) = self.find_channel()?; - let client = FlightServiceClient::new(channel) + let mut client = FlightServiceClient::new(channel) .max_decoding_message_size(self.max_grpc_recv_message_size()) .max_encoding_message_size(self.max_grpc_send_message_size()); + // todo(hl): support compression methods. + if send_compression { + client = client.send_compressed(CompressionEncoding::Zstd); + } + if accept_compression { + client = client.accept_compressed(CompressionEncoding::Zstd); + } Ok(FlightClient { addr, client }) } diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index adc284eaf1..f5c37f5181 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -49,7 +49,16 @@ impl NodeManager for NodeClients { async fn datanode(&self, datanode: &Peer) -> DatanodeRef { let client = self.get_client(datanode).await; - Arc::new(RegionRequester::new(client)) + let ChannelConfig { + send_compression, + accept_compression, + .. + } = self.channel_manager.config(); + Arc::new(RegionRequester::new( + client, + *send_compression, + *accept_compression, + )) } async fn flownode(&self, flownode: &Peer) -> FlownodeRef { diff --git a/src/client/src/database.rs b/src/client/src/database.rs index bd54f89809..ad6523cb67 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -287,7 +287,7 @@ impl Database { let mut request = tonic::Request::new(request); Self::put_hints(request.metadata_mut(), hints)?; - let mut client = self.client.make_flight_client()?; + let mut client = self.client.make_flight_client(false, false)?; let response = client.mut_inner().do_get(request).await.or_else(|e| { let tonic_code = e.code(); @@ -409,7 +409,7 @@ impl Database { MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?, ); - let mut client = self.client.make_flight_client()?; + let mut client = self.client.make_flight_client(false, false)?; let response = client.mut_inner().do_put(request).await?; let response = response .into_inner() diff --git a/src/client/src/region.rs b/src/client/src/region.rs index e2016deab5..eca4d504b0 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -46,6 +46,8 @@ use crate::{metrics, Client, Error}; #[derive(Debug)] pub struct RegionRequester { client: Client, + send_compression: bool, + accept_compression: bool, } #[async_trait] @@ -89,12 +91,18 @@ impl Datanode for RegionRequester { } impl RegionRequester { - pub fn new(client: Client) -> Self { - Self { client } + pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self { + Self { + client, + send_compression, + accept_compression, + } } pub async fn do_get_inner(&self, ticket: Ticket) -> Result { - let mut flight_client = self.client.make_flight_client()?; + let mut flight_client = self + .client + .make_flight_client(self.send_compression, self.accept_compression)?; let response = flight_client .mut_inner() .do_get(ticket) diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 40154d36a4..f132283e45 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -364,12 +364,16 @@ impl StartCommand { // frontend to datanode need not timeout. // Some queries are expected to take long time. - let channel_config = ChannelConfig { + let mut channel_config = ChannelConfig { timeout: None, tcp_nodelay: opts.datanode.client.tcp_nodelay, connect_timeout: Some(opts.datanode.client.connect_timeout), ..Default::default() }; + if opts.grpc.flight_compression.transport_compression() { + channel_config.accept_compression = true; + channel_config.send_compression = true; + } let client = NodeClients::new(channel_config); let instance = FrontendBuilder::new( diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 713ad58d81..a33a34ab35 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -296,6 +296,8 @@ pub struct ChannelConfig { pub max_recv_message_size: ReadableSize, // Max gRPC sending(encoding) message size pub max_send_message_size: ReadableSize, + pub send_compression: bool, + pub accept_compression: bool, } impl Default for ChannelConfig { @@ -316,6 +318,8 @@ impl Default for ChannelConfig { client_tls: None, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, + send_compression: false, + accept_compression: false, } } } @@ -566,6 +570,8 @@ mod tests { client_tls: None, max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, + send_compression: false, + accept_compression: false, }, default_cfg ); @@ -610,6 +616,8 @@ mod tests { }), max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, + send_compression: false, + accept_compression: false, }, cfg ); diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 0b83db9cfd..36a9a93dec 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -64,6 +64,7 @@ impl Default for FlightEncoder { } impl FlightEncoder { + /// Creates new [FlightEncoder] with compression disabled. pub fn with_compression_disabled() -> Self { let write_options = writer::IpcWriteOptions::default() .try_with_compression(None) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 47f34c8f9b..5c87641947 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -372,6 +372,7 @@ impl DatanodeBuilder { opts.max_concurrent_queries, //TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter. Duration::from_millis(100), + opts.grpc.flight_compression, ); let object_store_manager = Self::build_object_store_manager(&opts.storage).await?; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 9bbb899bf1..2f3530721a 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -50,6 +50,7 @@ use query::QueryEngineRef; use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult}; use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; +use servers::grpc::FlightCompression; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ @@ -80,6 +81,7 @@ use crate::event_listener::RegionServerEventListenerRef; #[derive(Clone)] pub struct RegionServer { inner: Arc, + flight_compression: FlightCompression, } pub struct RegionStat { @@ -93,6 +95,7 @@ impl RegionServer { query_engine: QueryEngineRef, runtime: Runtime, event_listener: RegionServerEventListenerRef, + flight_compression: FlightCompression, ) -> Self { Self::with_table_provider( query_engine, @@ -101,6 +104,7 @@ impl RegionServer { Arc::new(DummyTableProviderFactory), 0, Duration::from_millis(0), + flight_compression, ) } @@ -111,6 +115,7 @@ impl RegionServer { table_provider_factory: TableProviderFactoryRef, max_concurrent_queries: usize, concurrent_query_limiter_timeout: Duration, + flight_compression: FlightCompression, ) -> Self { Self { inner: Arc::new(RegionServerInner::new( @@ -123,6 +128,7 @@ impl RegionServer { concurrent_query_limiter_timeout, ), )), + flight_compression, } } @@ -536,7 +542,11 @@ impl FlightCraft for RegionServer { .trace(tracing_context.attach(info_span!("RegionServer::handle_read"))) .await?; - let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context)); + let stream = Box::pin(FlightRecordBatchStream::new( + result, + tracing_context, + self.flight_compression, + )); Ok(Response::new(stream)) } } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index ee6b611b46..4c0b95c2ef 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -28,6 +28,7 @@ use query::dataframe::DataFrame; use query::planner::LogicalPlanner; use query::query_engine::{DescribeResult, QueryEngineState}; use query::{QueryEngine, QueryEngineContext}; +use servers::grpc::FlightCompression; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ @@ -97,6 +98,7 @@ pub fn mock_region_server() -> RegionServer { Arc::new(MockQueryEngine), Runtime::builder().build().unwrap(), Box::new(NoopRegionServerEventListener), + FlightCompression::default(), ) } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index abd80c940b..04d3dd86ce 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -154,6 +154,7 @@ where ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()), user_provider.clone(), runtime, + opts.grpc.flight_compression, ); let grpc_server = builder diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index bad5dd9ae7..4712a5a744 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -66,6 +66,8 @@ pub struct GrpcOptions { pub max_recv_message_size: ReadableSize, /// Max gRPC sending(encoding) message size pub max_send_message_size: ReadableSize, + /// Compression mode in Arrow Flight service. + pub flight_compression: FlightCompression, pub runtime_size: usize, #[serde(default = "Default::default")] pub tls: TlsOption, @@ -114,6 +116,7 @@ impl Default for GrpcOptions { server_addr: String::new(), max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, + flight_compression: FlightCompression::ArrowIpc, runtime_size: 8, tls: TlsOption::default(), } @@ -132,6 +135,30 @@ impl GrpcOptions { } } +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum FlightCompression { + /// Disable all compression in Arrow Flight service. + #[default] + None, + /// Enable only transport layer compression (zstd). + Transport, + /// Enable only payload compression (lz4) + ArrowIpc, + /// Enable all compression. + All, +} + +impl FlightCompression { + pub fn transport_compression(&self) -> bool { + self == &FlightCompression::Transport || self == &FlightCompression::All + } + + pub fn arrow_compression(&self) -> bool { + self == &FlightCompression::ArrowIpc || self == &FlightCompression::All + } +} + pub struct GrpcServer { // states shutdown_tx: Mutex>>, diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 6f12c5b654..37c2cf05f2 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -45,7 +45,7 @@ use tonic::{Request, Response, Status, Streaming}; use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu}; pub use crate::grpc::flight::stream::FlightRecordBatchStream; use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler}; -use crate::grpc::TonicResult; +use crate::grpc::{FlightCompression, TonicResult}; use crate::http::header::constants::GREPTIME_DB_HEADER_NAME; use crate::http::AUTHORIZATION_HEADER; use crate::{error, hint_headers}; @@ -195,9 +195,14 @@ impl FlightCraft for GreptimeRequestHandler { protocol = "grpc", request_type = get_request_type(&request) ); + let flight_compression = self.flight_compression; async { let output = self.handle_request(request, hints).await?; - let stream = to_flight_data_stream(output, TracingContext::from_current_span()); + let stream = to_flight_data_stream( + output, + TracingContext::from_current_span(), + flight_compression, + ); Ok(Response::new(stream)) } .trace(span) @@ -365,14 +370,16 @@ impl Stream for PutRecordBatchRequestStream { fn to_flight_data_stream( output: Output, tracing_context: TracingContext, + flight_compression: FlightCompression, ) -> TonicStream { match output.data { OutputData::Stream(stream) => { - let stream = FlightRecordBatchStream::new(stream, tracing_context); + let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression); Box::pin(stream) as _ } OutputData::RecordBatches(x) => { - let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context); + let stream = + FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression); Box::pin(stream) as _ } OutputData::AffectedRows(rows) => { diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index fa3aaf5061..aa9740e395 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -30,6 +30,7 @@ use tokio::task::JoinHandle; use crate::error; use crate::grpc::flight::TonicResult; +use crate::grpc::FlightCompression; #[pin_project(PinnedDrop)] pub struct FlightRecordBatchStream { @@ -41,18 +42,27 @@ pub struct FlightRecordBatchStream { } impl FlightRecordBatchStream { - pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self { + pub fn new( + recordbatches: SendableRecordBatchStream, + tracing_context: TracingContext, + compression: FlightCompression, + ) -> Self { let (tx, rx) = mpsc::channel::>(1); let join_handle = common_runtime::spawn_global(async move { Self::flight_data_stream(recordbatches, tx) .trace(tracing_context.attach(info_span!("flight_data_stream"))) .await }); + let encoder = if compression.arrow_compression() { + FlightEncoder::default() + } else { + FlightEncoder::with_compression_disabled() + }; Self { rx, join_handle, done: false, - encoder: FlightEncoder::with_compression_disabled(), + encoder, } } @@ -161,7 +171,11 @@ mod test { let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]) .unwrap() .as_stream(); - let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default()); + let mut stream = FlightRecordBatchStream::new( + recordbatches, + TracingContext::default(), + FlightCompression::default(), + ); let mut raw_data = Vec::with_capacity(2); raw_data.push(stream.next().await.unwrap().unwrap()); diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 6924ee90a8..14bf55bf6b 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -49,7 +49,7 @@ use crate::error::{ JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu, }; use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream}; -use crate::grpc::TonicResult; +use crate::grpc::{FlightCompression, TonicResult}; use crate::metrics; use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER}; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; @@ -59,6 +59,7 @@ pub struct GreptimeRequestHandler { handler: ServerGrpcQueryHandlerRef, user_provider: Option, runtime: Option, + pub(crate) flight_compression: FlightCompression, } impl GreptimeRequestHandler { @@ -66,11 +67,13 @@ impl GreptimeRequestHandler { handler: ServerGrpcQueryHandlerRef, user_provider: Option, runtime: Option, + flight_compression: FlightCompression, ) -> Self { Self { handler, user_provider, runtime, + flight_compression, } } diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index ddef4a79f3..85953dd1ee 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -34,7 +34,7 @@ mod test { use itertools::Itertools; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; - use servers::grpc::GrpcServerConfig; + use servers::grpc::{FlightCompression, GrpcServerConfig}; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; use servers::server::Server; @@ -94,6 +94,7 @@ mod test { ) .ok(), Some(runtime.clone()), + FlightCompression::default(), ); let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime) .flight_handler(Arc::new(greptime_request_handler)) @@ -139,8 +140,7 @@ mod test { let schema = record_batches[0].schema.arrow_schema().clone(); let stream = futures::stream::once(async move { - let mut schema_data = - FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema)); + let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema)); let metadata = DoPutMetadata::new(0); schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into(); // first message in "DoPut" stream should carry table name in flight descriptor @@ -155,7 +155,7 @@ mod test { tokio_stream::iter(record_batches) .enumerate() .map(|(i, x)| { - let mut encoder = FlightEncoder::with_compression_disabled(); + let mut encoder = FlightEncoder::default(); let message = FlightMessage::RecordBatch(x.into_df_record_batch()); let mut data = encoder.encode(message); let metadata = DoPutMetadata::new((i + 1) as i64); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 81672b6fa6..9bec5eb624 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -42,7 +42,7 @@ use object_store::test_util::TempFolder; use object_store::ObjectStore; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; -use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig}; +use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig}; use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; @@ -585,6 +585,7 @@ pub async fn setup_grpc_server_with( ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()), user_provider.clone(), Some(runtime.clone()), + FlightCompression::default(), ); let flight_handler = Arc::new(greptime_request_handler.clone()); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 616c54145c..0dfbcc7dcb 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1025,6 +1025,7 @@ bind_addr = "127.0.0.1:4001" server_addr = "127.0.0.1:4001" max_recv_message_size = "512MiB" max_send_message_size = "512MiB" +flight_compression = "arrow_ipc" runtime_size = 8 [grpc.tls]