diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index 3c2905cc0f..2bd5d709b2 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -58,6 +58,7 @@ where info!("{desc}, average operation cost: {cost:.2} ms"); } +/// Command to benchmark table metadata operations. #[derive(Debug, Default, Parser)] pub struct BenchTableMetadataCommand { #[clap(long)] diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs index 8a855b7cad..c6852b0e28 100644 --- a/src/cli/src/error.rs +++ b/src/cli/src/error.rs @@ -244,6 +244,18 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Unsupported memory backend"))] + UnsupportedMemoryBackend { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("File path invalid: {}", msg))] + InvalidFilePath { + msg: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -262,6 +274,8 @@ impl ErrorExt for Error { | Error::ConnectEtcd { .. } | Error::CreateDir { .. } | Error::EmptyResult { .. } + | Error::InvalidFilePath { .. } + | Error::UnsupportedMemoryBackend { .. } | Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments, Error::StartProcedureManager { source, .. } diff --git a/src/cli/src/export.rs b/src/cli/src/export.rs index 2bc7c33a92..4d5f817949 100644 --- a/src/cli/src/export.rs +++ b/src/cli/src/export.rs @@ -50,6 +50,7 @@ enum ExportTarget { All, } +/// Command for exporting data from the GreptimeDB. #[derive(Debug, Default, Parser)] pub struct ExportCommand { /// Server address to connect diff --git a/src/cli/src/import.rs b/src/cli/src/import.rs index 7cff2fd37f..39d45f1061 100644 --- a/src/cli/src/import.rs +++ b/src/cli/src/import.rs @@ -40,6 +40,7 @@ enum ImportTarget { All, } +/// Command to import data from a directory into a GreptimeDB instance. 
#[derive(Debug, Default, Parser)] pub struct ImportCommand { /// Server address to connect diff --git a/src/cli/src/lib.rs b/src/cli/src/lib.rs index e72ccc65c7..deb8fbecb5 100644 --- a/src/cli/src/lib.rs +++ b/src/cli/src/lib.rs @@ -20,7 +20,7 @@ mod import; mod meta_snapshot; use async_trait::async_trait; -use clap::Parser; +use clap::{Parser, Subcommand}; use common_error::ext::BoxedError; pub use database::DatabaseClient; use error::Result; @@ -28,7 +28,7 @@ use error::Result; pub use crate::bench::BenchTableMetadataCommand; pub use crate::export::ExportCommand; pub use crate::import::ImportCommand; -pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand}; +pub use crate::meta_snapshot::{MetaCommand, MetaInfoCommand, MetaRestoreCommand, MetaSaveCommand}; #[async_trait] pub trait Tool: Send + Sync { @@ -51,3 +51,19 @@ impl AttachCommand { unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373") } } + +/// Subcommand for data operations like export and import. +#[derive(Subcommand)] +pub enum DataCommand { + Export(ExportCommand), + Import(ImportCommand), +} + +impl DataCommand { + pub async fn build(&self) -> std::result::Result, BoxedError> { + match self { + DataCommand::Export(cmd) => cmd.build().await, + DataCommand::Import(cmd) => cmd.build().await, + } + } +} diff --git a/src/cli/src/meta_snapshot.rs b/src/cli/src/meta_snapshot.rs index 670a9a2c10..6de31c0401 100644 --- a/src/cli/src/meta_snapshot.rs +++ b/src/cli/src/meta_snapshot.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::path::Path; use std::sync::Arc; use async_trait::async_trait; -use clap::Parser; +use clap::{Parser, Subcommand}; use common_base::secrets::{ExposeSecret, SecretString}; use common_error::ext::BoxedError; use common_meta::kv_backend::chroot::ChrootKvBackend; @@ -26,10 +27,50 @@ use meta_srv::bootstrap::create_etcd_client; use meta_srv::metasrv::BackendImpl; use object_store::services::{Fs, S3}; use object_store::ObjectStore; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; -use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu}; +use crate::error::{ + InvalidFilePathSnafu, KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu, + UnsupportedMemoryBackendSnafu, +}; use crate::Tool; + +/// Subcommand for metadata snapshot management. +#[derive(Subcommand)] +pub enum MetaCommand { + #[clap(subcommand)] + Snapshot(MetaSnapshotCommand), +} + +impl MetaCommand { + pub async fn build(&self) -> Result, BoxedError> { + match self { + MetaCommand::Snapshot(cmd) => cmd.build().await, + } + } +} + +/// Subcommand for metadata snapshot operations, such as save, restore and info. +#[derive(Subcommand)] +pub enum MetaSnapshotCommand { + /// Export metadata snapshot tool. + Save(MetaSaveCommand), + /// Restore metadata snapshot tool. + Restore(MetaRestoreCommand), + /// Explore metadata from metadata snapshot. + Info(MetaInfoCommand), +} + +impl MetaSnapshotCommand { + pub async fn build(&self) -> Result, BoxedError> { + match self { + MetaSnapshotCommand::Save(cmd) => cmd.build().await, + MetaSnapshotCommand::Restore(cmd) => cmd.build().await, + MetaSnapshotCommand::Info(cmd) => cmd.build().await, + } + } +} + #[derive(Debug, Default, Parser)] struct MetaConnection { /// The endpoint of store. one of etcd, pg or mysql. @@ -91,6 +132,9 @@ impl MetaConnection { .await .map_err(BoxedError::new)?) 
} + Some(BackendImpl::MemoryStore) => UnsupportedMemoryBackendSnafu + .fail() + .map_err(BoxedError::new), _ => KvBackendNotSetSnafu { backend: "all" } .fail() .map_err(BoxedError::new), @@ -170,7 +214,7 @@ impl S3Config { /// It will dump the metadata snapshot to local file or s3 bucket. /// The snapshot file will be in binary format. #[derive(Debug, Default, Parser)] -pub struct MetaSnapshotCommand { +pub struct MetaSaveCommand { /// The connection to the metadata store. #[clap(flatten)] connection: MetaConnection, @@ -196,7 +240,7 @@ fn create_local_file_object_store(root: &str) -> Result Ok(object_store) } -impl MetaSnapshotCommand { +impl MetaSaveCommand { pub async fn build(&self) -> Result, BoxedError> { let kvbackend = self.connection.build().await?; let output_dir = &self.output_dir; @@ -327,3 +371,89 @@ impl Tool for MetaRestoreTool { } } } + +/// Explore metadata from metadata snapshot. +#[derive(Debug, Default, Parser)] +pub struct MetaInfoCommand { + /// The s3 config. + #[clap(flatten)] + s3_config: S3Config, + /// The name of the target snapshot file. We will add the file extension automatically. + #[clap(long, default_value = "metadata_snapshot")] + file_name: String, + /// The query string to filter the metadata. + #[clap(long, default_value = "*")] + inspect_key: String, + /// The limit of the metadata to query. 
+ #[clap(long)] + limit: Option, +} + +pub struct MetaInfoTool { + inner: ObjectStore, + source_file: String, + inspect_key: String, + limit: Option, +} + +#[async_trait] +impl Tool for MetaInfoTool { + async fn do_work(&self) -> std::result::Result<(), BoxedError> { + let result = MetadataSnapshotManager::info( + &self.inner, + &self.source_file, + &self.inspect_key, + self.limit, + ) + .await + .map_err(BoxedError::new)?; + for item in result { + println!("{}", item); + } + Ok(()) + } +} + +impl MetaInfoCommand { + fn decide_object_store_root_for_local_store( + file_path: &str, + ) -> Result<(&str, &str), BoxedError> { + let path = Path::new(file_path); + let parent = path + .parent() + .and_then(|p| p.to_str()) + .context(InvalidFilePathSnafu { msg: file_path }) + .map_err(BoxedError::new)?; + let file_name = path + .file_name() + .and_then(|f| f.to_str()) + .context(InvalidFilePathSnafu { msg: file_path }) + .map_err(BoxedError::new)?; + let root = if parent.is_empty() { "." } else { parent }; + Ok((root, file_name)) + } + + pub async fn build(&self) -> Result, BoxedError> { + let object_store = self.s3_config.build("").map_err(BoxedError::new)?; + if let Some(store) = object_store { + let tool = MetaInfoTool { + inner: store, + source_file: self.file_name.clone(), + inspect_key: self.inspect_key.clone(), + limit: self.limit, + }; + Ok(Box::new(tool)) + } else { + let (root, file_name) = + Self::decide_object_store_root_for_local_store(&self.file_name)?; + let object_store = create_local_file_object_store(root)?; + let tool = MetaInfoTool { + inner: object_store, + source_file: file_name.to_string(), + inspect_key: self.inspect_key.clone(), + limit: self.limit, + }; + Ok(Box::new(tool)) + } + } +} diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index b57da1216a..0c879c38a3 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -146,6 +146,7 @@ mod tests { let output_dir = tempfile::tempdir().unwrap(); let cli = cli::Command::parse_from([ "cli", + 
"data",
             "export",
             "--addr",
             "127.0.0.1:4000",
diff --git a/src/common/meta/src/snapshot.rs b/src/common/meta/src/snapshot.rs
index 9110f09277..6cd94768a9 100644
--- a/src/common/meta/src/snapshot.rs
+++ b/src/common/meta/src/snapshot.rs
@@ -14,6 +14,7 @@
 
 pub mod file;
 
+use std::borrow::Cow;
 use std::fmt::{Display, Formatter};
 use std::path::{Path, PathBuf};
 use std::time::Instant;
@@ -271,6 +272,64 @@ impl MetadataSnapshotManager {
 
         Ok((filename.to_string(), num_keyvalues as u64))
     }
+
+    /// Renders a key-value pair as `key => value`.
+    fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
+        format!("{} => {}", key, value)
+    }
+
+    /// Lists the key-value pairs of the snapshot stored at `file_path`.
+    ///
+    /// A `query_str` ending with `*` selects every key that starts with the text
+    /// before the `*`; any other `query_str` selects only the identical key.
+    /// Keys and values are decoded as UTF-8 lossily and rendered as
+    /// `key => value`. At most `limit` entries are returned; `None` means no
+    /// limit and `Some(0)` yields an empty list.
+    ///
+    /// # Errors
+    ///
+    /// Fails when `file_path` has no UTF-8 file name, and propagates errors from
+    /// parsing the file name, reading the object and decoding the document.
+    pub async fn info(
+        object_store: &ObjectStore,
+        file_path: &str,
+        query_str: &str,
+        limit: Option<usize>,
+    ) -> Result<Vec<String>> {
+        let path = Path::new(file_path);
+
+        let file_name = path
+            .file_name()
+            .and_then(|s| s.to_str())
+            .context(InvalidFilePathSnafu { file_path })?;
+
+        let filename = FileName::try_from(file_name)?;
+        let data = object_store
+            .read(file_path)
+            .await
+            .context(ReadObjectSnafu { file_path })?;
+        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
+        let metadata_content = document.into_metadata_content()?.values();
+        // The query is either a prefix pattern (`foo*`) or an exact key; decide once.
+        let prefix = query_str.strip_suffix('*');
+        // `take` bounds the output lazily, so `Some(0)` returns nothing and a huge
+        // limit never turns into a huge up-front allocation.
+        let results: Vec<String> = metadata_content
+            .iter()
+            .filter_map(|kv| {
+                let key_str = String::from_utf8_lossy(&kv.key);
+                let matched = match prefix {
+                    Some(prefix) => key_str.starts_with(prefix),
+                    None => key_str == query_str,
+                };
+                matched.then(|| {
+                    Self::format_output(key_str, String::from_utf8_lossy(&kv.value))
+                })
+            })
+            .take(limit.unwrap_or(usize::MAX))
+            .collect();
+        Ok(results)
+    }
 }
 
 #[cfg(test)]
