mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat: make etcd store max codec size configurable (#6859)
* feat: make etcd store max codec size configable * feat: only decoding limit
This commit is contained in:
@@ -32,6 +32,8 @@ use crate::rpc::store::{
|
||||
};
|
||||
use crate::rpc::KeyValue;
|
||||
|
||||
const DEFAULT_MAX_DECODING_SIZE: usize = 32 * 1024 * 1024; // 32MB
|
||||
|
||||
pub struct EtcdStore {
|
||||
client: Client,
|
||||
// Maximum number of operations permitted in a transaction.
|
||||
@@ -39,6 +41,8 @@ pub struct EtcdStore {
|
||||
//
|
||||
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
|
||||
max_txn_ops: usize,
|
||||
// Maximum decoding message size in bytes. Default 32MB.
|
||||
max_decoding_size: usize,
|
||||
}
|
||||
|
||||
impl EtcdStore {
|
||||
@@ -59,9 +63,20 @@ impl EtcdStore {
|
||||
Arc::new(Self {
|
||||
client,
|
||||
max_txn_ops,
|
||||
max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_max_decoding_size(&mut self, max_decoding_size: usize) {
|
||||
self.max_decoding_size = max_decoding_size;
|
||||
}
|
||||
|
||||
fn kv_client(&self) -> etcd_client::KvClient {
|
||||
self.client
|
||||
.kv_client()
|
||||
.max_decoding_message_size(self.max_decoding_size)
|
||||
}
|
||||
|
||||
async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
|
||||
let max_txn_ops = self.max_txn_ops();
|
||||
if txn_ops.len() < max_txn_ops {
|
||||
@@ -71,7 +86,6 @@ impl EtcdStore {
|
||||
.start_timer();
|
||||
let txn = Txn::new().and_then(txn_ops);
|
||||
let txn_res = self
|
||||
.client
|
||||
.kv_client()
|
||||
.txn(txn)
|
||||
.await
|
||||
@@ -110,7 +124,6 @@ impl KvBackend for EtcdStore {
|
||||
let Get { key, options } = req.try_into()?;
|
||||
|
||||
let mut res = self
|
||||
.client
|
||||
.kv_client()
|
||||
.get(key, options)
|
||||
.await
|
||||
@@ -136,7 +149,6 @@ impl KvBackend for EtcdStore {
|
||||
} = req.try_into()?;
|
||||
|
||||
let mut res = self
|
||||
.client
|
||||
.kv_client()
|
||||
.put(key, value, options)
|
||||
.await
|
||||
@@ -201,7 +213,6 @@ impl KvBackend for EtcdStore {
|
||||
let Delete { key, options } = req.try_into()?;
|
||||
|
||||
let mut res = self
|
||||
.client
|
||||
.kv_client()
|
||||
.delete(key, options)
|
||||
.await
|
||||
@@ -265,7 +276,6 @@ impl TxnService for EtcdStore {
|
||||
|
||||
let etcd_txn: Txn = txn.into();
|
||||
let txn_res = self
|
||||
.client
|
||||
.kv_client()
|
||||
.txn(etcd_txn)
|
||||
.await
|
||||
@@ -564,6 +574,7 @@ mod tests {
|
||||
Some(EtcdStore {
|
||||
client,
|
||||
max_txn_ops: 128,
|
||||
max_decoding_size: DEFAULT_MAX_DECODING_SIZE,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -237,10 +237,17 @@ impl Inner {
|
||||
.get(addr)
|
||||
.context(error::CreateChannelSnafu)?;
|
||||
|
||||
let max_decoding_message_size = self
|
||||
.channel_manager
|
||||
.config()
|
||||
.max_recv_message_size
|
||||
.as_bytes() as usize;
|
||||
|
||||
Ok(StoreClient::new(channel)
|
||||
.accept_compressed(CompressionEncoding::Gzip)
|
||||
.accept_compressed(CompressionEncoding::Zstd)
|
||||
.send_compressed(CompressionEncoding::Zstd))
|
||||
.send_compressed(CompressionEncoding::Zstd)
|
||||
.max_decoding_message_size(max_decoding_message_size))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
||||
Reference in New Issue
Block a user