1mod ask_leader;
16pub mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{
28 MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
29};
30pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
31use cluster::Client as ClusterClient;
32pub use cluster::ClusterKvBackend;
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
35use common_meta::cluster::{
36 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
37};
38use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
39use common_meta::error::{
40 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
41};
42use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
43use common_meta::kv_backend::KvBackendRef;
44use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
45use common_meta::range_stream::PaginationStream;
46use common_meta::rpc::KeyValue;
47use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
48use common_meta::rpc::procedure::{
49 AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
50 MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
51 RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
52};
53use common_meta::rpc::store::{
54 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
55 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
56 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
57};
58use common_telemetry::info;
59use futures::TryStreamExt;
60use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
61use procedure::Client as ProcedureClient;
62use snafu::{OptionExt, ResultExt};
63use store::Client as StoreClient;
64
65pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
66use crate::error::{
67 ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
68 Result,
69};
70
71pub type Id = u64;
72
73const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
74const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
75const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
76
77#[derive(Clone, Debug, Default)]
78pub struct MetaClientBuilder {
79 id: Id,
80 role: Role,
81 enable_heartbeat: bool,
82 enable_store: bool,
83 enable_procedure: bool,
84 enable_access_cluster_info: bool,
85 region_follower: Option<RegionFollowerClientRef>,
86 channel_manager: Option<ChannelManager>,
87 ddl_channel_manager: Option<ChannelManager>,
88 heartbeat_channel_manager: Option<ChannelManager>,
89}
90
91impl MetaClientBuilder {
92 pub fn new(member_id: u64, role: Role) -> Self {
93 Self {
94 id: member_id,
95 role,
96 ..Default::default()
97 }
98 }
99
100 pub fn frontend_default_options() -> Self {
102 Self::new(0, Role::Frontend)
104 .enable_store()
105 .enable_heartbeat()
106 .enable_procedure()
107 .enable_access_cluster_info()
108 }
109
110 pub fn datanode_default_options(member_id: u64) -> Self {
112 Self::new(member_id, Role::Datanode)
113 .enable_store()
114 .enable_heartbeat()
115 }
116
117 pub fn flownode_default_options(member_id: u64) -> Self {
119 Self::new(member_id, Role::Flownode)
120 .enable_store()
121 .enable_heartbeat()
122 .enable_procedure()
123 .enable_access_cluster_info()
124 }
125
126 pub fn enable_heartbeat(self) -> Self {
127 Self {
128 enable_heartbeat: true,
129 ..self
130 }
131 }
132
133 pub fn enable_store(self) -> Self {
134 Self {
135 enable_store: true,
136 ..self
137 }
138 }
139
140 pub fn enable_procedure(self) -> Self {
141 Self {
142 enable_procedure: true,
143 ..self
144 }
145 }
146
147 pub fn enable_access_cluster_info(self) -> Self {
148 Self {
149 enable_access_cluster_info: true,
150 ..self
151 }
152 }
153
154 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
155 Self {
156 channel_manager: Some(channel_manager),
157 ..self
158 }
159 }
160
161 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
162 Self {
163 ddl_channel_manager: Some(channel_manager),
164 ..self
165 }
166 }
167
168 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
169 Self {
170 heartbeat_channel_manager: Some(channel_manager),
171 ..self
172 }
173 }
174
175 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
176 Self {
177 region_follower: Some(region_follower),
178 ..self
179 }
180 }
181
182 pub fn build(self) -> MetaClient {
183 let mut client = if let Some(mgr) = self.channel_manager {
184 MetaClient::with_channel_manager(self.id, mgr)
185 } else {
186 MetaClient::new(self.id)
187 };
188
189 let mgr = client.channel_manager.clone();
190
191 if self.enable_heartbeat {
192 if self.heartbeat_channel_manager.is_some() {
193 info!("Enable heartbeat channel using the heartbeat channel manager.");
194 }
195 let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
196 client.heartbeat = Some(HeartbeatClient::new(
197 self.id,
198 self.role,
199 mgr,
200 DEFAULT_ASK_LEADER_MAX_RETRY,
201 ));
202 }
203
204 if self.enable_store {
205 client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
206 }
207
208 if self.enable_procedure {
209 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
210 client.procedure = Some(ProcedureClient::new(
211 self.id,
212 self.role,
213 mgr,
214 DEFAULT_SUBMIT_DDL_MAX_RETRY,
215 ));
216 }
217
218 if self.enable_access_cluster_info {
219 client.cluster = Some(ClusterClient::new(
220 self.id,
221 self.role,
222 mgr,
223 DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
224 ))
225 }
226
227 if let Some(region_follower) = self.region_follower {
228 client.region_follower = Some(region_follower);
229 }
230
231 client
232 }
233}
234
235#[derive(Debug, Default)]
236pub struct MetaClient {
237 id: Id,
238 channel_manager: ChannelManager,
239 heartbeat: Option<HeartbeatClient>,
240 store: Option<StoreClient>,
241 procedure: Option<ProcedureClient>,
242 cluster: Option<ClusterClient>,
243 region_follower: Option<RegionFollowerClientRef>,
244}
245
246pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
247
248#[async_trait::async_trait]
250pub trait RegionFollowerClient: Sync + Send + Debug {
251 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
252
253 async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
254
255 async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
256
257 async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
258
259 async fn start(&self, urls: &[&str]) -> Result<()>;
260
261 async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
262}
263
264#[async_trait::async_trait]
265impl ProcedureExecutor for MetaClient {
266 async fn submit_ddl_task(
267 &self,
268 _ctx: &ExecutorContext,
269 request: SubmitDdlTaskRequest,
270 ) -> MetaResult<SubmitDdlTaskResponse> {
271 self.submit_ddl_task(request)
272 .await
273 .map_err(BoxedError::new)
274 .context(meta_error::ExternalSnafu)
275 }
276
277 async fn migrate_region(
278 &self,
279 _ctx: &ExecutorContext,
280 request: MigrateRegionRequest,
281 ) -> MetaResult<MigrateRegionResponse> {
282 self.migrate_region(request)
283 .await
284 .map_err(BoxedError::new)
285 .context(meta_error::ExternalSnafu)
286 }
287
288 async fn reconcile(
289 &self,
290 _ctx: &ExecutorContext,
291 request: ReconcileRequest,
292 ) -> MetaResult<ReconcileResponse> {
293 self.reconcile(request)
294 .await
295 .map_err(BoxedError::new)
296 .context(meta_error::ExternalSnafu)
297 }
298
299 async fn manage_region_follower(
300 &self,
301 _ctx: &ExecutorContext,
302 request: ManageRegionFollowerRequest,
303 ) -> MetaResult<()> {
304 if let Some(region_follower) = &self.region_follower {
305 match request {
306 ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
307 region_follower
308 .add_region_follower(add_region_follower_request)
309 .await
310 }
311 ManageRegionFollowerRequest::RemoveRegionFollower(
312 remove_region_follower_request,
313 ) => {
314 region_follower
315 .remove_region_follower(remove_region_follower_request)
316 .await
317 }
318 ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
319 region_follower
320 .add_table_follower(add_table_follower_request)
321 .await
322 }
323 ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
324 region_follower
325 .remove_table_follower(remove_table_follower_request)
326 .await
327 }
328 }
329 .map_err(BoxedError::new)
330 .context(meta_error::ExternalSnafu)
331 } else {
332 UnsupportedSnafu {
333 operation: "manage_region_follower",
334 }
335 .fail()
336 }
337 }
338
339 async fn query_procedure_state(
340 &self,
341 _ctx: &ExecutorContext,
342 pid: &str,
343 ) -> MetaResult<ProcedureStateResponse> {
344 self.query_procedure_state(pid)
345 .await
346 .map_err(BoxedError::new)
347 .context(meta_error::ExternalSnafu)
348 }
349
350 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
351 self.procedure_client()
352 .map_err(BoxedError::new)
353 .context(meta_error::ExternalSnafu)?
354 .list_procedures()
355 .await
356 .map_err(BoxedError::new)
357 .context(meta_error::ExternalSnafu)
358 }
359}
360
361#[allow(deprecated)]
363#[async_trait::async_trait]
364impl ClusterInfo for MetaClient {
365 type Error = Error;
366
367 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
368 let cluster_client = self.cluster_client()?;
369
370 let (get_metasrv_nodes, nodes_key_prefix) = match role {
371 None => (true, Some(NodeInfoKey::key_prefix())),
372 Some(ClusterRole::Metasrv) => (true, None),
373 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
374 };
375
376 let mut nodes = if get_metasrv_nodes {
377 let last_activity_ts = -1; let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
380 cluster_client.get_metasrv_peers().await?;
381 followers
382 .into_iter()
383 .map(|node| {
384 if let Some(node_info) = node.info {
385 NodeInfo {
386 peer: node.peer.unwrap_or_default(),
387 last_activity_ts,
388 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
389 version: node_info.version,
390 git_commit: node_info.git_commit,
391 start_time_ms: node_info.start_time_ms,
392 total_cpu_millicores: node_info.total_cpu_millicores,
393 total_memory_bytes: node_info.total_memory_bytes,
394 cpu_usage_millicores: node_info.cpu_usage_millicores,
395 memory_usage_bytes: node_info.memory_usage_bytes,
396 hostname: node_info.hostname,
397 }
398 } else {
399 NodeInfo {
401 peer: node.peer.unwrap_or_default(),
402 last_activity_ts,
403 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
404 version: node.version,
405 git_commit: node.git_commit,
406 start_time_ms: node.start_time_ms,
407 total_cpu_millicores: node.cpus as i64,
408 total_memory_bytes: node.memory_bytes as i64,
409 cpu_usage_millicores: 0,
410 memory_usage_bytes: 0,
411 hostname: "".to_string(),
412 }
413 }
414 })
415 .chain(leader.into_iter().map(|node| {
416 if let Some(node_info) = node.info {
417 NodeInfo {
418 peer: node.peer.unwrap_or_default(),
419 last_activity_ts,
420 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
421 version: node_info.version,
422 git_commit: node_info.git_commit,
423 start_time_ms: node_info.start_time_ms,
424 total_cpu_millicores: node_info.total_cpu_millicores,
425 total_memory_bytes: node_info.total_memory_bytes,
426 cpu_usage_millicores: node_info.cpu_usage_millicores,
427 memory_usage_bytes: node_info.memory_usage_bytes,
428 hostname: node_info.hostname,
429 }
430 } else {
431 NodeInfo {
433 peer: node.peer.unwrap_or_default(),
434 last_activity_ts,
435 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
436 version: node.version,
437 git_commit: node.git_commit,
438 start_time_ms: node.start_time_ms,
439 total_cpu_millicores: node.cpus as i64,
440 total_memory_bytes: node.memory_bytes as i64,
441 cpu_usage_millicores: 0,
442 memory_usage_bytes: 0,
443 hostname: "".to_string(),
444 }
445 }
446 }))
447 .collect::<Vec<_>>()
448 } else {
449 Vec::new()
450 };
451
452 if let Some(prefix) = nodes_key_prefix {
453 let req = RangeRequest::new().with_prefix(prefix);
454 let res = cluster_client.range(req).await?;
455 for kv in res.kvs {
456 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
457 }
458 }
459
460 Ok(nodes)
461 }
462
463 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
464 let cluster_kv_backend = Arc::new(self.cluster_client()?);
465 let range_prefix = DatanodeStatKey::prefix_key();
466 let req = RangeRequest::new().with_prefix(range_prefix);
467 let stream =
468 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
469 let mut datanode_stats = stream
470 .try_collect::<Vec<_>>()
471 .await
472 .context(ConvertMetaResponseSnafu)?;
473 let region_stats = datanode_stats
474 .iter_mut()
475 .flat_map(|datanode_stat| {
476 let last = datanode_stat.stats.pop();
477 last.map(|stat| stat.region_stats).unwrap_or_default()
478 })
479 .collect::<Vec<_>>();
480
481 Ok(region_stats)
482 }
483
484 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
485 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
486 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
487 let flow_state_manager = FlowStateManager::new(cluster_backend);
488 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
489
490 Ok(res.map(|r| r.into()))
491 }
492}
493
494fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
495 DatanodeStatValue::try_from(kv.value)
496 .map_err(BoxedError::new)
497 .context(ExternalSnafu)
498}
499
500impl MetaClient {
501 pub fn new(id: Id) -> Self {
502 Self {
503 id,
504 ..Default::default()
505 }
506 }
507
508 pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
509 Self {
510 id,
511 channel_manager,
512 ..Default::default()
513 }
514 }
515
516 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
517 where
518 U: AsRef<str>,
519 A: AsRef<[U]> + Clone,
520 {
521 info!("MetaClient channel config: {:?}", self.channel_config());
522
523 if let Some(client) = &mut self.region_follower {
524 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
525 client.start(&urls).await?;
526 info!("Region follower client started");
527 }
528 if let Some(client) = &mut self.heartbeat {
529 client.start(urls.clone()).await?;
530 info!("Heartbeat client started");
531 }
532 if let Some(client) = &mut self.store {
533 client.start(urls.clone()).await?;
534 info!("Store client started");
535 }
536 if let Some(client) = &mut self.procedure {
537 client.start(urls.clone()).await?;
538 info!("DDL client started");
539 }
540 if let Some(client) = &mut self.cluster {
541 client.start(urls).await?;
542 info!("Cluster client started");
543 }
544
545 Ok(())
546 }
547
548 pub(crate) async fn start_with<U, A>(
550 &mut self,
551 leader_provider: LeaderProviderRef,
552 peers: A,
553 ) -> Result<()>
554 where
555 U: AsRef<str>,
556 A: AsRef<[U]> + Clone,
557 {
558 if let Some(client) = &self.region_follower {
559 info!("Starting region follower client ...");
560 client.start_with(leader_provider.clone()).await?;
561 }
562
563 if let Some(client) = &self.heartbeat {
564 info!("Starting heartbeat client ...");
565 client.start_with(leader_provider.clone()).await?;
566 }
567
568 if let Some(client) = &mut self.store {
569 info!("Starting store client ...");
570 client.start(peers.clone()).await?;
571 }
572
573 if let Some(client) = &self.procedure {
574 info!("Starting procedure client ...");
575 client.start_with(leader_provider.clone()).await?;
576 }
577
578 if let Some(client) = &mut self.cluster {
579 info!("Starting cluster client ...");
580 client.start_with(leader_provider).await?;
581 }
582 Ok(())
583 }
584
585 pub async fn ask_leader(&self) -> Result<String> {
588 self.heartbeat_client()?.ask_leader().await
589 }
590
591 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
600 self.heartbeat_client()?.heartbeat().await
601 }
602
603 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
605 self.store_client()?
606 .range(req.into())
607 .await?
608 .try_into()
609 .context(ConvertMetaResponseSnafu)
610 }
611
612 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
614 self.store_client()?
615 .put(req.into())
616 .await?
617 .try_into()
618 .context(ConvertMetaResponseSnafu)
619 }
620
621 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
623 self.store_client()?
624 .batch_get(req.into())
625 .await?
626 .try_into()
627 .context(ConvertMetaResponseSnafu)
628 }
629
630 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
632 self.store_client()?
633 .batch_put(req.into())
634 .await?
635 .try_into()
636 .context(ConvertMetaResponseSnafu)
637 }
638
639 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
641 self.store_client()?
642 .batch_delete(req.into())
643 .await?
644 .try_into()
645 .context(ConvertMetaResponseSnafu)
646 }
647
648 pub async fn compare_and_put(
651 &self,
652 req: CompareAndPutRequest,
653 ) -> Result<CompareAndPutResponse> {
654 self.store_client()?
655 .compare_and_put(req.into())
656 .await?
657 .try_into()
658 .context(ConvertMetaResponseSnafu)
659 }
660
661 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
663 self.store_client()?
664 .delete_range(req.into())
665 .await?
666 .try_into()
667 .context(ConvertMetaResponseSnafu)
668 }
669
670 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
672 self.procedure_client()?.query_procedure_state(pid).await
673 }
674
675 pub async fn migrate_region(
677 &self,
678 request: MigrateRegionRequest,
679 ) -> Result<MigrateRegionResponse> {
680 self.procedure_client()?
681 .migrate_region(
682 request.region_id,
683 request.from_peer,
684 request.to_peer,
685 request.timeout,
686 )
687 .await
688 }
689
690 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
692 self.procedure_client()?.reconcile(request).await
693 }
694
695 pub async fn submit_ddl_task(
697 &self,
698 req: SubmitDdlTaskRequest,
699 ) -> Result<SubmitDdlTaskResponse> {
700 let res = self
701 .procedure_client()?
702 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
703 .await?
704 .try_into()
705 .context(ConvertMetaResponseSnafu)?;
706
707 Ok(res)
708 }
709
710 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
711 self.heartbeat.clone().context(NotStartedSnafu {
712 name: "heartbeat_client",
713 })
714 }
715
716 pub fn store_client(&self) -> Result<StoreClient> {
717 self.store.clone().context(NotStartedSnafu {
718 name: "store_client",
719 })
720 }
721
722 pub fn procedure_client(&self) -> Result<ProcedureClient> {
723 self.procedure.clone().context(NotStartedSnafu {
724 name: "procedure_client",
725 })
726 }
727
728 pub fn cluster_client(&self) -> Result<ClusterClient> {
729 self.cluster.clone().context(NotStartedSnafu {
730 name: "cluster_client",
731 })
732 }
733
734 pub fn channel_config(&self) -> &ChannelConfig {
735 self.channel_manager.config()
736 }
737
738 pub fn id(&self) -> Id {
739 self.id
740 }
741}
742
743#[cfg(test)]
744mod tests {
745 use std::sync::atomic::{AtomicUsize, Ordering};
746
747 use api::v1::meta::{HeartbeatRequest, Peer};
748 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
749 use rand::Rng;
750
751 use super::*;
752 use crate::error;
753 use crate::mocks::{self, MockMetaContext};
754
755 const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
756
757 struct TestClient {
758 ns: String,
759 client: MetaClient,
760 meta_ctx: MockMetaContext,
761 }
762
763 impl TestClient {
764 async fn new(ns: impl Into<String>) -> Self {
765 let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
767 Self {
768 ns: ns.into(),
769 client,
770 meta_ctx,
771 }
772 }
773
774 fn key(&self, name: &str) -> Vec<u8> {
775 format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
776 }
777
778 async fn gen_data(&self) {
779 for i in 0..10 {
780 let req = PutRequest::new()
781 .with_key(self.key(&format!("key-{i}")))
782 .with_value(format!("{}-{}", "value", i).into_bytes())
783 .with_prev_kv();
784 let res = self.client.put(req).await;
785 let _ = res.unwrap();
786 }
787 }
788
789 async fn clear_data(&self) {
790 let req =
791 DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
792 let res = self.client.delete_range(req).await;
793 let _ = res.unwrap();
794 }
795
796 #[allow(dead_code)]
797 fn kv_backend(&self) -> KvBackendRef {
798 self.meta_ctx.kv_backend.clone()
799 }
800
801 fn in_memory(&self) -> Option<ResettableKvBackendRef> {
802 self.meta_ctx.in_memory.clone()
803 }
804 }
805
806 async fn new_client(ns: impl Into<String>) -> TestClient {
807 let client = TestClient::new(ns).await;
808 client.clear_data().await;
809 client
810 }
811
812 #[tokio::test]
813 async fn test_meta_client_builder() {
814 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
815
816 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
817 .enable_heartbeat()
818 .build();
819 let _ = meta_client.heartbeat_client().unwrap();
820 assert!(meta_client.store_client().is_err());
821 meta_client.start(urls).await.unwrap();
822
823 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
824 assert!(meta_client.heartbeat_client().is_err());
825 assert!(meta_client.store_client().is_err());
826 meta_client.start(urls).await.unwrap();
827
828 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
829 .enable_store()
830 .build();
831 assert!(meta_client.heartbeat_client().is_err());
832 let _ = meta_client.store_client().unwrap();
833 meta_client.start(urls).await.unwrap();
834
835 let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
836 .enable_heartbeat()
837 .enable_store()
838 .build();
839 assert_eq!(2, meta_client.id());
840 assert_eq!(2, meta_client.id());
841 let _ = meta_client.heartbeat_client().unwrap();
842 let _ = meta_client.store_client().unwrap();
843 meta_client.start(urls).await.unwrap();
844 }
845
846 #[tokio::test]
847 async fn test_not_start_heartbeat_client() {
848 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
849 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
850 .enable_store()
851 .build();
852 meta_client.start(urls).await.unwrap();
853 let res = meta_client.ask_leader().await;
854 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
855 }
856
857 #[tokio::test]
858 async fn test_not_start_store_client() {
859 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
860 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
861 .enable_heartbeat()
862 .build();
863
864 meta_client.start(urls).await.unwrap();
865 let res = meta_client.put(PutRequest::default()).await;
866 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
867 }
868
869 #[tokio::test]
870 async fn test_ask_leader() {
871 let tc = new_client("test_ask_leader").await;
872 tc.client.ask_leader().await.unwrap();
873 }
874
875 #[tokio::test]
876 async fn test_heartbeat() {
877 let tc = new_client("test_heartbeat").await;
878 let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
879 let request_sent = Arc::new(AtomicUsize::new(0));
882 let request_sent_clone = request_sent.clone();
883 let _handle = tokio::spawn(async move {
884 for _ in 0..5 {
885 let req = HeartbeatRequest {
886 peer: Some(Peer {
887 id: 1,
888 addr: "meta_client_peer".to_string(),
889 }),
890 ..Default::default()
891 };
892 sender.send(req).await.unwrap();
893 request_sent_clone.fetch_add(1, Ordering::Relaxed);
894 }
895 });
896
897 let heartbeat_count = Arc::new(AtomicUsize::new(0));
898 let heartbeat_count_clone = heartbeat_count.clone();
899 let handle = tokio::spawn(async move {
900 while let Some(_resp) = receiver.message().await.unwrap() {
901 heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
902 }
903 });
904
905 handle.await.unwrap();
906 assert_eq!(
908 request_sent.load(Ordering::Relaxed) + 1,
909 heartbeat_count.load(Ordering::Relaxed)
910 );
911 }
912
913 #[tokio::test]
914 async fn test_range_get() {
915 let tc = new_client("test_range_get").await;
916 tc.gen_data().await;
917
918 let key = tc.key("key-0");
919 let req = RangeRequest::new().with_key(key.as_slice());
920 let res = tc.client.range(req).await;
921 let mut kvs = res.unwrap().take_kvs();
922 assert_eq!(1, kvs.len());
923 let mut kv = kvs.pop().unwrap();
924 assert_eq!(key, kv.take_key());
925 assert_eq!(b"value-0".to_vec(), kv.take_value());
926 }
927
928 #[tokio::test]
929 async fn test_range_get_prefix() {
930 let tc = new_client("test_range_get_prefix").await;
931 tc.gen_data().await;
932
933 let req = RangeRequest::new().with_prefix(tc.key("key-"));
934 let res = tc.client.range(req).await;
935 let kvs = res.unwrap().take_kvs();
936 assert_eq!(10, kvs.len());
937 for (i, mut kv) in kvs.into_iter().enumerate() {
938 assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
939 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
940 }
941 }
942
943 #[tokio::test]
944 async fn test_range() {
945 let tc = new_client("test_range").await;
946 tc.gen_data().await;
947
948 let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
949 let res = tc.client.range(req).await;
950 let kvs = res.unwrap().take_kvs();
951 assert_eq!(3, kvs.len());
952 for (i, mut kv) in kvs.into_iter().enumerate() {
953 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
954 assert_eq!(
955 format!("{}-{}", "value", i + 5).into_bytes(),
956 kv.take_value()
957 );
958 }
959 }
960
961 #[tokio::test]
962 async fn test_range_keys_only() {
963 let tc = new_client("test_range_keys_only").await;
964 tc.gen_data().await;
965
966 let req = RangeRequest::new()
967 .with_range(tc.key("key-5"), tc.key("key-8"))
968 .with_keys_only();
969 let res = tc.client.range(req).await;
970 let kvs = res.unwrap().take_kvs();
971 assert_eq!(3, kvs.len());
972 for (i, mut kv) in kvs.into_iter().enumerate() {
973 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
974 assert!(kv.take_value().is_empty());
975 }
976 }
977
978 #[tokio::test]
979 async fn test_put() {
980 let tc = new_client("test_put").await;
981
982 let req = PutRequest::new()
983 .with_key(tc.key("key"))
984 .with_value(b"value".to_vec());
985 let res = tc.client.put(req).await;
986 assert!(res.unwrap().prev_kv.is_none());
987 }
988
989 #[tokio::test]
990 async fn test_put_with_prev_kv() {
991 let tc = new_client("test_put_with_prev_kv").await;
992
993 let key = tc.key("key");
994 let req = PutRequest::new()
995 .with_key(key.as_slice())
996 .with_value(b"value".to_vec())
997 .with_prev_kv();
998 let res = tc.client.put(req).await;
999 assert!(res.unwrap().prev_kv.is_none());
1000
1001 let req = PutRequest::new()
1002 .with_key(key.as_slice())
1003 .with_value(b"value1".to_vec())
1004 .with_prev_kv();
1005 let res = tc.client.put(req).await;
1006 let mut kv = res.unwrap().prev_kv.unwrap();
1007 assert_eq!(key, kv.take_key());
1008 assert_eq!(b"value".to_vec(), kv.take_value());
1009 }
1010
1011 #[tokio::test]
1012 async fn test_batch_put() {
1013 let tc = new_client("test_batch_put").await;
1014
1015 let mut req = BatchPutRequest::new();
1016 for i in 0..275 {
1017 req = req.add_kv(
1018 tc.key(&format!("key-{}", i)),
1019 format!("value-{}", i).into_bytes(),
1020 );
1021 }
1022
1023 let res = tc.client.batch_put(req).await;
1024 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1025
1026 let req = RangeRequest::new().with_prefix(tc.key("key-"));
1027 let res = tc.client.range(req).await;
1028 let kvs = res.unwrap().take_kvs();
1029 assert_eq!(275, kvs.len());
1030 }
1031
1032 #[tokio::test]
1033 async fn test_batch_get() {
1034 let tc = new_client("test_batch_get").await;
1035 tc.gen_data().await;
1036
1037 let mut req = BatchGetRequest::default();
1038 for i in 0..256 {
1039 req = req.add_key(tc.key(&format!("key-{}", i)));
1040 }
1041 let res = tc.client.batch_get(req).await.unwrap();
1042 assert_eq!(10, res.kvs.len());
1043
1044 let req = BatchGetRequest::default()
1045 .add_key(tc.key("key-1"))
1046 .add_key(tc.key("key-999"));
1047 let res = tc.client.batch_get(req).await.unwrap();
1048 assert_eq!(1, res.kvs.len());
1049 }
1050
1051 #[tokio::test]
1052 async fn test_batch_put_with_prev_kv() {
1053 let tc = new_client("test_batch_put_with_prev_kv").await;
1054
1055 let key = tc.key("key");
1056 let key2 = tc.key("key2");
1057 let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1058 let res = tc.client.batch_put(req).await;
1059 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1060
1061 let req = BatchPutRequest::new()
1062 .add_kv(key.as_slice(), b"value-".to_vec())
1063 .add_kv(key2.as_slice(), b"value2-".to_vec())
1064 .with_prev_kv();
1065 let res = tc.client.batch_put(req).await;
1066 let mut kvs = res.unwrap().take_prev_kvs();
1067 assert_eq!(1, kvs.len());
1068 let mut kv = kvs.pop().unwrap();
1069 assert_eq!(key, kv.take_key());
1070 assert_eq!(b"value".to_vec(), kv.take_value());
1071 }
1072
1073 #[tokio::test]
1074 async fn test_compare_and_put() {
1075 let tc = new_client("test_compare_and_put").await;
1076
1077 let key = tc.key("key");
1078 let req = CompareAndPutRequest::new()
1079 .with_key(key.as_slice())
1080 .with_expect(b"expect".to_vec())
1081 .with_value(b"value".to_vec());
1082 let res = tc.client.compare_and_put(req).await;
1083 assert!(!res.unwrap().is_success());
1084
1085 let req = CompareAndPutRequest::new()
1087 .with_key(key.as_slice())
1088 .with_value(b"value".to_vec());
1089 let res = tc.client.compare_and_put(req).await;
1090 let mut res = res.unwrap();
1091 assert!(res.is_success());
1092 assert!(res.take_prev_kv().is_none());
1093
1094 let req = CompareAndPutRequest::new()
1096 .with_key(key.as_slice())
1097 .with_expect(b"not_eq".to_vec())
1098 .with_value(b"value2".to_vec());
1099 let res = tc.client.compare_and_put(req).await;
1100 let mut res = res.unwrap();
1101 assert!(!res.is_success());
1102 assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1103
1104 let req = CompareAndPutRequest::new()
1106 .with_key(key.as_slice())
1107 .with_expect(b"value".to_vec())
1108 .with_value(b"value2".to_vec());
1109 let res = tc.client.compare_and_put(req).await;
1110 let mut res = res.unwrap();
1111 assert!(res.is_success());
1112
1113 assert!(res.take_prev_kv().is_none());
1115 }
1116
1117 #[tokio::test]
1118 async fn test_delete_with_key() {
1119 let tc = new_client("test_delete_with_key").await;
1120 tc.gen_data().await;
1121
1122 let req = DeleteRangeRequest::new()
1123 .with_key(tc.key("key-0"))
1124 .with_prev_kv();
1125 let res = tc.client.delete_range(req).await;
1126 let mut res = res.unwrap();
1127 assert_eq!(1, res.deleted());
1128 let mut kvs = res.take_prev_kvs();
1129 assert_eq!(1, kvs.len());
1130 let mut kv = kvs.pop().unwrap();
1131 assert_eq!(b"value-0".to_vec(), kv.take_value());
1132 }
1133
1134 #[tokio::test]
1135 async fn test_delete_with_prefix() {
1136 let tc = new_client("test_delete_with_prefix").await;
1137 tc.gen_data().await;
1138
1139 let req = DeleteRangeRequest::new()
1140 .with_prefix(tc.key("key-"))
1141 .with_prev_kv();
1142 let res = tc.client.delete_range(req).await;
1143 let mut res = res.unwrap();
1144 assert_eq!(10, res.deleted());
1145 let kvs = res.take_prev_kvs();
1146 assert_eq!(10, kvs.len());
1147 for (i, mut kv) in kvs.into_iter().enumerate() {
1148 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1149 }
1150 }
1151
1152 #[tokio::test]
1153 async fn test_delete_with_range() {
1154 let tc = new_client("test_delete_with_range").await;
1155 tc.gen_data().await;
1156
1157 let req = DeleteRangeRequest::new()
1158 .with_range(tc.key("key-2"), tc.key("key-7"))
1159 .with_prev_kv();
1160 let res = tc.client.delete_range(req).await;
1161 let mut res = res.unwrap();
1162 assert_eq!(5, res.deleted());
1163 let kvs = res.take_prev_kvs();
1164 assert_eq!(5, kvs.len());
1165 for (i, mut kv) in kvs.into_iter().enumerate() {
1166 assert_eq!(
1167 format!("{}-{}", "value", i + 2).into_bytes(),
1168 kv.take_value()
1169 );
1170 }
1171 }
1172
1173 fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1174 Ok(())
1175 }
1176
1177 #[tokio::test]
1178 async fn test_cluster_client_adaptive_range() {
1179 let tx = new_client("test_cluster_client").await;
1180 let in_memory = tx.in_memory().unwrap();
1181 let cluster_client = tx.client.cluster_client().unwrap();
1182 let mut rng = rand::rng();
1183
1184 for i in 0..10 {
1186 let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1187 in_memory
1188 .put(
1189 PutRequest::new()
1190 .with_key(format!("__prefix/{i}").as_bytes())
1191 .with_value(data.clone()),
1192 )
1193 .await
1194 .unwrap();
1195 }
1196
1197 let req = RangeRequest::new().with_prefix(b"__prefix/");
1198 let stream =
1199 PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1200
1201 let res = stream.try_collect::<Vec<_>>().await.unwrap();
1202 assert_eq!(10, res.len());
1203 }
1204}