meta_client/
client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16pub mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{
28    MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
29};
30pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
31use cluster::Client as ClusterClient;
32pub use cluster::ClusterKvBackend;
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
35use common_meta::cluster::{
36    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
37};
38use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
39use common_meta::error::{
40    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
41};
42use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
43use common_meta::kv_backend::KvBackendRef;
44use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
45use common_meta::range_stream::PaginationStream;
46use common_meta::rpc::KeyValue;
47use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
48use common_meta::rpc::procedure::{
49    AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
50    MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
51    RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
52};
53use common_meta::rpc::store::{
54    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
55    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
56    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
57};
58use common_telemetry::info;
59use futures::TryStreamExt;
60use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
61use procedure::Client as ProcedureClient;
62use snafu::{OptionExt, ResultExt};
63use store::Client as StoreClient;
64
65pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
66use crate::error::{
67    ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
68    Result,
69};
70
71pub type Id = u64;
72
73const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
74const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
75const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
76
77#[derive(Clone, Debug, Default)]
78pub struct MetaClientBuilder {
79    id: Id,
80    role: Role,
81    enable_heartbeat: bool,
82    enable_store: bool,
83    enable_procedure: bool,
84    enable_access_cluster_info: bool,
85    region_follower: Option<RegionFollowerClientRef>,
86    channel_manager: Option<ChannelManager>,
87    ddl_channel_manager: Option<ChannelManager>,
88    heartbeat_channel_manager: Option<ChannelManager>,
89}
90
91impl MetaClientBuilder {
92    pub fn new(member_id: u64, role: Role) -> Self {
93        Self {
94            id: member_id,
95            role,
96            ..Default::default()
97        }
98    }
99
100    /// Returns the role of Frontend's default options.
101    pub fn frontend_default_options() -> Self {
102        // Frontend does not need a member id.
103        Self::new(0, Role::Frontend)
104            .enable_store()
105            .enable_heartbeat()
106            .enable_procedure()
107            .enable_access_cluster_info()
108    }
109
110    /// Returns the role of Datanode's default options.
111    pub fn datanode_default_options(member_id: u64) -> Self {
112        Self::new(member_id, Role::Datanode)
113            .enable_store()
114            .enable_heartbeat()
115    }
116
117    /// Returns the role of Flownode's default options.
118    pub fn flownode_default_options(member_id: u64) -> Self {
119        Self::new(member_id, Role::Flownode)
120            .enable_store()
121            .enable_heartbeat()
122            .enable_procedure()
123            .enable_access_cluster_info()
124    }
125
126    pub fn enable_heartbeat(self) -> Self {
127        Self {
128            enable_heartbeat: true,
129            ..self
130        }
131    }
132
133    pub fn enable_store(self) -> Self {
134        Self {
135            enable_store: true,
136            ..self
137        }
138    }
139
140    pub fn enable_procedure(self) -> Self {
141        Self {
142            enable_procedure: true,
143            ..self
144        }
145    }
146
147    pub fn enable_access_cluster_info(self) -> Self {
148        Self {
149            enable_access_cluster_info: true,
150            ..self
151        }
152    }
153
154    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
155        Self {
156            channel_manager: Some(channel_manager),
157            ..self
158        }
159    }
160
161    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
162        Self {
163            ddl_channel_manager: Some(channel_manager),
164            ..self
165        }
166    }
167
168    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
169        Self {
170            heartbeat_channel_manager: Some(channel_manager),
171            ..self
172        }
173    }
174
175    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
176        Self {
177            region_follower: Some(region_follower),
178            ..self
179        }
180    }
181
182    pub fn build(self) -> MetaClient {
183        let mut client = if let Some(mgr) = self.channel_manager {
184            MetaClient::with_channel_manager(self.id, mgr)
185        } else {
186            MetaClient::new(self.id)
187        };
188
189        let mgr = client.channel_manager.clone();
190
191        if self.enable_heartbeat {
192            if self.heartbeat_channel_manager.is_some() {
193                info!("Enable heartbeat channel using the heartbeat channel manager.");
194            }
195            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
196            client.heartbeat = Some(HeartbeatClient::new(
197                self.id,
198                self.role,
199                mgr,
200                DEFAULT_ASK_LEADER_MAX_RETRY,
201            ));
202        }
203
204        if self.enable_store {
205            client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
206        }
207
208        if self.enable_procedure {
209            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
210            client.procedure = Some(ProcedureClient::new(
211                self.id,
212                self.role,
213                mgr,
214                DEFAULT_SUBMIT_DDL_MAX_RETRY,
215            ));
216        }
217
218        if self.enable_access_cluster_info {
219            client.cluster = Some(ClusterClient::new(
220                self.id,
221                self.role,
222                mgr,
223                DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
224            ))
225        }
226
227        if let Some(region_follower) = self.region_follower {
228            client.region_follower = Some(region_follower);
229        }
230
231        client
232    }
233}
234
235#[derive(Debug, Default)]
236pub struct MetaClient {
237    id: Id,
238    channel_manager: ChannelManager,
239    heartbeat: Option<HeartbeatClient>,
240    store: Option<StoreClient>,
241    procedure: Option<ProcedureClient>,
242    cluster: Option<ClusterClient>,
243    region_follower: Option<RegionFollowerClientRef>,
244}
245
246pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
247
248/// A trait for clients that can manage region followers.
249#[async_trait::async_trait]
250pub trait RegionFollowerClient: Sync + Send + Debug {
251    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
252
253    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
254
255    async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
256
257    async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
258
259    async fn start(&self, urls: &[&str]) -> Result<()>;
260
261    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
262}
263
264#[async_trait::async_trait]
265impl ProcedureExecutor for MetaClient {
266    async fn submit_ddl_task(
267        &self,
268        _ctx: &ExecutorContext,
269        request: SubmitDdlTaskRequest,
270    ) -> MetaResult<SubmitDdlTaskResponse> {
271        self.submit_ddl_task(request)
272            .await
273            .map_err(BoxedError::new)
274            .context(meta_error::ExternalSnafu)
275    }
276
277    async fn migrate_region(
278        &self,
279        _ctx: &ExecutorContext,
280        request: MigrateRegionRequest,
281    ) -> MetaResult<MigrateRegionResponse> {
282        self.migrate_region(request)
283            .await
284            .map_err(BoxedError::new)
285            .context(meta_error::ExternalSnafu)
286    }
287
288    async fn reconcile(
289        &self,
290        _ctx: &ExecutorContext,
291        request: ReconcileRequest,
292    ) -> MetaResult<ReconcileResponse> {
293        self.reconcile(request)
294            .await
295            .map_err(BoxedError::new)
296            .context(meta_error::ExternalSnafu)
297    }
298
299    async fn manage_region_follower(
300        &self,
301        _ctx: &ExecutorContext,
302        request: ManageRegionFollowerRequest,
303    ) -> MetaResult<()> {
304        if let Some(region_follower) = &self.region_follower {
305            match request {
306                ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
307                    region_follower
308                        .add_region_follower(add_region_follower_request)
309                        .await
310                }
311                ManageRegionFollowerRequest::RemoveRegionFollower(
312                    remove_region_follower_request,
313                ) => {
314                    region_follower
315                        .remove_region_follower(remove_region_follower_request)
316                        .await
317                }
318                ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
319                    region_follower
320                        .add_table_follower(add_table_follower_request)
321                        .await
322                }
323                ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
324                    region_follower
325                        .remove_table_follower(remove_table_follower_request)
326                        .await
327                }
328            }
329            .map_err(BoxedError::new)
330            .context(meta_error::ExternalSnafu)
331        } else {
332            UnsupportedSnafu {
333                operation: "manage_region_follower",
334            }
335            .fail()
336        }
337    }
338
339    async fn query_procedure_state(
340        &self,
341        _ctx: &ExecutorContext,
342        pid: &str,
343    ) -> MetaResult<ProcedureStateResponse> {
344        self.query_procedure_state(pid)
345            .await
346            .map_err(BoxedError::new)
347            .context(meta_error::ExternalSnafu)
348    }
349
350    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
351        self.procedure_client()
352            .map_err(BoxedError::new)
353            .context(meta_error::ExternalSnafu)?
354            .list_procedures()
355            .await
356            .map_err(BoxedError::new)
357            .context(meta_error::ExternalSnafu)
358    }
359}
360
361// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated fields are removed from the proto.
362#[allow(deprecated)]
363#[async_trait::async_trait]
364impl ClusterInfo for MetaClient {
365    type Error = Error;
366
367    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
368        let cluster_client = self.cluster_client()?;
369
370        let (get_metasrv_nodes, nodes_key_prefix) = match role {
371            None => (true, Some(NodeInfoKey::key_prefix())),
372            Some(ClusterRole::Metasrv) => (true, None),
373            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
374        };
375
376        let mut nodes = if get_metasrv_nodes {
377            let last_activity_ts = -1; // Metasrv does not provide this information.
378
379            let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
380                cluster_client.get_metasrv_peers().await?;
381            followers
382                .into_iter()
383                .map(|node| {
384                    if let Some(node_info) = node.info {
385                        NodeInfo {
386                            peer: node.peer.unwrap_or_default(),
387                            last_activity_ts,
388                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
389                            version: node_info.version,
390                            git_commit: node_info.git_commit,
391                            start_time_ms: node_info.start_time_ms,
392                            total_cpu_millicores: node_info.total_cpu_millicores,
393                            total_memory_bytes: node_info.total_memory_bytes,
394                            cpu_usage_millicores: node_info.cpu_usage_millicores,
395                            memory_usage_bytes: node_info.memory_usage_bytes,
396                            hostname: node_info.hostname,
397                        }
398                    } else {
399                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
400                        NodeInfo {
401                            peer: node.peer.unwrap_or_default(),
402                            last_activity_ts,
403                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
404                            version: node.version,
405                            git_commit: node.git_commit,
406                            start_time_ms: node.start_time_ms,
407                            total_cpu_millicores: node.cpus as i64,
408                            total_memory_bytes: node.memory_bytes as i64,
409                            cpu_usage_millicores: 0,
410                            memory_usage_bytes: 0,
411                            hostname: "".to_string(),
412                        }
413                    }
414                })
415                .chain(leader.into_iter().map(|node| {
416                    if let Some(node_info) = node.info {
417                        NodeInfo {
418                            peer: node.peer.unwrap_or_default(),
419                            last_activity_ts,
420                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
421                            version: node_info.version,
422                            git_commit: node_info.git_commit,
423                            start_time_ms: node_info.start_time_ms,
424                            total_cpu_millicores: node_info.total_cpu_millicores,
425                            total_memory_bytes: node_info.total_memory_bytes,
426                            cpu_usage_millicores: node_info.cpu_usage_millicores,
427                            memory_usage_bytes: node_info.memory_usage_bytes,
428                            hostname: node_info.hostname,
429                        }
430                    } else {
431                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
432                        NodeInfo {
433                            peer: node.peer.unwrap_or_default(),
434                            last_activity_ts,
435                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
436                            version: node.version,
437                            git_commit: node.git_commit,
438                            start_time_ms: node.start_time_ms,
439                            total_cpu_millicores: node.cpus as i64,
440                            total_memory_bytes: node.memory_bytes as i64,
441                            cpu_usage_millicores: 0,
442                            memory_usage_bytes: 0,
443                            hostname: "".to_string(),
444                        }
445                    }
446                }))
447                .collect::<Vec<_>>()
448        } else {
449            Vec::new()
450        };
451
452        if let Some(prefix) = nodes_key_prefix {
453            let req = RangeRequest::new().with_prefix(prefix);
454            let res = cluster_client.range(req).await?;
455            for kv in res.kvs {
456                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
457            }
458        }
459
460        Ok(nodes)
461    }
462
463    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
464        let cluster_kv_backend = Arc::new(self.cluster_client()?);
465        let range_prefix = DatanodeStatKey::prefix_key();
466        let req = RangeRequest::new().with_prefix(range_prefix);
467        let stream =
468            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
469        let mut datanode_stats = stream
470            .try_collect::<Vec<_>>()
471            .await
472            .context(ConvertMetaResponseSnafu)?;
473        let region_stats = datanode_stats
474            .iter_mut()
475            .flat_map(|datanode_stat| {
476                let last = datanode_stat.stats.pop();
477                last.map(|stat| stat.region_stats).unwrap_or_default()
478            })
479            .collect::<Vec<_>>();
480
481        Ok(region_stats)
482    }
483
484    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
485        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
486        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
487        let flow_state_manager = FlowStateManager::new(cluster_backend);
488        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
489
490        Ok(res.map(|r| r.into()))
491    }
492}
493
494fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
495    DatanodeStatValue::try_from(kv.value)
496        .map_err(BoxedError::new)
497        .context(ExternalSnafu)
498}
499
500impl MetaClient {
501    pub fn new(id: Id) -> Self {
502        Self {
503            id,
504            ..Default::default()
505        }
506    }
507
508    pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
509        Self {
510            id,
511            channel_manager,
512            ..Default::default()
513        }
514    }
515
516    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
517    where
518        U: AsRef<str>,
519        A: AsRef<[U]> + Clone,
520    {
521        info!("MetaClient channel config: {:?}", self.channel_config());
522
523        if let Some(client) = &mut self.region_follower {
524            let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
525            client.start(&urls).await?;
526            info!("Region follower client started");
527        }
528        if let Some(client) = &mut self.heartbeat {
529            client.start(urls.clone()).await?;
530            info!("Heartbeat client started");
531        }
532        if let Some(client) = &mut self.store {
533            client.start(urls.clone()).await?;
534            info!("Store client started");
535        }
536        if let Some(client) = &mut self.procedure {
537            client.start(urls.clone()).await?;
538            info!("DDL client started");
539        }
540        if let Some(client) = &mut self.cluster {
541            client.start(urls).await?;
542            info!("Cluster client started");
543        }
544
545        Ok(())
546    }
547
548    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
549    pub(crate) async fn start_with<U, A>(
550        &mut self,
551        leader_provider: LeaderProviderRef,
552        peers: A,
553    ) -> Result<()>
554    where
555        U: AsRef<str>,
556        A: AsRef<[U]> + Clone,
557    {
558        if let Some(client) = &self.region_follower {
559            info!("Starting region follower client ...");
560            client.start_with(leader_provider.clone()).await?;
561        }
562
563        if let Some(client) = &self.heartbeat {
564            info!("Starting heartbeat client ...");
565            client.start_with(leader_provider.clone()).await?;
566        }
567
568        if let Some(client) = &mut self.store {
569            info!("Starting store client ...");
570            client.start(peers.clone()).await?;
571        }
572
573        if let Some(client) = &self.procedure {
574            info!("Starting procedure client ...");
575            client.start_with(leader_provider.clone()).await?;
576        }
577
578        if let Some(client) = &mut self.cluster {
579            info!("Starting cluster client ...");
580            client.start_with(leader_provider).await?;
581        }
582        Ok(())
583    }
584
585    /// Ask the leader address of `metasrv`, and the heartbeat component
586    /// needs to create a bidirectional streaming to the leader.
587    pub async fn ask_leader(&self) -> Result<String> {
588        self.heartbeat_client()?.ask_leader().await
589    }
590
591    /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
592    /// other end is the leader of `metasrv`.
593    ///
594    /// The `datanode` needs to use the sender to continuously send heartbeat
595    /// packets (some self-state data), and the receiver can receive a response
596    /// from "metasrv" (which may contain some scheduling instructions).
597    ///
598    /// Returns the heartbeat sender, stream, and configuration received from Metasrv.
599    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
600        self.heartbeat_client()?.heartbeat().await
601    }
602
603    /// Range gets the keys in the range from the key-value store.
604    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
605        self.store_client()?
606            .range(req.into())
607            .await?
608            .try_into()
609            .context(ConvertMetaResponseSnafu)
610    }
611
612    /// Put puts the given key into the key-value store.
613    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
614        self.store_client()?
615            .put(req.into())
616            .await?
617            .try_into()
618            .context(ConvertMetaResponseSnafu)
619    }
620
621    /// BatchGet atomically get values by the given keys from the key-value store.
622    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
623        self.store_client()?
624            .batch_get(req.into())
625            .await?
626            .try_into()
627            .context(ConvertMetaResponseSnafu)
628    }
629
630    /// BatchPut atomically puts the given keys into the key-value store.
631    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
632        self.store_client()?
633            .batch_put(req.into())
634            .await?
635            .try_into()
636            .context(ConvertMetaResponseSnafu)
637    }
638
639    /// BatchDelete atomically deletes the given keys from the key-value store.
640    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
641        self.store_client()?
642            .batch_delete(req.into())
643            .await?
644            .try_into()
645            .context(ConvertMetaResponseSnafu)
646    }
647
648    /// CompareAndPut atomically puts the value to the given updated
649    /// value if the current value == the expected value.
650    pub async fn compare_and_put(
651        &self,
652        req: CompareAndPutRequest,
653    ) -> Result<CompareAndPutResponse> {
654        self.store_client()?
655            .compare_and_put(req.into())
656            .await?
657            .try_into()
658            .context(ConvertMetaResponseSnafu)
659    }
660
661    /// DeleteRange deletes the given range from the key-value store.
662    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
663        self.store_client()?
664            .delete_range(req.into())
665            .await?
666            .try_into()
667            .context(ConvertMetaResponseSnafu)
668    }
669
670    /// Query the procedure state by its id.
671    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
672        self.procedure_client()?.query_procedure_state(pid).await
673    }
674
675    /// Submit a region migration task.
676    pub async fn migrate_region(
677        &self,
678        request: MigrateRegionRequest,
679    ) -> Result<MigrateRegionResponse> {
680        self.procedure_client()?
681            .migrate_region(
682                request.region_id,
683                request.from_peer,
684                request.to_peer,
685                request.timeout,
686            )
687            .await
688    }
689
690    /// Reconcile the procedure state.
691    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
692        self.procedure_client()?.reconcile(request).await
693    }
694
695    /// Submit a DDL task
696    pub async fn submit_ddl_task(
697        &self,
698        req: SubmitDdlTaskRequest,
699    ) -> Result<SubmitDdlTaskResponse> {
700        let res = self
701            .procedure_client()?
702            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
703            .await?
704            .try_into()
705            .context(ConvertMetaResponseSnafu)?;
706
707        Ok(res)
708    }
709
710    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
711        self.heartbeat.clone().context(NotStartedSnafu {
712            name: "heartbeat_client",
713        })
714    }
715
716    pub fn store_client(&self) -> Result<StoreClient> {
717        self.store.clone().context(NotStartedSnafu {
718            name: "store_client",
719        })
720    }
721
722    pub fn procedure_client(&self) -> Result<ProcedureClient> {
723        self.procedure.clone().context(NotStartedSnafu {
724            name: "procedure_client",
725        })
726    }
727
728    pub fn cluster_client(&self) -> Result<ClusterClient> {
729        self.cluster.clone().context(NotStartedSnafu {
730            name: "cluster_client",
731        })
732    }
733
734    pub fn channel_config(&self) -> &ChannelConfig {
735        self.channel_manager.config()
736    }
737
738    pub fn id(&self) -> Id {
739        self.id
740    }
741}
742
743#[cfg(test)]
744mod tests {
745    use std::sync::atomic::{AtomicUsize, Ordering};
746
747    use api::v1::meta::{HeartbeatRequest, Peer};
748    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
749    use rand::Rng;
750
751    use super::*;
752    use crate::error;
753    use crate::mocks::{self, MockMetaContext};
754
755    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
756
757    struct TestClient {
758        ns: String,
759        client: MetaClient,
760        meta_ctx: MockMetaContext,
761    }
762
763    impl TestClient {
764        async fn new(ns: impl Into<String>) -> Self {
765            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
766            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
767            Self {
768                ns: ns.into(),
769                client,
770                meta_ctx,
771            }
772        }
773
774        fn key(&self, name: &str) -> Vec<u8> {
775            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
776        }
777
778        async fn gen_data(&self) {
779            for i in 0..10 {
780                let req = PutRequest::new()
781                    .with_key(self.key(&format!("key-{i}")))
782                    .with_value(format!("{}-{}", "value", i).into_bytes())
783                    .with_prev_kv();
784                let res = self.client.put(req).await;
785                let _ = res.unwrap();
786            }
787        }
788
789        async fn clear_data(&self) {
790            let req =
791                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
792            let res = self.client.delete_range(req).await;
793            let _ = res.unwrap();
794        }
795
796        #[allow(dead_code)]
797        fn kv_backend(&self) -> KvBackendRef {
798            self.meta_ctx.kv_backend.clone()
799        }
800
801        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
802            self.meta_ctx.in_memory.clone()
803        }
804    }
805
806    async fn new_client(ns: impl Into<String>) -> TestClient {
807        let client = TestClient::new(ns).await;
808        client.clear_data().await;
809        client
810    }
811
812    #[tokio::test]
813    async fn test_meta_client_builder() {
814        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
815
816        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
817            .enable_heartbeat()
818            .build();
819        let _ = meta_client.heartbeat_client().unwrap();
820        assert!(meta_client.store_client().is_err());
821        meta_client.start(urls).await.unwrap();
822
823        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
824        assert!(meta_client.heartbeat_client().is_err());
825        assert!(meta_client.store_client().is_err());
826        meta_client.start(urls).await.unwrap();
827
828        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
829            .enable_store()
830            .build();
831        assert!(meta_client.heartbeat_client().is_err());
832        let _ = meta_client.store_client().unwrap();
833        meta_client.start(urls).await.unwrap();
834
835        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
836            .enable_heartbeat()
837            .enable_store()
838            .build();
839        assert_eq!(2, meta_client.id());
840        assert_eq!(2, meta_client.id());
841        let _ = meta_client.heartbeat_client().unwrap();
842        let _ = meta_client.store_client().unwrap();
843        meta_client.start(urls).await.unwrap();
844    }
845
846    #[tokio::test]
847    async fn test_not_start_heartbeat_client() {
848        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
849        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
850            .enable_store()
851            .build();
852        meta_client.start(urls).await.unwrap();
853        let res = meta_client.ask_leader().await;
854        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
855    }
856
857    #[tokio::test]
858    async fn test_not_start_store_client() {
859        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
860        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
861            .enable_heartbeat()
862            .build();
863
864        meta_client.start(urls).await.unwrap();
865        let res = meta_client.put(PutRequest::default()).await;
866        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
867    }
868
869    #[tokio::test]
870    async fn test_ask_leader() {
871        let tc = new_client("test_ask_leader").await;
872        tc.client.ask_leader().await.unwrap();
873    }
874
875    #[tokio::test]
876    async fn test_heartbeat() {
877        let tc = new_client("test_heartbeat").await;
878        let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
879        // send heartbeats
880
881        let request_sent = Arc::new(AtomicUsize::new(0));
882        let request_sent_clone = request_sent.clone();
883        let _handle = tokio::spawn(async move {
884            for _ in 0..5 {
885                let req = HeartbeatRequest {
886                    peer: Some(Peer {
887                        id: 1,
888                        addr: "meta_client_peer".to_string(),
889                    }),
890                    ..Default::default()
891                };
892                sender.send(req).await.unwrap();
893                request_sent_clone.fetch_add(1, Ordering::Relaxed);
894            }
895        });
896
897        let heartbeat_count = Arc::new(AtomicUsize::new(0));
898        let heartbeat_count_clone = heartbeat_count.clone();
899        let handle = tokio::spawn(async move {
900            while let Some(_resp) = receiver.message().await.unwrap() {
901                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
902            }
903        });
904
905        handle.await.unwrap();
906        //+1 for the initial response
907        assert_eq!(
908            request_sent.load(Ordering::Relaxed) + 1,
909            heartbeat_count.load(Ordering::Relaxed)
910        );
911    }
912
913    #[tokio::test]
914    async fn test_range_get() {
915        let tc = new_client("test_range_get").await;
916        tc.gen_data().await;
917
918        let key = tc.key("key-0");
919        let req = RangeRequest::new().with_key(key.as_slice());
920        let res = tc.client.range(req).await;
921        let mut kvs = res.unwrap().take_kvs();
922        assert_eq!(1, kvs.len());
923        let mut kv = kvs.pop().unwrap();
924        assert_eq!(key, kv.take_key());
925        assert_eq!(b"value-0".to_vec(), kv.take_value());
926    }
927
928    #[tokio::test]
929    async fn test_range_get_prefix() {
930        let tc = new_client("test_range_get_prefix").await;
931        tc.gen_data().await;
932
933        let req = RangeRequest::new().with_prefix(tc.key("key-"));
934        let res = tc.client.range(req).await;
935        let kvs = res.unwrap().take_kvs();
936        assert_eq!(10, kvs.len());
937        for (i, mut kv) in kvs.into_iter().enumerate() {
938            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
939            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
940        }
941    }
942
943    #[tokio::test]
944    async fn test_range() {
945        let tc = new_client("test_range").await;
946        tc.gen_data().await;
947
948        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
949        let res = tc.client.range(req).await;
950        let kvs = res.unwrap().take_kvs();
951        assert_eq!(3, kvs.len());
952        for (i, mut kv) in kvs.into_iter().enumerate() {
953            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
954            assert_eq!(
955                format!("{}-{}", "value", i + 5).into_bytes(),
956                kv.take_value()
957            );
958        }
959    }
960
961    #[tokio::test]
962    async fn test_range_keys_only() {
963        let tc = new_client("test_range_keys_only").await;
964        tc.gen_data().await;
965
966        let req = RangeRequest::new()
967            .with_range(tc.key("key-5"), tc.key("key-8"))
968            .with_keys_only();
969        let res = tc.client.range(req).await;
970        let kvs = res.unwrap().take_kvs();
971        assert_eq!(3, kvs.len());
972        for (i, mut kv) in kvs.into_iter().enumerate() {
973            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
974            assert!(kv.take_value().is_empty());
975        }
976    }
977
978    #[tokio::test]
979    async fn test_put() {
980        let tc = new_client("test_put").await;
981
982        let req = PutRequest::new()
983            .with_key(tc.key("key"))
984            .with_value(b"value".to_vec());
985        let res = tc.client.put(req).await;
986        assert!(res.unwrap().prev_kv.is_none());
987    }
988
989    #[tokio::test]
990    async fn test_put_with_prev_kv() {
991        let tc = new_client("test_put_with_prev_kv").await;
992
993        let key = tc.key("key");
994        let req = PutRequest::new()
995            .with_key(key.as_slice())
996            .with_value(b"value".to_vec())
997            .with_prev_kv();
998        let res = tc.client.put(req).await;
999        assert!(res.unwrap().prev_kv.is_none());
1000
1001        let req = PutRequest::new()
1002            .with_key(key.as_slice())
1003            .with_value(b"value1".to_vec())
1004            .with_prev_kv();
1005        let res = tc.client.put(req).await;
1006        let mut kv = res.unwrap().prev_kv.unwrap();
1007        assert_eq!(key, kv.take_key());
1008        assert_eq!(b"value".to_vec(), kv.take_value());
1009    }
1010
1011    #[tokio::test]
1012    async fn test_batch_put() {
1013        let tc = new_client("test_batch_put").await;
1014
1015        let mut req = BatchPutRequest::new();
1016        for i in 0..275 {
1017            req = req.add_kv(
1018                tc.key(&format!("key-{}", i)),
1019                format!("value-{}", i).into_bytes(),
1020            );
1021        }
1022
1023        let res = tc.client.batch_put(req).await;
1024        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1025
1026        let req = RangeRequest::new().with_prefix(tc.key("key-"));
1027        let res = tc.client.range(req).await;
1028        let kvs = res.unwrap().take_kvs();
1029        assert_eq!(275, kvs.len());
1030    }
1031
1032    #[tokio::test]
1033    async fn test_batch_get() {
1034        let tc = new_client("test_batch_get").await;
1035        tc.gen_data().await;
1036
1037        let mut req = BatchGetRequest::default();
1038        for i in 0..256 {
1039            req = req.add_key(tc.key(&format!("key-{}", i)));
1040        }
1041        let res = tc.client.batch_get(req).await.unwrap();
1042        assert_eq!(10, res.kvs.len());
1043
1044        let req = BatchGetRequest::default()
1045            .add_key(tc.key("key-1"))
1046            .add_key(tc.key("key-999"));
1047        let res = tc.client.batch_get(req).await.unwrap();
1048        assert_eq!(1, res.kvs.len());
1049    }
1050
1051    #[tokio::test]
1052    async fn test_batch_put_with_prev_kv() {
1053        let tc = new_client("test_batch_put_with_prev_kv").await;
1054
1055        let key = tc.key("key");
1056        let key2 = tc.key("key2");
1057        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1058        let res = tc.client.batch_put(req).await;
1059        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1060
1061        let req = BatchPutRequest::new()
1062            .add_kv(key.as_slice(), b"value-".to_vec())
1063            .add_kv(key2.as_slice(), b"value2-".to_vec())
1064            .with_prev_kv();
1065        let res = tc.client.batch_put(req).await;
1066        let mut kvs = res.unwrap().take_prev_kvs();
1067        assert_eq!(1, kvs.len());
1068        let mut kv = kvs.pop().unwrap();
1069        assert_eq!(key, kv.take_key());
1070        assert_eq!(b"value".to_vec(), kv.take_value());
1071    }
1072
1073    #[tokio::test]
1074    async fn test_compare_and_put() {
1075        let tc = new_client("test_compare_and_put").await;
1076
1077        let key = tc.key("key");
1078        let req = CompareAndPutRequest::new()
1079            .with_key(key.as_slice())
1080            .with_expect(b"expect".to_vec())
1081            .with_value(b"value".to_vec());
1082        let res = tc.client.compare_and_put(req).await;
1083        assert!(!res.unwrap().is_success());
1084
1085        // create if absent
1086        let req = CompareAndPutRequest::new()
1087            .with_key(key.as_slice())
1088            .with_value(b"value".to_vec());
1089        let res = tc.client.compare_and_put(req).await;
1090        let mut res = res.unwrap();
1091        assert!(res.is_success());
1092        assert!(res.take_prev_kv().is_none());
1093
1094        // compare and put fail
1095        let req = CompareAndPutRequest::new()
1096            .with_key(key.as_slice())
1097            .with_expect(b"not_eq".to_vec())
1098            .with_value(b"value2".to_vec());
1099        let res = tc.client.compare_and_put(req).await;
1100        let mut res = res.unwrap();
1101        assert!(!res.is_success());
1102        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1103
1104        // compare and put success
1105        let req = CompareAndPutRequest::new()
1106            .with_key(key.as_slice())
1107            .with_expect(b"value".to_vec())
1108            .with_value(b"value2".to_vec());
1109        let res = tc.client.compare_and_put(req).await;
1110        let mut res = res.unwrap();
1111        assert!(res.is_success());
1112
1113        // If compare-and-put is success, previous value doesn't need to be returned.
1114        assert!(res.take_prev_kv().is_none());
1115    }
1116
1117    #[tokio::test]
1118    async fn test_delete_with_key() {
1119        let tc = new_client("test_delete_with_key").await;
1120        tc.gen_data().await;
1121
1122        let req = DeleteRangeRequest::new()
1123            .with_key(tc.key("key-0"))
1124            .with_prev_kv();
1125        let res = tc.client.delete_range(req).await;
1126        let mut res = res.unwrap();
1127        assert_eq!(1, res.deleted());
1128        let mut kvs = res.take_prev_kvs();
1129        assert_eq!(1, kvs.len());
1130        let mut kv = kvs.pop().unwrap();
1131        assert_eq!(b"value-0".to_vec(), kv.take_value());
1132    }
1133
1134    #[tokio::test]
1135    async fn test_delete_with_prefix() {
1136        let tc = new_client("test_delete_with_prefix").await;
1137        tc.gen_data().await;
1138
1139        let req = DeleteRangeRequest::new()
1140            .with_prefix(tc.key("key-"))
1141            .with_prev_kv();
1142        let res = tc.client.delete_range(req).await;
1143        let mut res = res.unwrap();
1144        assert_eq!(10, res.deleted());
1145        let kvs = res.take_prev_kvs();
1146        assert_eq!(10, kvs.len());
1147        for (i, mut kv) in kvs.into_iter().enumerate() {
1148            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1149        }
1150    }
1151
1152    #[tokio::test]
1153    async fn test_delete_with_range() {
1154        let tc = new_client("test_delete_with_range").await;
1155        tc.gen_data().await;
1156
1157        let req = DeleteRangeRequest::new()
1158            .with_range(tc.key("key-2"), tc.key("key-7"))
1159            .with_prev_kv();
1160        let res = tc.client.delete_range(req).await;
1161        let mut res = res.unwrap();
1162        assert_eq!(5, res.deleted());
1163        let kvs = res.take_prev_kvs();
1164        assert_eq!(5, kvs.len());
1165        for (i, mut kv) in kvs.into_iter().enumerate() {
1166            assert_eq!(
1167                format!("{}-{}", "value", i + 2).into_bytes(),
1168                kv.take_value()
1169            );
1170        }
1171    }
1172
1173    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1174        Ok(())
1175    }
1176
1177    #[tokio::test]
1178    async fn test_cluster_client_adaptive_range() {
1179        let tx = new_client("test_cluster_client").await;
1180        let in_memory = tx.in_memory().unwrap();
1181        let cluster_client = tx.client.cluster_client().unwrap();
1182        let mut rng = rand::rng();
1183
1184        // Generates rough 10MB data, which is larger than the default grpc message size limit.
1185        for i in 0..10 {
1186            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1187            in_memory
1188                .put(
1189                    PutRequest::new()
1190                        .with_key(format!("__prefix/{i}").as_bytes())
1191                        .with_value(data.clone()),
1192                )
1193                .await
1194                .unwrap();
1195        }
1196
1197        let req = RangeRequest::new().with_prefix(b"__prefix/");
1198        let stream =
1199            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1200
1201        let res = stream.try_collect::<Vec<_>>().await.unwrap();
1202        assert_eq!(10, res.len());
1203    }
1204}