diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index f326984cca..0b3b45b9bc 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -32,6 +32,8 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; +const DEFAULT_MAX_DECODING_SIZE: usize = 32 * 1024 * 1024; // 32MB + pub struct EtcdStore { client: Client, // Maximum number of operations permitted in a transaction. @@ -39,6 +41,8 @@ pub struct EtcdStore { // // For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/ max_txn_ops: usize, + // Maximum decoding message size in bytes. Default 32MB. + max_decoding_size: usize, } impl EtcdStore { @@ -59,9 +63,20 @@ impl EtcdStore { Arc::new(Self { client, max_txn_ops, + max_decoding_size: DEFAULT_MAX_DECODING_SIZE, }) } + pub fn set_max_decoding_size(&mut self, max_decoding_size: usize) { + self.max_decoding_size = max_decoding_size; + } + + fn kv_client(&self) -> etcd_client::KvClient { + self.client + .kv_client() + .max_decoding_message_size(self.max_decoding_size) + } + async fn do_multi_txn(&self, txn_ops: Vec) -> Result> { let max_txn_ops = self.max_txn_ops(); if txn_ops.len() < max_txn_ops { @@ -71,7 +86,6 @@ impl EtcdStore { .start_timer(); let txn = Txn::new().and_then(txn_ops); let txn_res = self - .client .kv_client() .txn(txn) .await @@ -110,7 +124,6 @@ impl KvBackend for EtcdStore { let Get { key, options } = req.try_into()?; let mut res = self - .client .kv_client() .get(key, options) .await @@ -136,7 +149,6 @@ impl KvBackend for EtcdStore { } = req.try_into()?; let mut res = self - .client .kv_client() .put(key, value, options) .await @@ -201,7 +213,6 @@ impl KvBackend for EtcdStore { let Delete { key, options } = req.try_into()?; let mut res = self - .client .kv_client() .delete(key, options) .await @@ -265,7 +276,6 @@ impl TxnService for EtcdStore { let etcd_txn: Txn = txn.into(); let txn_res = self - .client .kv_client() .txn(etcd_txn) .await @@ -564,6 +574,7 @@ mod tests { Some(EtcdStore { client, max_txn_ops: 128, + max_decoding_size: DEFAULT_MAX_DECODING_SIZE, }) } diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index 3026920555..ed6b1fc79e 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -237,10 +237,17 @@ impl Inner { .get(addr) .context(error::CreateChannelSnafu)?; + let max_decoding_message_size = self + .channel_manager + .config() + .max_recv_message_size + .as_bytes() as usize; + Ok(StoreClient::new(channel) .accept_compressed(CompressionEncoding::Gzip) .accept_compressed(CompressionEncoding::Zstd) - .send_compressed(CompressionEncoding::Zstd)) + .send_compressed(CompressionEncoding::Zstd) + .max_decoding_message_size(max_decoding_message_size)) } #[inline]