From 03bb6e4f285fb6b2d2dc7b62b82b83238f0e17e4 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 12 Jun 2025 00:33:36 +0800 Subject: [PATCH] feat(cli): add metadata get commands (#6299) * refactor(cli): restructure cli modules and commands Signed-off-by: WenyXu * feat(cli): add metadata get commands Signed-off-by: WenyXu * feat(cli): enhance table metadata query capabilities Signed-off-by: WenyXu * refactor: minor refactor Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/cli/src/data.rs | 39 +++ src/cli/src/{ => data}/export.rs | 0 src/cli/src/{ => data}/import.rs | 0 src/cli/src/error.rs | 20 +- src/cli/src/lib.rs | 28 +- src/cli/src/metadata.rs | 42 +++ src/cli/src/metadata/common.rs | 116 +++++++++ src/cli/src/metadata/control.rs | 38 +++ src/cli/src/metadata/control/get.rs | 242 ++++++++++++++++++ src/cli/src/metadata/control/utils.rs | 36 +++ .../snapshot.rs} | 173 +++---------- src/plugins/src/cli.rs | 6 +- 12 files changed, 574 insertions(+), 166 deletions(-) create mode 100644 src/cli/src/data.rs rename src/cli/src/{ => data}/export.rs (100%) rename src/cli/src/{ => data}/import.rs (100%) create mode 100644 src/cli/src/metadata.rs create mode 100644 src/cli/src/metadata/common.rs create mode 100644 src/cli/src/metadata/control.rs create mode 100644 src/cli/src/metadata/control/get.rs create mode 100644 src/cli/src/metadata/control/utils.rs rename src/cli/src/{meta_snapshot.rs => metadata/snapshot.rs} (66%) diff --git a/src/cli/src/data.rs b/src/cli/src/data.rs new file mode 100644 index 0000000000..a74823b756 --- /dev/null +++ b/src/cli/src/data.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod export; +mod import; + +use clap::Subcommand; +use common_error::ext::BoxedError; + +use crate::data::export::ExportCommand; +use crate::data::import::ImportCommand; +use crate::Tool; + +/// Command for data operations including exporting data from and importing data into GreptimeDB. +#[derive(Subcommand)] +pub enum DataCommand { + Export(ExportCommand), + Import(ImportCommand), +} + +impl DataCommand { + pub async fn build(&self) -> std::result::Result, BoxedError> { + match self { + DataCommand::Export(cmd) => cmd.build().await, + DataCommand::Import(cmd) => cmd.build().await, + } + } +} diff --git a/src/cli/src/export.rs b/src/cli/src/data/export.rs similarity index 100% rename from src/cli/src/export.rs rename to src/cli/src/data/export.rs diff --git a/src/cli/src/import.rs b/src/cli/src/data/import.rs similarity index 100% rename from src/cli/src/import.rs rename to src/cli/src/data/import.rs diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs index c6852b0e28..0f01973d0b 100644 --- a/src/cli/src/error.rs +++ b/src/cli/src/error.rs @@ -30,6 +30,7 @@ pub enum Error { location: Location, msg: String, }, + #[snafu(display("Failed to create default catalog and schema"))] InitMetadata { #[snafu(implicit)] @@ -228,22 +229,25 @@ pub enum Error { #[snafu(source)] error: ObjectStoreError, }, + #[snafu(display("S3 config need be set"))] S3ConfigNotSet { #[snafu(implicit)] location: Location, }, + #[snafu(display("Output directory not set"))] OutputDirNotSet { #[snafu(implicit)] location: Location, }, - #[snafu(display("KV backend not set: {}", backend))] - KvBackendNotSet { - backend: String, + + #[snafu(display("Empty store addresses"))] + EmptyStoreAddrs { #[snafu(implicit)] location: Location, }, + #[snafu(display("Unsupported memory backend"))] UnsupportedMemoryBackend { #[snafu(implicit)] @@ -256,6 +260,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Invalid arguments: {}", msg))] + InvalidArguments { + msg: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -276,6 +287,7 @@ impl ErrorExt for Error { | Error::EmptyResult { .. } | Error::InvalidFilePath { .. } | Error::UnsupportedMemoryBackend { .. } + | Error::InvalidArguments { .. } | Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments, Error::StartProcedureManager { source, .. } @@ -296,7 +308,7 @@ impl ErrorExt for Error { Error::OpenDal { .. } => StatusCode::Internal, Error::S3ConfigNotSet { .. } | Error::OutputDirNotSet { .. } - | Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments, + | Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments, Error::BuildRuntime { source, .. } => source.status_code(), diff --git a/src/cli/src/lib.rs b/src/cli/src/lib.rs index deb8fbecb5..d610738add 100644 --- a/src/cli/src/lib.rs +++ b/src/cli/src/lib.rs @@ -13,22 +13,20 @@ // limitations under the License. mod bench; +mod data; mod database; pub mod error; -mod export; -mod import; -mod meta_snapshot; +mod metadata; use async_trait::async_trait; -use clap::{Parser, Subcommand}; +use clap::Parser; use common_error::ext::BoxedError; pub use database::DatabaseClient; use error::Result; pub use crate::bench::BenchTableMetadataCommand; -pub use crate::export::ExportCommand; -pub use crate::import::ImportCommand; -pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand}; +pub use crate::data::DataCommand; +pub use crate::metadata::MetadataCommand; #[async_trait] pub trait Tool: Send + Sync { @@ -51,19 +49,3 @@ impl AttachCommand { unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373") } } - -/// Subcommand for data operations like export and import. -#[derive(Subcommand)] -pub enum DataCommand { - Export(ExportCommand), - Import(ImportCommand), -} - -impl DataCommand { - pub async fn build(&self) -> std::result::Result, BoxedError> { - match self { - DataCommand::Export(cmd) => cmd.build().await, - DataCommand::Import(cmd) => cmd.build().await, - } - } -} diff --git a/src/cli/src/metadata.rs b/src/cli/src/metadata.rs new file mode 100644 index 0000000000..6a6a5ae3f6 --- /dev/null +++ b/src/cli/src/metadata.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod common; +mod control; +mod snapshot; + +use clap::Subcommand; +use common_error::ext::BoxedError; + +use crate::metadata::control::ControlCommand; +use crate::metadata::snapshot::SnapshotCommand; +use crate::Tool; + +/// Command for managing metadata operations, including saving metadata snapshots and restoring metadata from snapshots. +#[derive(Subcommand)] +pub enum MetadataCommand { + #[clap(subcommand)] + Snapshot(SnapshotCommand), + #[clap(subcommand)] + Control(ControlCommand), +} + +impl MetadataCommand { + pub async fn build(&self) -> Result, BoxedError> { + match self { + MetadataCommand::Snapshot(cmd) => cmd.build().await, + MetadataCommand::Control(cmd) => cmd.build().await, + } + } +} diff --git a/src/cli/src/metadata/common.rs b/src/cli/src/metadata/common.rs new file mode 100644 index 0000000000..7e77cd49fb --- /dev/null +++ b/src/cli/src/metadata/common.rs @@ -0,0 +1,116 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use clap::Parser; +use common_error::ext::BoxedError; +use common_meta::kv_backend::chroot::ChrootKvBackend; +use common_meta::kv_backend::etcd::EtcdStore; +use common_meta::kv_backend::KvBackendRef; +use meta_srv::bootstrap::create_etcd_client; +use meta_srv::metasrv::BackendImpl; + +use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu}; + +#[derive(Debug, Default, Parser)] +pub(crate) struct StoreConfig { + /// The endpoint of store. one of etcd, postgres or mysql. + /// + /// For postgres store, the format is: + /// "password=password dbname=postgres user=postgres host=localhost port=5432" + /// + /// For etcd store, the format is: + /// "127.0.0.1:2379" + /// + /// For mysql store, the format is: + /// "mysql://user:password@ip:port/dbname" + #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)] + store_addrs: Vec, + + /// The maximum number of operations in a transaction. Only used when using [etcd-store]. + #[clap(long, default_value = "128")] + max_txn_ops: usize, + + /// The metadata store backend. + #[clap(long, value_enum, default_value = "etcd-store")] + backend: BackendImpl, + + /// The key prefix of the metadata store. + #[clap(long, default_value = "")] + store_key_prefix: String, + + /// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store]. + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] + #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)] + meta_table_name: String, +} + +impl StoreConfig { + /// Builds a [`KvBackendRef`] from the store configuration. + pub async fn build(&self) -> Result { + let max_txn_ops = self.max_txn_ops; + let store_addrs = &self.store_addrs; + if store_addrs.is_empty() { + EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new) + } else { + let kvbackend = match self.backend { + BackendImpl::EtcdStore => { + let etcd_client = create_etcd_client(store_addrs) + .await + .map_err(BoxedError::new)?; + Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops)) + } + #[cfg(feature = "pg_kvbackend")] + BackendImpl::PostgresStore => { + let table_name = &self.meta_table_name; + let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs) + .await + .map_err(BoxedError::new)?; + Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool( + pool, + table_name, + max_txn_ops, + ) + .await + .map_err(BoxedError::new)?) + } + #[cfg(feature = "mysql_kvbackend")] + BackendImpl::MysqlStore => { + let table_name = &self.meta_table_name; + let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs) + .await + .map_err(BoxedError::new)?; + Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool( + pool, + table_name, + max_txn_ops, + ) + .await + .map_err(BoxedError::new)?) + } + BackendImpl::MemoryStore => UnsupportedMemoryBackendSnafu + .fail() + .map_err(BoxedError::new), + }; + if self.store_key_prefix.is_empty() { + kvbackend + } else { + let chroot_kvbackend = + ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?); + Ok(Arc::new(chroot_kvbackend)) + } + } + } +} diff --git a/src/cli/src/metadata/control.rs b/src/cli/src/metadata/control.rs new file mode 100644 index 0000000000..573bffda2c --- /dev/null +++ b/src/cli/src/metadata/control.rs @@ -0,0 +1,38 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod get; +mod utils; + +use clap::Subcommand; +use common_error::ext::BoxedError; +use get::GetCommand; + +use crate::Tool; + +/// Subcommand for metadata control. +#[derive(Subcommand)] +pub enum ControlCommand { + /// Get the metadata from the metasrv. + #[clap(subcommand)] + Get(GetCommand), +} + +impl ControlCommand { + pub async fn build(&self) -> Result, BoxedError> { + match self { + ControlCommand::Get(cmd) => cmd.build().await, + } + } +} diff --git a/src/cli/src/metadata/control/get.rs b/src/cli/src/metadata/control/get.rs new file mode 100644 index 0000000000..89b3bdc1c8 --- /dev/null +++ b/src/cli/src/metadata/control/get.rs @@ -0,0 +1,242 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::min; + +use async_trait::async_trait; +use clap::{Parser, Subcommand}; +use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::format_full_table_name; +use common_error::ext::BoxedError; +use common_meta::key::table_info::TableInfoKey; +use common_meta::key::table_name::TableNameKey; +use common_meta::key::table_route::TableRouteKey; +use common_meta::key::TableMetadataManager; +use common_meta::kv_backend::KvBackendRef; +use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use common_meta::rpc::store::RangeRequest; +use futures::TryStreamExt; + +use crate::error::InvalidArgumentsSnafu; +use crate::metadata::common::StoreConfig; +use crate::metadata::control::utils::{decode_key_value, json_fromatter}; +use crate::Tool; + +/// Subcommand for get command. +#[derive(Subcommand)] +pub enum GetCommand { + Key(GetKeyCommand), + Table(GetTableCommand), +} + +impl GetCommand { + pub async fn build(&self) -> Result, BoxedError> { + match self { + GetCommand::Key(cmd) => cmd.build().await, + GetCommand::Table(cmd) => cmd.build().await, + } + } +} + +/// Get key-value pairs from the metadata store. +#[derive(Debug, Default, Parser)] +pub struct GetKeyCommand { + /// The key to get from the metadata store. If empty, returns all key-value pairs. + #[clap(default_value = "")] + key: String, + + /// Whether to perform a prefix query. If true, returns all key-value pairs where the key starts with the given prefix. + #[clap(long, default_value = "false")] + prefix: bool, + + /// The maximum number of key-value pairs to return. If 0, returns all key-value pairs. + #[clap(long, default_value = "0")] + limit: u64, + + #[clap(flatten)] + store: StoreConfig, +} + +impl GetKeyCommand { + pub async fn build(&self) -> Result, BoxedError> { + let kvbackend = self.store.build().await?; + Ok(Box::new(GetKeyTool { + kvbackend, + key: self.key.clone(), + prefix: self.prefix, + limit: self.limit, + })) + } +} + +struct GetKeyTool { + kvbackend: KvBackendRef, + key: String, + prefix: bool, + limit: u64, +} + +#[async_trait] +impl Tool for GetKeyTool { + async fn do_work(&self) -> Result<(), BoxedError> { + let mut req = RangeRequest::default(); + if self.prefix { + req = req.with_prefix(self.key.as_bytes()); + } else { + req = req.with_key(self.key.as_bytes()); + } + let page_size = if self.limit > 0 { + min(self.limit as usize, DEFAULT_PAGE_SIZE) + } else { + DEFAULT_PAGE_SIZE + }; + let pagination_stream = + PaginationStream::new(self.kvbackend.clone(), req, page_size, decode_key_value); + let mut stream = Box::pin(pagination_stream.into_stream()); + let mut counter = 0; + + while let Some((key, value)) = stream.try_next().await.map_err(BoxedError::new)? { + print!("{}\n{}\n", key, value); + counter += 1; + if self.limit > 0 && counter >= self.limit { + break; + } + } + + Ok(()) + } +} + +/// Get table metadata from the metadata store via table id. +#[derive(Debug, Default, Parser)] +pub struct GetTableCommand { + /// Get table metadata by table id. + #[clap(long)] + table_id: Option, + + /// Get table metadata by table name. + #[clap(long)] + table_name: Option, + + /// The schema name of the table. + #[clap(long)] + schema_name: Option, + + /// Pretty print the output. + #[clap(long, default_value = "false")] + pretty: bool, + + #[clap(flatten)] + store: StoreConfig, +} + +impl GetTableCommand { + pub fn validate(&self) -> Result<(), BoxedError> { + if self.table_id.is_none() && self.table_name.is_none() { + return Err(BoxedError::new( + InvalidArgumentsSnafu { + msg: "You must specify either --table-id or --table-name.", + } + .build(), + )); + } + Ok(()) + } +} + +struct GetTableTool { + kvbackend: KvBackendRef, + table_id: Option, + table_name: Option, + schema_name: Option, + pretty: bool, +} + +#[async_trait] +impl Tool for GetTableTool { + async fn do_work(&self) -> Result<(), BoxedError> { + let table_metadata_manager = TableMetadataManager::new(self.kvbackend.clone()); + let table_name_manager = table_metadata_manager.table_name_manager(); + let table_info_manager = table_metadata_manager.table_info_manager(); + let table_route_manager = table_metadata_manager.table_route_manager(); + + let table_id = if let Some(table_name) = &self.table_name { + let catalog = DEFAULT_CATALOG_NAME.to_string(); + let schema_name = self + .schema_name + .clone() + .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); + let key = TableNameKey::new(&catalog, &schema_name, table_name); + + let Some(table_name) = table_name_manager.get(key).await.map_err(BoxedError::new)? + else { + println!( + "Table({}) not found", + format_full_table_name(&catalog, &schema_name, table_name) + ); + return Ok(()); + }; + + table_name.table_id() + } else { + // Safety: we have validated that table_id or table_name is not None + self.table_id.unwrap() + }; + + let table_info = table_info_manager + .get(table_id) + .await + .map_err(BoxedError::new)?; + if let Some(table_info) = table_info { + println!( + "{}\n{}", + TableInfoKey::new(table_id), + json_fromatter(self.pretty, &*table_info) + ); + } else { + println!("Table info not found"); + } + + let table_route = table_route_manager + .table_route_storage() + .get(table_id) + .await + .map_err(BoxedError::new)?; + if let Some(table_route) = table_route { + println!( + "{}\n{}", + TableRouteKey::new(table_id), + json_fromatter(self.pretty, &table_route) + ); + } else { + println!("Table route not found"); + } + + Ok(()) + } +} + +impl GetTableCommand { + pub async fn build(&self) -> Result, BoxedError> { + self.validate()?; + let kvbackend = self.store.build().await?; + Ok(Box::new(GetTableTool { + kvbackend, + table_id: self.table_id, + table_name: self.table_name.clone(), + schema_name: self.schema_name.clone(), + pretty: self.pretty, + })) + } +} diff --git a/src/cli/src/metadata/control/utils.rs b/src/cli/src/metadata/control/utils.rs new file mode 100644 index 0000000000..64e6ae8e1a --- /dev/null +++ b/src/cli/src/metadata/control/utils.rs @@ -0,0 +1,36 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::error::Result as CommonMetaResult; +use common_meta::rpc::KeyValue; +use serde::Serialize; + +/// Decodes a key-value pair into a string. +pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> { + let key = String::from_utf8_lossy(&kv.key).to_string(); + let value = String::from_utf8_lossy(&kv.value).to_string(); + Ok((key, value)) +} + +/// Formats a value as a JSON string. +pub fn json_fromatter(pretty: bool, value: &T) -> String +where + T: Serialize, +{ + if pretty { + serde_json::to_string_pretty(value).unwrap() + } else { + serde_json::to_string(value).unwrap() + } +} diff --git a/src/cli/src/meta_snapshot.rs b/src/cli/src/metadata/snapshot.rs similarity index 66% rename from src/cli/src/meta_snapshot.rs rename to src/cli/src/metadata/snapshot.rs index 6d0bd364c6..d164142d0f 100644 --- a/src/cli/src/meta_snapshot.rs +++ b/src/cli/src/metadata/snapshot.rs @@ -13,139 +13,37 @@ // limitations under the License. use std::path::Path; -use std::sync::Arc; use async_trait::async_trait; use clap::{Parser, Subcommand}; use common_base::secrets::{ExposeSecret, SecretString}; use common_error::ext::BoxedError; -use common_meta::kv_backend::chroot::ChrootKvBackend; -use common_meta::kv_backend::etcd::EtcdStore; -use common_meta::kv_backend::KvBackendRef; use common_meta::snapshot::MetadataSnapshotManager; -use meta_srv::bootstrap::create_etcd_client; -use meta_srv::metasrv::BackendImpl; use object_store::services::{Fs, S3}; use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; -use crate::error::{ - InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu, - UnsupportedMemoryBackendSnafu, -}; +use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu}; +use crate::metadata::common::StoreConfig; use crate::Tool; -/// Subcommand for metadata snapshot management. +/// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information. #[derive(Subcommand)] -pub enum MetaCommand { - #[clap(subcommand)] - Snapshot(MetaSnapshotCommand), +pub enum SnapshotCommand { + /// Save a snapshot of the current metadata state to a specified location. + Save(SaveCommand), + /// Restore metadata from a snapshot. + Restore(RestoreCommand), + /// Explore metadata from a snapshot. + Info(InfoCommand), } -impl MetaCommand { +impl SnapshotCommand { pub async fn build(&self) -> Result, BoxedError> { match self { - MetaCommand::Snapshot(cmd) => cmd.build().await, - } - } -} - -/// Subcommand for metadata snapshot operations. such as save, restore and info. -#[derive(Subcommand)] -pub enum MetaSnapshotCommand { - /// Export metadata snapshot tool. - Save(MetaSaveCommand), - /// Restore metadata snapshot tool. - Restore(MetaRestoreCommand), - /// Explore metadata from metadata snapshot. - Info(MetaInfoCommand), -} - -impl MetaSnapshotCommand { - pub async fn build(&self) -> Result, BoxedError> { - match self { - MetaSnapshotCommand::Save(cmd) => cmd.build().await, - MetaSnapshotCommand::Restore(cmd) => cmd.build().await, - MetaSnapshotCommand::Info(cmd) => cmd.build().await, - } - } -} - -#[derive(Debug, Default, Parser)] -struct MetaConnection { - /// The endpoint of store. one of etcd, pg or mysql. - #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)] - store_addrs: Vec, - /// The database backend. - #[clap(long, value_enum)] - backend: Option, - #[clap(long, default_value = "")] - store_key_prefix: String, - #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] - #[clap(long,default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)] - meta_table_name: String, - #[clap(long, default_value = "128")] - max_txn_ops: usize, -} - -impl MetaConnection { - pub async fn build(&self) -> Result { - let max_txn_ops = self.max_txn_ops; - let store_addrs = &self.store_addrs; - if store_addrs.is_empty() { - KvBackendNotSetSnafu { backend: "all" } - .fail() - .map_err(BoxedError::new) - } else { - let kvbackend = match self.backend { - Some(BackendImpl::EtcdStore) => { - let etcd_client = create_etcd_client(store_addrs) - .await - .map_err(BoxedError::new)?; - Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops)) - } - #[cfg(feature = "pg_kvbackend")] - Some(BackendImpl::PostgresStore) => { - let table_name = &self.meta_table_name; - let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs) - .await - .map_err(BoxedError::new)?; - Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool( - pool, - table_name, - max_txn_ops, - ) - .await - .map_err(BoxedError::new)?) - } - #[cfg(feature = "mysql_kvbackend")] - Some(BackendImpl::MysqlStore) => { - let table_name = &self.meta_table_name; - let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs) - .await - .map_err(BoxedError::new)?; - Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool( - pool, - table_name, - max_txn_ops, - ) - .await - .map_err(BoxedError::new)?) - } - Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu - .fail() - .map_err(BoxedError::new), - _ => KvBackendNotSetSnafu { backend: "all" } - .fail() - .map_err(BoxedError::new), - }; - if self.store_key_prefix.is_empty() { - kvbackend - } else { - let chroot_kvbackend = - ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?); - Ok(Arc::new(chroot_kvbackend)) - } + SnapshotCommand::Save(cmd) => cmd.build().await, + SnapshotCommand::Restore(cmd) => cmd.build().await, + SnapshotCommand::Info(cmd) => cmd.build().await, } } } @@ -214,10 +112,10 @@ impl S3Config { /// It will dump the metadata snapshot to local file or s3 bucket. /// The snapshot file will be in binary format. #[derive(Debug, Default, Parser)] -pub struct MetaSaveCommand { - /// The connection to the metadata store. +pub struct SaveCommand { + /// The store configuration. #[clap(flatten)] - connection: MetaConnection, + store: StoreConfig, /// The s3 config. #[clap(flatten)] s3_config: S3Config, @@ -240,9 +138,9 @@ fn create_local_file_object_store(root: &str) -> Result Ok(object_store) } -impl MetaSaveCommand { +impl SaveCommand { pub async fn build(&self) -> Result, BoxedError> { - let kvbackend = self.connection.build().await?; + let kvbackend = self.store.build().await?; let output_dir = &self.output_dir; let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?; if let Some(store) = object_store { @@ -262,7 +160,7 @@ impl MetaSaveCommand { } } -pub struct MetaSnapshotTool { +struct MetaSnapshotTool { inner: MetadataSnapshotManager, target_file: String, } @@ -278,14 +176,16 @@ impl Tool for MetaSnapshotTool { } } -/// Restore metadata snapshot tool. -/// This tool is used to restore metadata snapshot from etcd, pg or mysql. -/// It will restore the metadata snapshot from local file or s3 bucket. +/// Restore metadata from a snapshot file. +/// +/// This command restores the metadata state from a previously saved snapshot. +/// The snapshot can be loaded from either a local file system or an S3 bucket, +/// depending on the provided configuration. #[derive(Debug, Default, Parser)] -pub struct MetaRestoreCommand { - /// The connection to the metadata store. +pub struct RestoreCommand { + /// The store configuration. #[clap(flatten)] - connection: MetaConnection, + store: StoreConfig, /// The s3 config. #[clap(flatten)] s3_config: S3Config, @@ -299,9 +199,9 @@ pub struct MetaRestoreCommand { force: bool, } -impl MetaRestoreCommand { +impl RestoreCommand { pub async fn build(&self) -> Result, BoxedError> { - let kvbackend = self.connection.build().await?; + let kvbackend = self.store.build().await?; let input_dir = &self.input_dir; let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?; if let Some(store) = object_store { @@ -323,7 +223,7 @@ impl MetaRestoreCommand { } } -pub struct MetaRestoreTool { +struct MetaRestoreTool { inner: MetadataSnapshotManager, source_file: String, force: bool, @@ -372,9 +272,12 @@ impl Tool for MetaRestoreTool { } } -/// Explore metadata from metadata snapshot. +/// Explore metadata from a snapshot file. +/// +/// This command allows filtering the metadata by a specific key and limiting the number of results. +/// It prints the filtered metadata to the console. #[derive(Debug, Default, Parser)] -pub struct MetaInfoCommand { +pub struct InfoCommand { /// The s3 config. #[clap(flatten)] s3_config: S3Config, @@ -389,7 +292,7 @@ pub struct MetaInfoCommand { limit: Option, } -pub struct MetaInfoTool { +struct MetaInfoTool { inner: ObjectStore, source_file: String, inspect_key: String, @@ -415,7 +318,7 @@ impl Tool for MetaInfoTool { } } -impl MetaInfoCommand { +impl InfoCommand { fn decide_object_store_root_for_local_store( file_path: &str, ) -> Result<(&str, &str), BoxedError> { diff --git a/src/plugins/src/cli.rs b/src/plugins/src/cli.rs index c96a592010..bf36b01f1c 100644 --- a/src/plugins/src/cli.rs +++ b/src/plugins/src/cli.rs @@ -13,23 +13,21 @@ // limitations under the License. use clap::Parser; -use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool}; +use cli::{BenchTableMetadataCommand, DataCommand, MetadataCommand, Tool}; use common_error::ext::BoxedError; #[derive(Parser)] pub enum SubCommand { - // Attach(AttachCommand), Bench(BenchTableMetadataCommand), #[clap(subcommand)] Data(DataCommand), #[clap(subcommand)] - Meta(MetaCommand), + Meta(MetadataCommand), } impl SubCommand { pub async fn build(&self) -> std::result::Result, BoxedError> { match self { - // SubCommand::Attach(cmd) => cmd.build().await, SubCommand::Bench(cmd) => cmd.build().await, SubCommand::Data(cmd) => cmd.build().await, SubCommand::Meta(cmd) => cmd.build().await,