diff --git a/config/config.md b/config/config.md index 1bb645229f..608c76fe44 100644 --- a/config/config.md +++ b/config/config.md @@ -232,6 +232,7 @@ | `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | | `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.
If left empty or unset, the server will automatically use the IP address of the first network interface
on the host, with the same port number as the one specified in `grpc.bind_addr`. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | +| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:
- `none`: disable all compression
- `transport`: only enable gRPC transport compression (zstd)
- `arrow_ipc`: only enable Arrow IPC compression (lz4)
- `all`: enable all compression. | | `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. | | `grpc.tls.mode` | String | `disable` | TLS mode. | | `grpc.tls.cert_path` | String | Unset | Certificate file path. | @@ -404,6 +405,7 @@ | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | +| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:
- `none`: disable all compression
- `transport`: only enable gRPC transport compression (zstd)
- `arrow_ipc`: only enable Arrow IPC compression (lz4)
- `all`: enable all compression. | | `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. | | `grpc.tls.mode` | String | `disable` | TLS mode. | | `grpc.tls.cert_path` | String | Unset | Certificate file path. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 0f86c6b3f5..e93529444f 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -44,6 +44,12 @@ runtime_size = 8 max_recv_message_size = "512MB" ## The maximum send message size for gRPC server. max_send_message_size = "512MB" +## Compression mode for datanode side Arrow IPC service. Available options: +## - `none`: disable all compression +## - `transport`: only enable gRPC transport compression (zstd) +## - `arrow_ipc`: only enable Arrow IPC compression (lz4) +## - `all`: enable all compression. +flight_compression = "arrow_ipc" ## gRPC server TLS options, see `mysql.tls` section. [grpc.tls] diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 88bac9cdcd..073e4206a1 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -54,6 +54,12 @@ bind_addr = "127.0.0.1:4001" server_addr = "127.0.0.1:4001" ## The number of server worker threads. runtime_size = 8 +## Compression mode for frontend side Arrow IPC service. Available options: +## - `none`: disable all compression +## - `transport`: only enable gRPC transport compression (zstd) +## - `arrow_ipc`: only enable Arrow IPC compression (lz4) +## - `all`: enable all compression. +flight_compression = "arrow_ipc" ## gRPC server TLS options, see `mysql.tls` section. [grpc.tls] diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 0b83db9cfd..36a9a93dec 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -64,6 +64,7 @@ impl Default for FlightEncoder { } impl FlightEncoder { + /// Creates new [FlightEncoder] with compression disabled. pub fn with_compression_disabled() -> Self { let write_options = writer::IpcWriteOptions::default() .try_with_compression(None) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 47f34c8f9b..5c87641947 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -372,6 +372,7 @@ impl DatanodeBuilder { opts.max_concurrent_queries, //TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter. Duration::from_millis(100), + opts.grpc.flight_compression, ); let object_store_manager = Self::build_object_store_manager(&opts.storage).await?; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 9bbb899bf1..2f3530721a 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -50,6 +50,7 @@ use query::QueryEngineRef; use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult}; use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; +use servers::grpc::FlightCompression; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ @@ -80,6 +81,7 @@ use crate::event_listener::RegionServerEventListenerRef; #[derive(Clone)] pub struct RegionServer { inner: Arc, + flight_compression: FlightCompression, } pub struct RegionStat { @@ -93,6 +95,7 @@ impl RegionServer { query_engine: QueryEngineRef, runtime: Runtime, event_listener: RegionServerEventListenerRef, + flight_compression: FlightCompression, ) -> Self { Self::with_table_provider( query_engine, @@ -101,6 +104,7 @@ impl RegionServer { Arc::new(DummyTableProviderFactory), 0, Duration::from_millis(0), + flight_compression, ) } @@ -111,6 +115,7 @@ impl RegionServer { table_provider_factory: TableProviderFactoryRef, max_concurrent_queries: usize, concurrent_query_limiter_timeout: Duration, + flight_compression: FlightCompression, ) -> Self { Self { inner: Arc::new(RegionServerInner::new( @@ -123,6 +128,7 @@ impl RegionServer { concurrent_query_limiter_timeout, ), )), + flight_compression, } } @@ -536,7 +542,11 @@ impl FlightCraft for RegionServer { .trace(tracing_context.attach(info_span!("RegionServer::handle_read"))) .await?; - let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context)); + let stream = Box::pin(FlightRecordBatchStream::new( + result, + tracing_context, + self.flight_compression, + )); Ok(Response::new(stream)) } } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index ee6b611b46..4c0b95c2ef 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -28,6 +28,7 @@ use query::dataframe::DataFrame; use query::planner::LogicalPlanner; use query::query_engine::{DescribeResult, QueryEngineState}; use query::{QueryEngine, QueryEngineContext}; +use servers::grpc::FlightCompression; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ @@ -97,6 +98,7 @@ pub fn mock_region_server() -> RegionServer { Arc::new(MockQueryEngine), Runtime::builder().build().unwrap(), Box::new(NoopRegionServerEventListener), + FlightCompression::default(), ) } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index abd80c940b..04d3dd86ce 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -154,6 +154,7 @@ where ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()), user_provider.clone(), runtime, + opts.grpc.flight_compression, ); let grpc_server = builder diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index bad5dd9ae7..82ed1653ec 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -66,6 +66,8 @@ pub struct GrpcOptions { pub max_recv_message_size: ReadableSize, /// Max gRPC sending(encoding) message size pub max_send_message_size: ReadableSize, + /// Compression mode in Arrow Flight service. + pub flight_compression: FlightCompression, pub runtime_size: usize, #[serde(default = "Default::default")] pub tls: TlsOption, @@ -114,6 +116,7 @@ impl Default for GrpcOptions { server_addr: String::new(), max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, + flight_compression: FlightCompression::ArrowIpc, runtime_size: 8, tls: TlsOption::default(), } @@ -132,6 +135,30 @@ impl GrpcOptions { } } +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum FlightCompression { + /// Disable all compression in Arrow Flight service. + None, + /// Enable only transport layer compression (zstd). + Transport, + /// Enable only payload compression (lz4) + #[default] + ArrowIpc, + /// Enable all compression. + All, +} + +impl FlightCompression { + pub fn transport_compression(&self) -> bool { + self == &FlightCompression::Transport || self == &FlightCompression::All + } + + pub fn arrow_compression(&self) -> bool { + self == &FlightCompression::ArrowIpc || self == &FlightCompression::All + } +} + pub struct GrpcServer { // states shutdown_tx: Mutex>>, diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 6f12c5b654..37c2cf05f2 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -45,7 +45,7 @@ use tonic::{Request, Response, Status, Streaming}; use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu}; pub use crate::grpc::flight::stream::FlightRecordBatchStream; use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler}; -use crate::grpc::TonicResult; +use crate::grpc::{FlightCompression, TonicResult}; use crate::http::header::constants::GREPTIME_DB_HEADER_NAME; use crate::http::AUTHORIZATION_HEADER; use crate::{error, hint_headers}; @@ -195,9 +195,14 @@ impl FlightCraft for GreptimeRequestHandler { protocol = "grpc", request_type = get_request_type(&request) ); + let flight_compression = self.flight_compression; async { let output = self.handle_request(request, hints).await?; - let stream = to_flight_data_stream(output, TracingContext::from_current_span()); + let stream = to_flight_data_stream( + output, + TracingContext::from_current_span(), + flight_compression, + ); Ok(Response::new(stream)) } .trace(span) @@ -365,14 +370,16 @@ impl Stream for PutRecordBatchRequestStream { fn to_flight_data_stream( output: Output, tracing_context: TracingContext, + flight_compression: FlightCompression, ) -> TonicStream { match output.data { OutputData::Stream(stream) => { - let stream = FlightRecordBatchStream::new(stream, tracing_context); + let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression); Box::pin(stream) as _ } OutputData::RecordBatches(x) => { - let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context); + let stream = + FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression); Box::pin(stream) as _ } OutputData::AffectedRows(rows) => { diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index fa3aaf5061..aa9740e395 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -30,6 +30,7 @@ use tokio::task::JoinHandle; use crate::error; use crate::grpc::flight::TonicResult; +use crate::grpc::FlightCompression; #[pin_project(PinnedDrop)] pub struct FlightRecordBatchStream { @@ -41,18 +42,27 @@ pub struct FlightRecordBatchStream { } impl FlightRecordBatchStream { - pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self { + pub fn new( + recordbatches: SendableRecordBatchStream, + tracing_context: TracingContext, + compression: FlightCompression, + ) -> Self { let (tx, rx) = mpsc::channel::>(1); let join_handle = common_runtime::spawn_global(async move { Self::flight_data_stream(recordbatches, tx) .trace(tracing_context.attach(info_span!("flight_data_stream"))) .await }); + let encoder = if compression.arrow_compression() { + FlightEncoder::default() + } else { + FlightEncoder::with_compression_disabled() + }; Self { rx, join_handle, done: false, - encoder: FlightEncoder::with_compression_disabled(), + encoder, } } @@ -161,7 +171,11 @@ mod test { let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]) .unwrap() .as_stream(); - let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default()); + let mut stream = FlightRecordBatchStream::new( + recordbatches, + TracingContext::default(), + FlightCompression::default(), + ); let mut raw_data = Vec::with_capacity(2); raw_data.push(stream.next().await.unwrap().unwrap()); diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 6924ee90a8..14bf55bf6b 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -49,7 +49,7 @@ use crate::error::{ JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu, }; use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream}; -use crate::grpc::TonicResult; +use crate::grpc::{FlightCompression, TonicResult}; use crate::metrics; use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER}; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; @@ -59,6 +59,7 @@ pub struct GreptimeRequestHandler { handler: ServerGrpcQueryHandlerRef, user_provider: Option, runtime: Option, + pub(crate) flight_compression: FlightCompression, } impl GreptimeRequestHandler { @@ -66,11 +67,13 @@ impl GreptimeRequestHandler { handler: ServerGrpcQueryHandlerRef, user_provider: Option, runtime: Option, + flight_compression: FlightCompression, ) -> Self { Self { handler, user_provider, runtime, + flight_compression, } } diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index ddef4a79f3..85953dd1ee 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -34,7 +34,7 @@ mod test { use itertools::Itertools; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; - use servers::grpc::GrpcServerConfig; + use servers::grpc::{FlightCompression, GrpcServerConfig}; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; use servers::server::Server; @@ -94,6 +94,7 @@ mod test { ) .ok(), Some(runtime.clone()), + FlightCompression::default(), ); let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime) .flight_handler(Arc::new(greptime_request_handler)) @@ -139,8 +140,7 @@ mod test { let schema = record_batches[0].schema.arrow_schema().clone(); let stream = futures::stream::once(async move { - let mut schema_data = - FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema)); + let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema)); let metadata = DoPutMetadata::new(0); schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into(); // first message in "DoPut" stream should carry table name in flight descriptor @@ -155,7 +155,7 @@ mod test { tokio_stream::iter(record_batches) .enumerate() .map(|(i, x)| { - let mut encoder = FlightEncoder::with_compression_disabled(); + let mut encoder = FlightEncoder::default(); let message = FlightMessage::RecordBatch(x.into_df_record_batch()); let mut data = encoder.encode(message); let metadata = DoPutMetadata::new((i + 1) as i64); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 81672b6fa6..9bec5eb624 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -42,7 +42,7 @@ use object_store::test_util::TempFolder; use object_store::ObjectStore; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; -use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig}; +use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig}; use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; @@ -585,6 +585,7 @@ pub async fn setup_grpc_server_with( ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()), user_provider.clone(), Some(runtime.clone()), + FlightCompression::default(), ); let flight_handler = Arc::new(greptime_request_handler.clone()); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 616c54145c..0dfbcc7dcb 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1025,6 +1025,7 @@ bind_addr = "127.0.0.1:4001" server_addr = "127.0.0.1:4001" max_recv_message_size = "512MiB" max_send_message_size = "512MiB" +flight_compression = "arrow_ipc" runtime_size = 8 [grpc.tls]