feat: enable compression for metasrv client (#5078)

* feat: enable compression for metasrv client

* refactor: simplify gRPC service router registration

* chore: fix unit tests
This commit is contained in:
Weny Xu
2024-12-02 13:18:25 +08:00
committed by GitHub
parent c049ce6ab1
commit 0f116c8501
7 changed files with 50 additions and 15 deletions

View File

@@ -31,6 +31,7 @@ use common_meta::rpc::store::{
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Status;
@@ -173,7 +174,10 @@ impl Inner {
fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
Ok(ClusterClient::new(channel))
Ok(ClusterClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]

View File

@@ -23,6 +23,7 @@ use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Streaming;
@@ -249,7 +250,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(HeartbeatClient::new(channel))
Ok(HeartbeatClient::new(channel)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]

View File

@@ -27,6 +27,7 @@ use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Status;
@@ -141,7 +142,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(ProcedureServiceClient::new(channel))
Ok(ProcedureServiceClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]

View File

@@ -25,6 +25,7 @@ use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use crate::client::{load_balance as lb, Id};
@@ -236,7 +237,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(StoreClient::new(channel))
Ok(StoreClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]

View File

@@ -41,6 +41,7 @@ use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
#[cfg(feature = "pg_kvbackend")]
use tokio_postgres::NoTls;
use tonic::codec::CompressionEncoding;
use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
@@ -178,14 +179,26 @@ pub async fn bootstrap_metasrv_with_router(
Ok(())
}
#[macro_export]
macro_rules! add_compressed_service {
($builder:expr, $server:expr) => {
$builder.add_service(
$server
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
};
}
pub fn router(metasrv: Arc<Metasrv>) -> Router {
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::from_arc(metasrv.clone()))
.add_service(StoreServer::from_arc(metasrv.clone()))
.add_service(ClusterServer::from_arc(metasrv.clone()))
.add_service(ProcedureServiceServer::from_arc(metasrv.clone()))
.add_service(admin::make_admin_service(metasrv))
let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
router.add_service(admin::make_admin_service(metasrv))
}
pub async fn metasrv_builder(

View File

@@ -38,6 +38,7 @@ pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;
pub use crate::error::Result;
mod greptimedb_telemetry;

View File

@@ -24,8 +24,10 @@ use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use tonic::codec::CompressionEncoding;
use tower::service_fn;
use crate::add_compressed_service;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
@@ -80,11 +82,14 @@ pub async fn mock(
let (client, server) = tokio::io::duplex(1024);
let metasrv = Arc::new(metasrv);
let service = metasrv.clone();
let _handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::from_arc(service.clone()))
.add_service(StoreServer::from_arc(service.clone()))
.add_service(ProcedureServiceServer::from_arc(service.clone()))
let mut router = tonic::transport::Server::builder();
let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
let router =
add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
router
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});