diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 13d46f3de8..10a5b34e58 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -92,13 +92,13 @@ impl MetaClientBuilder { let mgr = client.channel_manager.clone(); if self.enable_heartbeat { - client.heartbeat_client = Some(HeartbeatClient::new(self.id, mgr.clone())); + client.heartbeat = Some(HeartbeatClient::new(self.id, mgr.clone())); } if self.enable_router { - client.router_client = Some(RouterClient::new(self.id, mgr.clone())); + client.router = Some(RouterClient::new(self.id, mgr.clone())); } if self.enable_store { - client.store_client = Some(StoreClient::new(self.id, mgr)); + client.store = Some(StoreClient::new(self.id, mgr)); } client @@ -109,9 +109,9 @@ impl MetaClientBuilder { pub struct MetaClient { id: Id, channel_manager: ChannelManager, - heartbeat_client: Option, - router_client: Option, - store_client: Option, + heartbeat: Option, + router: Option, + store: Option, } impl MetaClient { @@ -137,52 +137,46 @@ impl MetaClient { { info!("MetaClient channel config: {:?}", self.channel_config()); - if let Some(heartbeat_client) = &mut self.heartbeat_client { - heartbeat_client.start(urls.clone()).await?; + if let Some(client) = &mut self.heartbeat { + client.start(urls.clone()).await?; info!("Heartbeat client started"); } - if let Some(router_client) = &mut self.router_client { - router_client.start(urls.clone()).await?; + if let Some(client) = &mut self.router { + client.start(urls.clone()).await?; info!("Router client started"); } - if let Some(store_client) = &mut self.store_client { - store_client.start(urls).await?; + if let Some(client) = &mut self.store { + client.start(urls).await?; info!("Store client started"); } Ok(()) } + /// Ask the leader address of `metasrv`, and the heartbeat component + /// needs to create a bidirectional streaming to the leader. pub async fn ask_leader(&self) -> Result<()> { - self.heartbeat_client() - .context(error::NotStartedSnafu { - name: "heartbeat_client", - })? - .ask_leader() - .await - } - - pub async fn refresh_members(&mut self) { - todo!() + self.heartbeat_client()?.ask_leader().await } + /// Returns a heartbeat bidirectional streaming: (sender, recever), the + /// other end is the leader of `metasrv`. + /// + /// The `datanode` needs to use the sender to continuously send heartbeat + /// packets (some self-state data), and the receiver can receive a response + /// from "metasrv" (which may contain some scheduling instructions). pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> { - self.heartbeat_client() - .context(error::NotStartedSnafu { - name: "heartbeat_client", - })? - .heartbeat() - .await + self.heartbeat_client()?.heartbeat().await } + /// Provides routing information for distributed create table requests. + /// + /// When a distributed create table request is received, this method returns + /// a list of `datanode` addresses that are generated based on the partition + /// information contained in the request and using some intelligent policies, + /// such as load-based. pub async fn create_route(&self, req: CreateRequest) -> Result { - self.router_client() - .context(error::NotStartedSnafu { - name: "route_client", - })? - .create(req.into()) - .await? - .try_into() + self.router_client()?.create(req.into()).await?.try_into() } /// Fetch routing information for tables. The smallest unit is the complete @@ -205,46 +199,22 @@ impl MetaClient { /// ``` /// pub async fn route(&self, req: RouteRequest) -> Result { - self.router_client() - .context(error::NotStartedSnafu { - name: "route_client", - })? - .route(req.into()) - .await? - .try_into() + self.router_client()?.route(req.into()).await?.try_into() } /// Range gets the keys in the range from the key-value store. pub async fn range(&self, req: RangeRequest) -> Result { - self.store_client() - .context(error::NotStartedSnafu { - name: "store_client", - })? - .range(req.into()) - .await? - .try_into() + self.store_client()?.range(req.into()).await?.try_into() } /// Put puts the given key into the key-value store. pub async fn put(&self, req: PutRequest) -> Result { - self.store_client() - .context(error::NotStartedSnafu { - name: "store_client", - })? - .put(req.into()) - .await? - .try_into() + self.store_client()?.put(req.into()).await?.try_into() } /// BatchPut atomically puts the given keys into the key-value store. pub async fn batch_put(&self, req: BatchPutRequest) -> Result { - self.store_client() - .context(error::NotStartedSnafu { - name: "store_client", - })? - .batch_put(req.into()) - .await? - .try_into() + self.store_client()?.batch_put(req.into()).await?.try_into() } /// CompareAndPut atomically puts the value to the given updated @@ -253,10 +223,7 @@ impl MetaClient { &self, req: CompareAndPutRequest, ) -> Result { - self.store_client() - .context(error::NotStartedSnafu { - name: "store_client", - })? + self.store_client()? .compare_and_put(req.into()) .await? .try_into() @@ -264,28 +231,31 @@ impl MetaClient { /// DeleteRange deletes the given range from the key-value store. pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result { - self.store_client() - .context(error::NotStartedSnafu { - name: "store_client", - })? + self.store_client()? .delete_range(req.into()) .await? .try_into() } #[inline] - pub fn heartbeat_client(&self) -> Option { - self.heartbeat_client.clone() + pub fn heartbeat_client(&self) -> Result { + self.heartbeat.clone().context(error::NotStartedSnafu { + name: "heartbeat_client", + }) } #[inline] - pub fn router_client(&self) -> Option { - self.router_client.clone() + pub fn router_client(&self) -> Result { + self.router.clone().context(error::NotStartedSnafu { + name: "store_client", + }) } #[inline] - pub fn store_client(&self) -> Option { - self.store_client.clone() + pub fn store_client(&self) -> Result { + self.store.clone().context(error::NotStartedSnafu { + name: "store_client", + }) } #[inline] @@ -320,23 +290,23 @@ mod tests { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; let mut meta_client = MetaClientBuilder::new(0, 0).enable_heartbeat().build(); - assert!(meta_client.heartbeat_client().is_some()); - assert!(meta_client.router_client().is_none()); - assert!(meta_client.store_client().is_none()); + assert!(meta_client.heartbeat_client().is_ok()); + assert!(meta_client.router_client().is_err()); + assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); assert!(meta_client.heartbeat_client().unwrap().is_started().await); let mut meta_client = MetaClientBuilder::new(0, 0).enable_router().build(); - assert!(meta_client.heartbeat_client().is_none()); - assert!(meta_client.router_client().is_some()); - assert!(meta_client.store_client().is_none()); + assert!(meta_client.heartbeat_client().is_err()); + assert!(meta_client.router_client().is_ok()); + assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); assert!(meta_client.router_client().unwrap().is_started().await); let mut meta_client = MetaClientBuilder::new(0, 0).enable_store().build(); - assert!(meta_client.heartbeat_client().is_none()); - assert!(meta_client.router_client().is_none()); - assert!(meta_client.store_client().is_some()); + assert!(meta_client.heartbeat_client().is_err()); + assert!(meta_client.router_client().is_err()); + assert!(meta_client.store_client().is_ok()); meta_client.start(urls).await.unwrap(); assert!(meta_client.store_client().unwrap().is_started().await); @@ -347,9 +317,9 @@ mod tests { .build(); assert_eq!(1, meta_client.id().0); assert_eq!(2, meta_client.id().1); - assert!(meta_client.heartbeat_client().is_some()); - assert!(meta_client.router_client().is_some()); - assert!(meta_client.store_client().is_some()); + assert!(meta_client.heartbeat_client().is_ok()); + assert!(meta_client.router_client().is_ok()); + assert!(meta_client.store_client().is_ok()); meta_client.start(urls).await.unwrap(); assert!(meta_client.heartbeat_client().unwrap().is_started().await); assert!(meta_client.router_client().unwrap().is_started().await); @@ -648,23 +618,26 @@ mod tests { let res = client.compare_and_put(req).await; assert!(!res.unwrap().is_success()); - // empty expect key is not allowed + // create if absent let req = CompareAndPutRequest::new() .with_key(b"key".to_vec()) .with_value(b"value".to_vec()); let res = client.compare_and_put(req).await; let mut res = res.unwrap(); - assert!(!res.is_success()); - let mut kv = res.take_prev_kv().unwrap(); - assert_eq!(b"key".to_vec(), kv.take_key()); - assert!(kv.take_value().is_empty()); + assert!(res.is_success()); + assert!(res.take_prev_kv().is_none()); - let req = PutRequest::new() + // compare and put fail + let req = CompareAndPutRequest::new() .with_key(b"key".to_vec()) - .with_value(b"value".to_vec()); - let res = client.put(req).await; - assert!(res.is_ok()); + .with_expect(b"not_eq".to_vec()) + .with_value(b"value2".to_vec()); + let res = client.compare_and_put(req).await; + let mut res = res.unwrap(); + assert!(!res.is_success()); + assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value()); + // compare and put success let req = CompareAndPutRequest::new() .with_key(b"key".to_vec()) .with_expect(b"value".to_vec()) diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 685474129e..fb00a35e10 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -29,7 +29,7 @@ pub struct HeartbeatSender { impl HeartbeatSender { #[inline] - const fn new(id: Id, sender: mpsc::Sender) -> Self { + fn new(id: Id, sender: mpsc::Sender) -> Self { Self { id, sender } } @@ -58,7 +58,7 @@ pub struct HeartbeatStream { impl HeartbeatStream { #[inline] - const fn new(id: Id, stream: Streaming) -> Self { + fn new(id: Id, stream: Streaming) -> Self { Self { id, stream } } diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 7753c01c57..aa71890b13 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -143,14 +143,17 @@ impl KvStore for EtcdStore { options, } = req.try_into()?; - let txn = Txn::new() - .when(vec![Compare::value( - key.clone(), - CompareOp::Equal, - expect.clone(), - )]) - .and_then(vec![TxnOp::put(key.clone(), value, options)]) - .or_else(vec![TxnOp::get(key.clone(), None)]); + let put_op = vec![TxnOp::put(key.clone(), value, options)]; + let get_op = vec![TxnOp::get(key.clone(), None)]; + let mut txn = if expect.is_empty() { + // create if absent + // revision 0 means key was not exist + Txn::new().when(vec![Compare::create_revision(key, CompareOp::Equal, 0)]) + } else { + // compare and put + Txn::new().when(vec![Compare::value(key, CompareOp::Equal, expect)]) + }; + txn = txn.and_then(put_op).or_else(get_op); let txn_res = self .client @@ -158,6 +161,7 @@ impl KvStore for EtcdStore { .txn(txn) .await .context(error::EtcdFailedSnafu)?; + let success = txn_res.succeeded(); let op_res = txn_res .op_responses() @@ -165,26 +169,26 @@ impl KvStore for EtcdStore { .context(error::InvalidTxnResultSnafu { err_msg: "empty response", })?; - let prev_kv = if success { - Some(KeyValue { key, value: expect }) - } else { - match op_res { - TxnOpResponse::Get(get_res) => { - if get_res.count() == 0 { - // do not exists - Some(KeyValue { key, value: vec![] }) - } else { - ensure!( - get_res.count() == 1, - error::InvalidTxnResultSnafu { - err_msg: format!("expect 1 response, actual {}", get_res.count()) - } - ); - Some(KeyValue::from(KvPair::new(&get_res.kvs()[0]))) - } - } - _ => unreachable!(), // never get here + + let prev_kv = match op_res { + TxnOpResponse::Put(put_res) => { + put_res.prev_key().map(|kv| KeyValue::from(KvPair::new(kv))) } + TxnOpResponse::Get(get_res) => { + if get_res.count() == 0 { + // do not exists + None + } else { + ensure!( + get_res.count() == 1, + error::InvalidTxnResultSnafu { + err_msg: format!("expect 1 response, actual {}", get_res.count()) + } + ); + Some(KeyValue::from(KvPair::new(&get_res.kvs()[0]))) + } + } + _ => unreachable!(), // never get here }; let header = Some(ResponseHeader::success(cluster_id)); diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs index 52f9b96cdc..8721789105 100644 --- a/src/meta-srv/src/service/store/memory.rs +++ b/src/meta-srv/src/service/store/memory.rs @@ -145,27 +145,16 @@ impl KvStore for MemStore { } = req; let mut memory = self.inner.write(); - let (success, prev_kv) = if expect.is_empty() { - ( - false, - Some(KeyValue { - key: key.clone(), - value: vec![], - }), - ) - } else { - let prev_val = memory.get(&key); - let success = prev_val - .map(|v| expect.cmp(v) == Ordering::Equal) - .unwrap_or(false); - ( - success, - prev_val.map(|v| KeyValue { - key: key.clone(), - value: v.clone(), - }), - ) - }; + + let prev_val = memory.get(&key); + + let success = prev_val + .map(|v| expect.cmp(v) == Ordering::Equal) + .unwrap_or(false | expect.is_empty()); + let prev_kv = prev_val.map(|v| KeyValue { + key: key.clone(), + value: v.clone(), + }); if success { memory.insert(key, value);