diff --git a/src/common/meta/src/snapshot/file.rs b/src/common/meta/src/snapshot/file.rs
index a17254a48d..8b3f7fd6d5 100644
--- a/src/common/meta/src/snapshot/file.rs
+++ b/src/common/meta/src/snapshot/file.rs
@@ -111,6 
+111,11 @@ impl MetadataContent { pub fn into_iter(self) -> impl Iterator { self.values.into_iter() } + + /// Returns the key-value pairs as a vector. + pub fn values(self) -> Vec { + self.values + } } /// The key-value pair of the backup file. diff --git a/src/plugins/src/cli.rs b/src/plugins/src/cli.rs index be96bd820f..c96a592010 100644 --- a/src/plugins/src/cli.rs +++ b/src/plugins/src/cli.rs @@ -13,20 +13,17 @@ // limitations under the License. use clap::Parser; -use cli::{ - BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand, - MetaSnapshotCommand, Tool, -}; +use cli::{BenchTableMetadataCommand, DataCommand, MetaCommand, Tool}; use common_error::ext::BoxedError; #[derive(Parser)] pub enum SubCommand { // Attach(AttachCommand), Bench(BenchTableMetadataCommand), - Export(ExportCommand), - Import(ImportCommand), - MetaSnapshot(MetaSnapshotCommand), - MetaRestore(MetaRestoreCommand), + #[clap(subcommand)] + Data(DataCommand), + #[clap(subcommand)] + Meta(MetaCommand), } impl SubCommand { @@ -34,10 +31,8 @@ impl SubCommand { match self { // SubCommand::Attach(cmd) => cmd.build().await, SubCommand::Bench(cmd) => cmd.build().await, - SubCommand::Export(cmd) => cmd.build().await, - SubCommand::Import(cmd) => cmd.build().await, - SubCommand::MetaSnapshot(cmd) => cmd.build().await, - SubCommand::MetaRestore(cmd) => cmd.build().await, + SubCommand::Data(cmd) => cmd.build().await, + SubCommand::Meta(cmd) => cmd.build().await, } } }