diff --git a/Cargo.lock b/Cargo.lock index e6b8399b5e..b996f9d384 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7561,6 +7561,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.4", "http 1.1.0", "http-body 1.0.0", @@ -7580,6 +7581,7 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2f4fcbc249..076d8d0b60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -199,7 +199,7 @@ tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" -tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] } +tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] } tonic-reflection = { version = "0.13.1", features = ["server"] } tower = { version = "0.5.2", default-features = false } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index 057a1d4ad6..274f036f3d 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -83,6 +83,7 @@ impl Client { timeline_id: TimelineId, shard_id: ShardIndex, auth_header: Option, + compression: Option, ) -> anyhow::Result { let endpoint: tonic::transport::Endpoint = into_endpoint .try_into() @@ -90,7 +91,15 @@ impl Client { let channel = endpoint.connect().await?; let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id) .map_err(|e| anyhow::anyhow!(e.to_string()))?; - let client = proto::PageServiceClient::with_interceptor(channel, auth); + let mut client = proto::PageServiceClient::with_interceptor(channel, auth); + + if let Some(compression) = compression { + // TODO: benchmark this (including network latency). + // TODO: consider enabling compression by default. + client = client + .accept_compressed(compression) + .send_compressed(compression); + } Ok(Self { client }) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 57087dc6c3..800b47f235 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3286,7 +3286,14 @@ impl GrpcPageServiceHandler { Ok(req) })) // Run the page service. - .service(proto::PageServiceServer::new(page_service_handler)); + .service( + proto::PageServiceServer::new(page_service_handler) + // Support both gzip and zstd compression. The client decides what to use. + .accept_compressed(tonic::codec::CompressionEncoding::Gzip) + .accept_compressed(tonic::codec::CompressionEncoding::Zstd) + .send_compressed(tonic::codec::CompressionEncoding::Gzip) + .send_compressed(tonic::codec::CompressionEncoding::Zstd), + ); let server = server.add_service(page_service); // Reflection service for use with e.g. grpcurl. @@ -3532,7 +3539,6 @@ impl proto::PageService for GrpcPageServiceHandler { Ok(tonic::Response::new(resp.into())) } - // TODO: ensure clients use gzip compression for the stream. #[instrument(skip_all, fields(lsn))] async fn get_base_backup( &self,