chore: minor refactor with meta_client (#393)

* chore: minor refactor

* feat: support none expect value on CAS
This commit is contained in:
Jiachun Feng
2022-11-07 17:03:31 +08:00
committed by GitHub
parent ae147c2a74
commit 172c9a1e21
4 changed files with 114 additions and 148 deletions

View File

@@ -92,13 +92,13 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();
if self.enable_heartbeat {
client.heartbeat_client = Some(HeartbeatClient::new(self.id, mgr.clone()));
client.heartbeat = Some(HeartbeatClient::new(self.id, mgr.clone()));
}
if self.enable_router {
client.router_client = Some(RouterClient::new(self.id, mgr.clone()));
client.router = Some(RouterClient::new(self.id, mgr.clone()));
}
if self.enable_store {
client.store_client = Some(StoreClient::new(self.id, mgr));
client.store = Some(StoreClient::new(self.id, mgr));
}
client
@@ -109,9 +109,9 @@ impl MetaClientBuilder {
pub struct MetaClient {
id: Id,
channel_manager: ChannelManager,
heartbeat_client: Option<HeartbeatClient>,
router_client: Option<RouterClient>,
store_client: Option<StoreClient>,
heartbeat: Option<HeartbeatClient>,
router: Option<RouterClient>,
store: Option<StoreClient>,
}
impl MetaClient {
@@ -137,52 +137,46 @@ impl MetaClient {
{
info!("MetaClient channel config: {:?}", self.channel_config());
if let Some(heartbeat_client) = &mut self.heartbeat_client {
heartbeat_client.start(urls.clone()).await?;
if let Some(client) = &mut self.heartbeat {
client.start(urls.clone()).await?;
info!("Heartbeat client started");
}
if let Some(router_client) = &mut self.router_client {
router_client.start(urls.clone()).await?;
if let Some(client) = &mut self.router {
client.start(urls.clone()).await?;
info!("Router client started");
}
if let Some(store_client) = &mut self.store_client {
store_client.start(urls).await?;
if let Some(client) = &mut self.store {
client.start(urls).await?;
info!("Store client started");
}
Ok(())
}
/// Ask the leader address of `metasrv`, and the heartbeat component
/// needs to create a bidirectional streaming to the leader.
pub async fn ask_leader(&self) -> Result<()> {
self.heartbeat_client()
.context(error::NotStartedSnafu {
name: "heartbeat_client",
})?
.ask_leader()
.await
}
pub async fn refresh_members(&mut self) {
todo!()
self.heartbeat_client()?.ask_leader().await
}
/// Returns a heartbeat bidirectional streaming: (sender, recever), the
/// other end is the leader of `metasrv`.
///
/// The `datanode` needs to use the sender to continuously send heartbeat
/// packets (some self-state data), and the receiver can receive a response
/// from "metasrv" (which may contain some scheduling instructions).
pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
self.heartbeat_client()
.context(error::NotStartedSnafu {
name: "heartbeat_client",
})?
.heartbeat()
.await
self.heartbeat_client()?.heartbeat().await
}
/// Provides routing information for distributed create table requests.
///
/// When a distributed create table request is received, this method returns
/// a list of `datanode` addresses that are generated based on the partition
/// information contained in the request and using some intelligent policies,
/// such as load-based.
pub async fn create_route(&self, req: CreateRequest) -> Result<RouteResponse> {
self.router_client()
.context(error::NotStartedSnafu {
name: "route_client",
})?
.create(req.into())
.await?
.try_into()
self.router_client()?.create(req.into()).await?.try_into()
}
/// Fetch routing information for tables. The smallest unit is the complete
@@ -205,46 +199,22 @@ impl MetaClient {
/// ```
///
pub async fn route(&self, req: RouteRequest) -> Result<RouteResponse> {
self.router_client()
.context(error::NotStartedSnafu {
name: "route_client",
})?
.route(req.into())
.await?
.try_into()
self.router_client()?.route(req.into()).await?.try_into()
}
/// Range gets the keys in the range from the key-value store.
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
.range(req.into())
.await?
.try_into()
self.store_client()?.range(req.into()).await?.try_into()
}
/// Put puts the given key into the key-value store.
pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
.put(req.into())
.await?
.try_into()
self.store_client()?.put(req.into()).await?.try_into()
}
/// BatchPut atomically puts the given keys into the key-value store.
pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
.batch_put(req.into())
.await?
.try_into()
self.store_client()?.batch_put(req.into()).await?.try_into()
}
/// CompareAndPut atomically puts the value to the given updated
@@ -253,10 +223,7 @@ impl MetaClient {
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
self.store_client()?
.compare_and_put(req.into())
.await?
.try_into()
@@ -264,28 +231,31 @@ impl MetaClient {
/// DeleteRange deletes the given range from the key-value store.
pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
self.store_client()?
.delete_range(req.into())
.await?
.try_into()
}
#[inline]
pub fn heartbeat_client(&self) -> Option<HeartbeatClient> {
self.heartbeat_client.clone()
pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
self.heartbeat.clone().context(error::NotStartedSnafu {
name: "heartbeat_client",
})
}
#[inline]
pub fn router_client(&self) -> Option<RouterClient> {
self.router_client.clone()
pub fn router_client(&self) -> Result<RouterClient> {
self.router.clone().context(error::NotStartedSnafu {
name: "store_client",
})
}
#[inline]
pub fn store_client(&self) -> Option<StoreClient> {
self.store_client.clone()
pub fn store_client(&self) -> Result<StoreClient> {
self.store.clone().context(error::NotStartedSnafu {
name: "store_client",
})
}
#[inline]
@@ -320,23 +290,23 @@ mod tests {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0).enable_heartbeat().build();
assert!(meta_client.heartbeat_client().is_some());
assert!(meta_client.router_client().is_none());
assert!(meta_client.store_client().is_none());
assert!(meta_client.heartbeat_client().is_ok());
assert!(meta_client.router_client().is_err());
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new(0, 0).enable_router().build();
assert!(meta_client.heartbeat_client().is_none());
assert!(meta_client.router_client().is_some());
assert!(meta_client.store_client().is_none());
assert!(meta_client.heartbeat_client().is_err());
assert!(meta_client.router_client().is_ok());
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
assert!(meta_client.router_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new(0, 0).enable_store().build();
assert!(meta_client.heartbeat_client().is_none());
assert!(meta_client.router_client().is_none());
assert!(meta_client.store_client().is_some());
assert!(meta_client.heartbeat_client().is_err());
assert!(meta_client.router_client().is_err());
assert!(meta_client.store_client().is_ok());
meta_client.start(urls).await.unwrap();
assert!(meta_client.store_client().unwrap().is_started().await);
@@ -347,9 +317,9 @@ mod tests {
.build();
assert_eq!(1, meta_client.id().0);
assert_eq!(2, meta_client.id().1);
assert!(meta_client.heartbeat_client().is_some());
assert!(meta_client.router_client().is_some());
assert!(meta_client.store_client().is_some());
assert!(meta_client.heartbeat_client().is_ok());
assert!(meta_client.router_client().is_ok());
assert!(meta_client.store_client().is_ok());
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
assert!(meta_client.router_client().unwrap().is_started().await);
@@ -648,23 +618,26 @@ mod tests {
let res = client.compare_and_put(req).await;
assert!(!res.unwrap().is_success());
// empty expect key is not allowed
// create if absent
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_value(b"value".to_vec());
let res = client.compare_and_put(req).await;
let mut res = res.unwrap();
assert!(!res.is_success());
let mut kv = res.take_prev_kv().unwrap();
assert_eq!(b"key".to_vec(), kv.take_key());
assert!(kv.take_value().is_empty());
assert!(res.is_success());
assert!(res.take_prev_kv().is_none());
let req = PutRequest::new()
// compare and put fail
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_value(b"value".to_vec());
let res = client.put(req).await;
assert!(res.is_ok());
.with_expect(b"not_eq".to_vec())
.with_value(b"value2".to_vec());
let res = client.compare_and_put(req).await;
let mut res = res.unwrap();
assert!(!res.is_success());
assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
// compare and put success
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_expect(b"value".to_vec())

View File

@@ -29,7 +29,7 @@ pub struct HeartbeatSender {
impl HeartbeatSender {
#[inline]
const fn new(id: Id, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
fn new(id: Id, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
Self { id, sender }
}
@@ -58,7 +58,7 @@ pub struct HeartbeatStream {
impl HeartbeatStream {
#[inline]
const fn new(id: Id, stream: Streaming<HeartbeatResponse>) -> Self {
fn new(id: Id, stream: Streaming<HeartbeatResponse>) -> Self {
Self { id, stream }
}

View File

@@ -143,14 +143,17 @@ impl KvStore for EtcdStore {
options,
} = req.try_into()?;
let txn = Txn::new()
.when(vec![Compare::value(
key.clone(),
CompareOp::Equal,
expect.clone(),
)])
.and_then(vec![TxnOp::put(key.clone(), value, options)])
.or_else(vec![TxnOp::get(key.clone(), None)]);
let put_op = vec![TxnOp::put(key.clone(), value, options)];
let get_op = vec![TxnOp::get(key.clone(), None)];
let mut txn = if expect.is_empty() {
// create if absent
// revision 0 means key was not exist
Txn::new().when(vec![Compare::create_revision(key, CompareOp::Equal, 0)])
} else {
// compare and put
Txn::new().when(vec![Compare::value(key, CompareOp::Equal, expect)])
};
txn = txn.and_then(put_op).or_else(get_op);
let txn_res = self
.client
@@ -158,6 +161,7 @@ impl KvStore for EtcdStore {
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
let success = txn_res.succeeded();
let op_res = txn_res
.op_responses()
@@ -165,26 +169,26 @@ impl KvStore for EtcdStore {
.context(error::InvalidTxnResultSnafu {
err_msg: "empty response",
})?;
let prev_kv = if success {
Some(KeyValue { key, value: expect })
} else {
match op_res {
TxnOpResponse::Get(get_res) => {
if get_res.count() == 0 {
// do not exists
Some(KeyValue { key, value: vec![] })
} else {
ensure!(
get_res.count() == 1,
error::InvalidTxnResultSnafu {
err_msg: format!("expect 1 response, actual {}", get_res.count())
}
);
Some(KeyValue::from(KvPair::new(&get_res.kvs()[0])))
}
}
_ => unreachable!(), // never get here
let prev_kv = match op_res {
TxnOpResponse::Put(put_res) => {
put_res.prev_key().map(|kv| KeyValue::from(KvPair::new(kv)))
}
TxnOpResponse::Get(get_res) => {
if get_res.count() == 0 {
// do not exists
None
} else {
ensure!(
get_res.count() == 1,
error::InvalidTxnResultSnafu {
err_msg: format!("expect 1 response, actual {}", get_res.count())
}
);
Some(KeyValue::from(KvPair::new(&get_res.kvs()[0])))
}
}
_ => unreachable!(), // never get here
};
let header = Some(ResponseHeader::success(cluster_id));

View File

@@ -145,27 +145,16 @@ impl KvStore for MemStore {
} = req;
let mut memory = self.inner.write();
let (success, prev_kv) = if expect.is_empty() {
(
false,
Some(KeyValue {
key: key.clone(),
value: vec![],
}),
)
} else {
let prev_val = memory.get(&key);
let success = prev_val
.map(|v| expect.cmp(v) == Ordering::Equal)
.unwrap_or(false);
(
success,
prev_val.map(|v| KeyValue {
key: key.clone(),
value: v.clone(),
}),
)
};
let prev_val = memory.get(&key);
let success = prev_val
.map(|v| expect.cmp(v) == Ordering::Equal)
.unwrap_or(false | expect.is_empty());
let prev_kv = prev_val.map(|v| KeyValue {
key: key.clone(),
value: v.clone(),
});
if success {
memory.insert(key, value);