feat: deal with more than 128 txn (#1799)

This commit is contained in:
JeremyHi
2023-06-20 17:56:45 +08:00
committed by GitHub
parent cbc2620a59
commit 323e2aed07
2 changed files with 92 additions and 51 deletions

View File

@@ -755,16 +755,21 @@ mod tests {
async fn test_batch_put() {
let tc = new_client("test_batch_put").await;
let req = BatchPutRequest::new()
.add_kv(tc.key("key"), b"value".to_vec())
.add_kv(tc.key("key2"), b"value2".to_vec());
let mut req = BatchPutRequest::new();
for i in 0..256 {
req = req.add_kv(
tc.key(&format!("key-{}", i)),
format!("value-{}", i).into_bytes(),
);
}
let res = tc.client.batch_put(req).await;
assert_eq!(0, res.unwrap().take_prev_kvs().len());
let req = RangeRequest::new().with_range(tc.key("key"), tc.key("key3"));
let req = RangeRequest::new().with_prefix(tc.key("key-"));
let res = tc.client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(2, kvs.len());
assert_eq!(256, kvs.len());
}
#[tokio::test]
@@ -772,16 +777,17 @@ mod tests {
let tc = new_client("test_batch_get").await;
tc.gen_data().await;
let req = BatchGetRequest::default()
.add_key(tc.key("key-1"))
.add_key(tc.key("key-2"));
let mut req = BatchGetRequest::default();
for i in 0..256 {
req = req.add_key(tc.key(&format!("key-{}", i)));
}
let mut res = tc.client.batch_get(req).await.unwrap();
assert_eq!(2, res.take_kvs().len());
assert_eq!(10, res.take_kvs().len());
let req = BatchGetRequest::default()
.add_key(tc.key("key-1"))
.add_key(tc.key("key-222"));
.add_key(tc.key("key-999"));
let mut res = tc.client.batch_get(req).await.unwrap();
assert_eq!(1, res.take_kvs().len());

View File

@@ -24,6 +24,7 @@ use common_error::prelude::*;
use common_telemetry::{timer, warn};
use etcd_client::{
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
TxnResponse,
};
use crate::error;
@@ -31,6 +32,8 @@ use crate::error::Result;
use crate::metrics::METRIC_META_KV_REQUEST;
use crate::service::store::kv::{KvStore, KvStoreRef};
const MAX_TXN_SIZE: usize = 128;
pub struct EtcdStore {
client: Client,
}
@@ -51,6 +54,51 @@ impl EtcdStore {
pub fn with_etcd_client(client: Client) -> Result<KvStoreRef> {
Ok(Arc::new(Self { client }))
}
async fn do_multi_txn(&self, mut txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
if txn_ops.len() < MAX_TXN_SIZE {
// fast path
let txn = Txn::new().and_then(txn_ops);
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
return Ok(vec![txn_res]);
}
let mut txns = vec![];
loop {
if txn_ops.is_empty() {
break;
}
if txn_ops.len() < MAX_TXN_SIZE {
let txn = Txn::new().and_then(txn_ops);
txns.push(txn);
break;
}
let part = txn_ops.drain(..MAX_TXN_SIZE).collect::<Vec<_>>();
let txn = Txn::new().and_then(part);
txns.push(txn);
}
let mut txn_responses = Vec::with_capacity(txns.len());
// Considering the pressure on etcd, it would be more appropriate to execute txn in
// a serial manner.
for txn in txns {
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
txn_responses.push(txn_res);
}
Ok(txn_responses)
}
}
#[async_trait::async_trait]
@@ -142,23 +190,19 @@ impl KvStore for EtcdStore {
.into_iter()
.map(|k| TxnOp::get(k, options.clone()))
.collect();
let txn = Txn::new().and_then(get_ops);
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
let txn_responses = self.do_multi_txn(get_ops).await?;
let mut kvs = vec![];
for op_res in txn_res.op_responses() {
let get_res = match op_res {
TxnOpResponse::Get(get_res) => get_res,
_ => unreachable!(),
};
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
let get_res = match op_res {
TxnOpResponse::Get(get_res) => get_res,
_ => unreachable!(),
};
kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv));
kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv));
}
}
let header = Some(ResponseHeader::success(cluster_id));
@@ -185,24 +229,20 @@ impl KvStore for EtcdStore {
.into_iter()
.map(|kv| (TxnOp::put(kv.key, kv.value, options.clone())))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(put_ops);
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
let txn_responses = self.do_multi_txn(put_ops).await?;
let mut prev_kvs = vec![];
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Put(put_res) => {
if let Some(prev_kv) = put_res.prev_key() {
prev_kvs.push(KvPair::from_etcd_kv(prev_kv));
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Put(put_res) => {
if let Some(prev_kv) = put_res.prev_key() {
prev_kvs.push(KvPair::from_etcd_kv(prev_kv));
}
}
_ => unreachable!(), // never get here
}
_ => unreachable!(), // never get here
}
}
@@ -232,28 +272,23 @@ impl KvStore for EtcdStore {
.into_iter()
.map(|k| TxnOp::delete(k, options.clone()))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(delete_ops);
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
let txn_responses = self.do_multi_txn(delete_ops).await?;
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Delete(delete_res) => {
delete_res.prev_kvs().iter().for_each(|kv| {
prev_kvs.push(KvPair::from_etcd_kv(kv));
});
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Delete(delete_res) => {
delete_res.prev_kvs().iter().for_each(|kv| {
prev_kvs.push(KvPair::from_etcd_kv(kv));
});
}
_ => unreachable!(), // never get here
}
_ => unreachable!(), // never get here
}
}
let header = Some(ResponseHeader::success(cluster_id));
Ok(BatchDeleteResponse { header, prev_kvs })
}