diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index a893be1311..78a612f4a2 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -755,16 +755,21 @@ mod tests { async fn test_batch_put() { let tc = new_client("test_batch_put").await; - let req = BatchPutRequest::new() - .add_kv(tc.key("key"), b"value".to_vec()) - .add_kv(tc.key("key2"), b"value2".to_vec()); + let mut req = BatchPutRequest::new(); + for i in 0..256 { + req = req.add_kv( + tc.key(&format!("key-{}", i)), + format!("value-{}", i).into_bytes(), + ); + } + let res = tc.client.batch_put(req).await; assert_eq!(0, res.unwrap().take_prev_kvs().len()); - let req = RangeRequest::new().with_range(tc.key("key"), tc.key("key3")); + let req = RangeRequest::new().with_prefix(tc.key("key-")); let res = tc.client.range(req).await; let kvs = res.unwrap().take_kvs(); - assert_eq!(2, kvs.len()); + assert_eq!(256, kvs.len()); } #[tokio::test] @@ -772,16 +777,17 @@ mod tests { let tc = new_client("test_batch_get").await; tc.gen_data().await; - let req = BatchGetRequest::default() - .add_key(tc.key("key-1")) - .add_key(tc.key("key-2")); + let mut req = BatchGetRequest::default(); + for i in 0..256 { + req = req.add_key(tc.key(&format!("key-{}", i))); + } let mut res = tc.client.batch_get(req).await.unwrap(); - assert_eq!(2, res.take_kvs().len()); + assert_eq!(10, res.take_kvs().len()); let req = BatchGetRequest::default() .add_key(tc.key("key-1")) - .add_key(tc.key("key-222")); + .add_key(tc.key("key-999")); let mut res = tc.client.batch_get(req).await.unwrap(); assert_eq!(1, res.take_kvs().len()); diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 04f82c4672..22834b355b 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -24,6 +24,7 @@ use common_error::prelude::*; use common_telemetry::{timer, warn}; use etcd_client::{ Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, + TxnResponse, }; use crate::error; @@ -31,6 +32,8 @@ use crate::error::Result; use crate::metrics::METRIC_META_KV_REQUEST; use crate::service::store::kv::{KvStore, KvStoreRef}; +const MAX_TXN_SIZE: usize = 128; + pub struct EtcdStore { client: Client, } @@ -51,6 +54,51 @@ impl EtcdStore { pub fn with_etcd_client(client: Client) -> Result { Ok(Arc::new(Self { client })) } + + async fn do_multi_txn(&self, mut txn_ops: Vec) -> Result> { + if txn_ops.len() < MAX_TXN_SIZE { + // fast path + let txn = Txn::new().and_then(txn_ops); + let txn_res = self + .client + .kv_client() + .txn(txn) + .await + .context(error::EtcdFailedSnafu)?; + return Ok(vec![txn_res]); + } + + let mut txns = vec![]; + loop { + if txn_ops.is_empty() { + break; + } + + if txn_ops.len() < MAX_TXN_SIZE { + let txn = Txn::new().and_then(txn_ops); + txns.push(txn); + break; + } + + let part = txn_ops.drain(..MAX_TXN_SIZE).collect::>(); + let txn = Txn::new().and_then(part); + txns.push(txn); + } + + let mut txn_responses = Vec::with_capacity(txns.len()); + // Considering the pressure on etcd, it would be more appropriate to execute txn in + // a serial manner. + for txn in txns { + let txn_res = self + .client + .kv_client() + .txn(txn) + .await + .context(error::EtcdFailedSnafu)?; + txn_responses.push(txn_res); + } + Ok(txn_responses) + } } #[async_trait::async_trait] @@ -142,23 +190,19 @@ impl KvStore for EtcdStore { .into_iter() .map(|k| TxnOp::get(k, options.clone())) .collect(); - let txn = Txn::new().and_then(get_ops); - let txn_res = self - .client - .kv_client() - .txn(txn) - .await - .context(error::EtcdFailedSnafu)?; + let txn_responses = self.do_multi_txn(get_ops).await?; let mut kvs = vec![]; - for op_res in txn_res.op_responses() { - let get_res = match op_res { - TxnOpResponse::Get(get_res) => get_res, - _ => unreachable!(), - }; + for txn_res in txn_responses { + for op_res in txn_res.op_responses() { + let get_res = match op_res { + TxnOpResponse::Get(get_res) => get_res, + _ => unreachable!(), + }; - kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv)); + kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv)); + } } let header = Some(ResponseHeader::success(cluster_id)); @@ -185,24 +229,20 @@ impl KvStore for EtcdStore { .into_iter() .map(|kv| (TxnOp::put(kv.key, kv.value, options.clone()))) .collect::>(); - let txn = Txn::new().and_then(put_ops); - let txn_res = self - .client - .kv_client() - .txn(txn) - .await - .context(error::EtcdFailedSnafu)?; + let txn_responses = self.do_multi_txn(put_ops).await?; let mut prev_kvs = vec![]; - for op_res in txn_res.op_responses() { - match op_res { - TxnOpResponse::Put(put_res) => { - if let Some(prev_kv) = put_res.prev_key() { - prev_kvs.push(KvPair::from_etcd_kv(prev_kv)); + for txn_res in txn_responses { + for op_res in txn_res.op_responses() { + match op_res { + TxnOpResponse::Put(put_res) => { + if let Some(prev_kv) = put_res.prev_key() { + prev_kvs.push(KvPair::from_etcd_kv(prev_kv)); + } } + _ => unreachable!(), // never get here } - _ => unreachable!(), // never get here } } @@ -232,28 +272,23 @@ impl KvStore for EtcdStore { .into_iter() .map(|k| TxnOp::delete(k, options.clone())) .collect::>(); - let txn = Txn::new().and_then(delete_ops); - let txn_res = self - .client - .kv_client() - .txn(txn) - .await - .context(error::EtcdFailedSnafu)?; + let txn_responses = self.do_multi_txn(delete_ops).await?; - for op_res in txn_res.op_responses() { - match op_res { - TxnOpResponse::Delete(delete_res) => { - delete_res.prev_kvs().iter().for_each(|kv| { - prev_kvs.push(KvPair::from_etcd_kv(kv)); - }); + for txn_res in txn_responses { + for op_res in txn_res.op_responses() { + match op_res { + TxnOpResponse::Delete(delete_res) => { + delete_res.prev_kvs().iter().for_each(|kv| { + prev_kvs.push(KvPair::from_etcd_kv(kv)); + }); + } + _ => unreachable!(), // never get here } - _ => unreachable!(), // never get here } } let header = Some(ResponseHeader::success(cluster_id)); - Ok(BatchDeleteResponse { header, prev_kvs }) }