From 92422dafca7bc3107e7bbf95ff3b39cadee2e71c Mon Sep 17 00:00:00 2001 From: localhost Date: Wed, 28 May 2025 11:18:00 +0800 Subject: [PATCH] feat: add CLI tool to export metadata (#6150) * chore: add tool to export db meta * chore: add meta restore command * chore: fmt code * chore: remove useless error * chore: support key prefix * chore: add clean check for meta restore * chore: add more log for meta restore * chore: resolve s3 and local file root in command meta-snapshot * chore: remove the pg mysql features from the build script as they are already in the default feature * chore: fix by pr comment --- .../actions/build-linux-artifacts/action.yml | 4 +- Cargo.lock | 77 ++-- src/cli/Cargo.toml | 6 +- src/cli/src/error.rs | 14 +- src/cli/src/export.rs | 25 +- src/cli/src/lib.rs | 2 + src/cli/src/meta_snapshot.rs | 318 +++++++++++++++ src/cmd/Cargo.toml | 2 +- src/common/meta/Cargo.toml | 3 + src/common/meta/src/error.rs | 73 +++- src/common/meta/src/kv_backend.rs | 5 + src/common/meta/src/lib.rs | 1 + src/common/meta/src/snapshot.rs | 380 ++++++++++++++++++ src/common/meta/src/snapshot/file.rs | 145 +++++++ src/meta-srv/src/bootstrap.rs | 39 +- src/meta-srv/src/metasrv.rs | 10 +- src/plugins/src/cli.rs | 9 +- 17 files changed, 1026 insertions(+), 87 deletions(-) create mode 100644 src/cli/src/meta_snapshot.rs create mode 100644 src/common/meta/src/snapshot.rs create mode 100644 src/common/meta/src/snapshot/file.rs diff --git a/.github/actions/build-linux-artifacts/action.yml b/.github/actions/build-linux-artifacts/action.yml index e3db250f16..9c88b25075 100644 --- a/.github/actions/build-linux-artifacts/action.yml +++ b/.github/actions/build-linux-artifacts/action.yml @@ -52,7 +52,7 @@ runs: uses: ./.github/actions/build-greptime-binary with: base-image: ubuntu - features: servers/dashboard,pg_kvbackend,mysql_kvbackend + features: servers/dashboard cargo-profile: ${{ inputs.cargo-profile }} artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }} version: ${{ inputs.version }} @@ -70,7 +70,7 @@ runs: if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64. with: base-image: centos - features: servers/dashboard,pg_kvbackend,mysql_kvbackend + features: servers/dashboard cargo-profile: ${{ inputs.cargo-profile }} artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }} version: ${{ inputs.version }} diff --git a/Cargo.lock b/Cargo.lock index 3c8afffc28..17c46ffe33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1852,8 +1852,9 @@ dependencies = [ "futures", "humantime", "meta-client", + "meta-srv", "nu-ansi-term", - "opendal 0.51.2", + "object-store", "query", "rand 0.9.0", "reqwest", @@ -2322,6 +2323,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-telemetry", + "common-test-util", "common-time", "common-wal", "common-workload", @@ -2332,6 +2334,7 @@ dependencies = [ "deadpool-postgres", "derive_builder 0.20.1", "etcd-client", + "flexbuffers", "futures", "futures-util", "hex", @@ -2340,6 +2343,7 @@ dependencies = [ "itertools 0.14.0", "lazy_static", "moka", + "object-store", "prometheus", "prost 0.13.5", "rand 0.9.0", @@ -4260,6 +4264,19 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flexbuffers" +version = "25.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "935627e7bc8f083035d9faad09ffaed9128f73fb1f74a8798f115749c43378e8" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "num_enum 0.5.11", + "serde", + "serde_derive", +] + [[package]] name = "float-cmp" version = "0.10.0" @@ -7575,13 +7592,34 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" +dependencies = [ + "num_enum_derive 0.5.11", +] + [[package]] name = "num_enum" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" dependencies = [ - "num_enum_derive", + "num_enum_derive 0.7.3", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate 1.3.1", + "proc-macro2", + "quote", + "syn 1.0.109", ] [[package]] @@ -7635,7 +7673,7 @@ dependencies = [ "lazy_static", "md5", "moka", - "opendal 0.52.0", + "opendal", "prometheus", "tokio", "uuid", @@ -7674,7 +7712,7 @@ dependencies = [ "futures", "futures-util", "object_store", - "opendal 0.52.0", + "opendal", "pin-project", "tokio", ] @@ -7700,35 +7738,6 @@ version = "11.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" -[[package]] -name = "opendal" -version = "0.51.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b1063ea459fa9e94584115743b06330f437902dd1d9f692b863ef1875a20548" -dependencies = [ - "anyhow", - "async-trait", - "backon", - "base64 0.22.1", - "bytes", - "chrono", - "crc32c", - "futures", - "getrandom 0.2.15", - "http 1.1.0", - "log", - "md-5", - "once_cell", - "percent-encoding", - "quick-xml 0.36.2", - "reqsign", - "reqwest", - "serde", - "serde_json", - "tokio", - "uuid", -] - [[package]] name = "opendal" version = "0.52.0" @@ -8080,7 +8089,7 @@ dependencies = [ "arrow 53.4.1", "arrow-ipc 53.4.1", "lazy_static", - "num_enum", + "num_enum 0.7.3", "opentelemetry-proto 0.27.0", "paste", "prost 0.13.5", diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index ff6f5dea13..5dff7c0d8c 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -43,11 +43,9 @@ etcd-client.workspace = true futures.workspace = true humantime.workspace = true meta-client.workspace = true +meta-srv.workspace = true nu-ansi-term = "0.46" -opendal = { version = "0.51.1", features = [ - "services-fs", - "services-s3", -] } +object-store.workspace = true query.workspace = true rand.workspace = true reqwest.workspace = true diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs index 6fefbf798e..8a855b7cad 100644 --- a/src/cli/src/error.rs +++ b/src/cli/src/error.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use object_store::Error as ObjectStoreError; use snafu::{Location, Snafu}; #[derive(Snafu)] @@ -225,7 +226,7 @@ pub enum Error { #[snafu(implicit)] location: Location, #[snafu(source)] - error: opendal::Error, + error: ObjectStoreError, }, #[snafu(display("S3 config need be set"))] S3ConfigNotSet { @@ -237,6 +238,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("KV backend not set: {}", backend))] + KvBackendNotSet { + backend: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -273,8 +280,9 @@ impl ErrorExt for Error { Error::Other { source, .. } => source.status_code(), Error::OpenDal { .. } => StatusCode::Internal, - Error::S3ConfigNotSet { .. } => StatusCode::InvalidArguments, - Error::OutputDirNotSet { .. } => StatusCode::InvalidArguments, + Error::S3ConfigNotSet { .. } + | Error::OutputDirNotSet { .. } + | Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments, Error::BuildRuntime { source, .. } => source.status_code(), diff --git a/src/cli/src/export.rs b/src/cli/src/export.rs index fdf513e62b..3b9b3283ef 100644 --- a/src/cli/src/export.rs +++ b/src/cli/src/export.rs @@ -21,8 +21,8 @@ use async_trait::async_trait; use clap::{Parser, ValueEnum}; use common_error::ext::BoxedError; use common_telemetry::{debug, error, info}; -use opendal::layers::LoggingLayer; -use opendal::{services, Operator}; +use object_store::layers::LoggingLayer; +use object_store::{services, ObjectStore}; use serde_json::Value; use snafu::{OptionExt, ResultExt}; use tokio::sync::Semaphore; @@ -470,7 +470,7 @@ impl Export { Ok(()) } - async fn build_operator(&self) -> Result { + async fn build_operator(&self) -> Result { if self.s3 { self.build_s3_operator().await } else { @@ -479,11 +479,11 @@ impl Export { } /// build operator with preference for file system - async fn build_prefer_fs_operator(&self) -> Result { + async fn build_prefer_fs_operator(&self) -> Result { // is under s3 mode and s3_ddl_dir is set, use it as root if self.s3 && self.s3_ddl_local_dir.is_some() { let root = self.s3_ddl_local_dir.as_ref().unwrap().clone(); - let op = Operator::new(services::Fs::default().root(&root)) + let op = ObjectStore::new(services::Fs::default().root(&root)) .context(OpenDalSnafu)? .layer(LoggingLayer::default()) .finish(); @@ -495,7 +495,7 @@ impl Export { } } - async fn build_s3_operator(&self) -> Result { + async fn build_s3_operator(&self) -> Result { let mut builder = services::S3::default().bucket( self.s3_bucket .as_ref() @@ -522,20 +522,20 @@ impl Export { builder = builder.secret_access_key(secret_key); } - let op = Operator::new(builder) + let op = ObjectStore::new(builder) .context(OpenDalSnafu)? .layer(LoggingLayer::default()) .finish(); Ok(op) } - async fn build_fs_operator(&self) -> Result { + async fn build_fs_operator(&self) -> Result { let root = self .output_dir .as_ref() .context(OutputDirNotSetSnafu)? .clone(); - let op = Operator::new(services::Fs::default().root(&root)) + let op = ObjectStore::new(services::Fs::default().root(&root)) .context(OpenDalSnafu)? .layer(LoggingLayer::default()) .finish(); @@ -642,11 +642,14 @@ impl Export { async fn write_to_storage( &self, - op: &Operator, + op: &ObjectStore, file_path: &str, content: Vec, ) -> Result<()> { - op.write(file_path, content).await.context(OpenDalSnafu) + op.write(file_path, content) + .await + .context(OpenDalSnafu) + .map(|_| ()) } fn get_storage_params(&self, schema: &str) -> (String, String) { diff --git a/src/cli/src/lib.rs b/src/cli/src/lib.rs index 2e63813a87..e72ccc65c7 100644 --- a/src/cli/src/lib.rs +++ b/src/cli/src/lib.rs @@ -17,6 +17,7 @@ mod database; pub mod error; mod export; mod import; +mod meta_snapshot; use async_trait::async_trait; use clap::Parser; @@ -27,6 +28,7 @@ use error::Result; pub use crate::bench::BenchTableMetadataCommand; pub use crate::export::ExportCommand; pub use crate::import::ImportCommand; +pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand}; #[async_trait] pub trait Tool: Send + Sync { diff --git a/src/cli/src/meta_snapshot.rs b/src/cli/src/meta_snapshot.rs new file mode 100644 index 0000000000..d0622bd7dc --- /dev/null +++ b/src/cli/src/meta_snapshot.rs @@ -0,0 +1,318 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use clap::Parser; +use common_base::secrets::{ExposeSecret, SecretString}; +use common_error::ext::BoxedError; +use common_meta::kv_backend::chroot::ChrootKvBackend; +use common_meta::kv_backend::etcd::EtcdStore; +use common_meta::kv_backend::rds::{MySqlStore, PgStore}; +use common_meta::kv_backend::{KvBackendRef, DEFAULT_META_TABLE_NAME}; +use common_meta::snapshot::MetadataSnapshotManager; +use meta_srv::bootstrap::{create_etcd_client, create_mysql_pool, create_postgres_pool}; +use meta_srv::metasrv::BackendImpl; +use object_store::services::{Fs, S3}; +use object_store::ObjectStore; +use snafu::ResultExt; + +use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu}; +use crate::Tool; +#[derive(Debug, Default, Parser)] +struct MetaConnection { + /// The endpoint of store. one of etcd, pg or mysql. + #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)] + store_addrs: Vec, + /// The database backend. + #[clap(long, value_enum)] + backend: Option, + #[clap(long, default_value = "")] + store_key_prefix: String, + #[clap(long,default_value = DEFAULT_META_TABLE_NAME)] + meta_table_name: String, + #[clap(long, default_value = "128")] + max_txn_ops: usize, +} + +impl MetaConnection { + pub async fn build(&self) -> Result { + let max_txn_ops = self.max_txn_ops; + let table_name = &self.meta_table_name; + let store_addrs = &self.store_addrs; + if store_addrs.is_empty() { + KvBackendNotSetSnafu { backend: "all" } + .fail() + .map_err(BoxedError::new) + } else { + let kvbackend = match self.backend { + Some(BackendImpl::EtcdStore) => { + let etcd_client = create_etcd_client(store_addrs) + .await + .map_err(BoxedError::new)?; + Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops)) + } + Some(BackendImpl::PostgresStore) => { + let pool = create_postgres_pool(store_addrs) + .await + .map_err(BoxedError::new)?; + Ok(PgStore::with_pg_pool(pool, table_name, max_txn_ops) + .await + .map_err(BoxedError::new)?) + } + Some(BackendImpl::MysqlStore) => { + let pool = create_mysql_pool(store_addrs) + .await + .map_err(BoxedError::new)?; + Ok(MySqlStore::with_mysql_pool(pool, table_name, max_txn_ops) + .await + .map_err(BoxedError::new)?) + } + _ => KvBackendNotSetSnafu { backend: "all" } + .fail() + .map_err(BoxedError::new), + }; + if self.store_key_prefix.is_empty() { + kvbackend + } else { + let chroot_kvbackend = + ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?); + Ok(Arc::new(chroot_kvbackend)) + } + } + } +} + +// TODO(qtang): Abstract a generic s3 config for export import meta snapshot restore +#[derive(Debug, Default, Parser)] +struct S3Config { + /// whether to use s3 as the output directory. default is false. + #[clap(long, default_value = "false")] + s3: bool, + /// The s3 bucket name. + #[clap(long)] + s3_bucket: Option, + /// The s3 region. + #[clap(long)] + s3_region: Option, + /// The s3 access key. + #[clap(long)] + s3_access_key: Option, + /// The s3 secret key. + #[clap(long)] + s3_secret_key: Option, + /// The s3 endpoint. we will automatically use the default s3 decided by the region if not set. + #[clap(long)] + s3_endpoint: Option, +} + +impl S3Config { + pub fn build(&self, root: &str) -> Result, BoxedError> { + if !self.s3 { + Ok(None) + } else { + if self.s3_region.is_none() + || self.s3_access_key.is_none() + || self.s3_secret_key.is_none() + || self.s3_bucket.is_none() + { + return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new); + } + // Safety, unwrap is safe because we have checked the options above. + let mut config = S3::default() + .bucket(self.s3_bucket.as_ref().unwrap()) + .region(self.s3_region.as_ref().unwrap()) + .access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret()) + .secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret()); + + if !root.is_empty() && root != "." { + config = config.root(root); + } + + if let Some(endpoint) = &self.s3_endpoint { + config = config.endpoint(endpoint); + } + Ok(Some( + ObjectStore::new(config) + .context(OpenDalSnafu) + .map_err(BoxedError::new)? + .finish(), + )) + } + } +} + +/// Export metadata snapshot tool. +/// This tool is used to export metadata snapshot from etcd, pg or mysql. +/// It will dump the metadata snapshot to local file or s3 bucket. +/// The snapshot file will be in binary format. +#[derive(Debug, Default, Parser)] +pub struct MetaSnapshotCommand { + /// The connection to the metadata store. + #[clap(flatten)] + connection: MetaConnection, + /// The s3 config. + #[clap(flatten)] + s3_config: S3Config, + /// The name of the target snapshot file. we will add the file extension automatically. + #[clap(long, default_value = "metadata_snapshot")] + file_name: String, + /// The directory to store the snapshot file. + /// if target output is s3 bucket, this is the root directory in the bucket. + /// if target output is local file, this is the local directory. + #[clap(long, default_value = "")] + output_dir: String, +} + +fn create_local_file_object_store(root: &str) -> Result { + let root = if root.is_empty() { "." } else { root }; + let object_store = ObjectStore::new(Fs::default().root(root)) + .context(OpenDalSnafu) + .map_err(BoxedError::new)? + .finish(); + Ok(object_store) +} + +impl MetaSnapshotCommand { + pub async fn build(&self) -> Result, BoxedError> { + let kvbackend = self.connection.build().await?; + let output_dir = &self.output_dir; + let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?; + if let Some(store) = object_store { + let tool = MetaSnapshotTool { + inner: MetadataSnapshotManager::new(kvbackend, store), + target_file: self.file_name.clone(), + }; + Ok(Box::new(tool)) + } else { + let object_store = create_local_file_object_store(output_dir)?; + let tool = MetaSnapshotTool { + inner: MetadataSnapshotManager::new(kvbackend, object_store), + target_file: self.file_name.clone(), + }; + Ok(Box::new(tool)) + } + } +} + +pub struct MetaSnapshotTool { + inner: MetadataSnapshotManager, + target_file: String, +} + +#[async_trait] +impl Tool for MetaSnapshotTool { + async fn do_work(&self) -> std::result::Result<(), BoxedError> { + self.inner + .dump("", &self.target_file) + .await + .map_err(BoxedError::new)?; + Ok(()) + } +} + +/// Restore metadata snapshot tool. +/// This tool is used to restore metadata snapshot from etcd, pg or mysql. +/// It will restore the metadata snapshot from local file or s3 bucket. +#[derive(Debug, Default, Parser)] +pub struct MetaRestoreCommand { + /// The connection to the metadata store. + #[clap(flatten)] + connection: MetaConnection, + /// The s3 config. + #[clap(flatten)] + s3_config: S3Config, + /// The name of the target snapshot file. + #[clap(long, default_value = "metadata_snapshot.metadata.fb")] + file_name: String, + /// The directory to store the snapshot file. + #[clap(long, default_value = ".")] + input_dir: String, + #[clap(long, default_value = "false")] + force: bool, +} + +impl MetaRestoreCommand { + pub async fn build(&self) -> Result, BoxedError> { + let kvbackend = self.connection.build().await?; + let input_dir = &self.input_dir; + let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?; + if let Some(store) = object_store { + let tool = MetaRestoreTool::new( + MetadataSnapshotManager::new(kvbackend, store), + self.file_name.clone(), + self.force, + ); + Ok(Box::new(tool)) + } else { + let object_store = create_local_file_object_store(input_dir)?; + let tool = MetaRestoreTool::new( + MetadataSnapshotManager::new(kvbackend, object_store), + self.file_name.clone(), + self.force, + ); + Ok(Box::new(tool)) + } + } +} + +pub struct MetaRestoreTool { + inner: MetadataSnapshotManager, + source_file: String, + force: bool, +} + +impl MetaRestoreTool { + pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self { + Self { + inner, + source_file, + force, + } + } +} + +#[async_trait] +impl Tool for MetaRestoreTool { + async fn do_work(&self) -> std::result::Result<(), BoxedError> { + let clean = self + .inner + .check_target_source_clean() + .await + .map_err(BoxedError::new)?; + if clean { + common_telemetry::info!( + "The target source is clean, we will restore the metadata snapshot." + ); + self.inner + .restore(&self.source_file) + .await + .map_err(BoxedError::new)?; + Ok(()) + } else if !self.force { + common_telemetry::warn!( + "The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option." + ); + Ok(()) + } else { + common_telemetry::info!("The target source is not clean, We will restore the metadata snapshot with --force."); + self.inner + .restore(&self.source_file) + .await + .map_err(BoxedError::new)?; + Ok(()) + } + } +} diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index b3ffd479a6..416250c66d 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -10,7 +10,7 @@ name = "greptime" path = "src/bin/greptime.rs" [features] -default = ["servers/pprof", "servers/mem-prof"] +default = ["servers/pprof", "servers/mem-prof", "meta-srv/pg_kvbackend", "meta-srv/mysql_kvbackend"] tokio-console = ["common-telemetry/tokio-console"] [lints] diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index bc2facd703..9c1351fa10 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -42,6 +42,7 @@ deadpool = { workspace = true, optional = true } deadpool-postgres = { workspace = true, optional = true } derive_builder.workspace = true etcd-client.workspace = true +flexbuffers = "25.2" futures.workspace = true futures-util.workspace = true hex.workspace = true @@ -49,6 +50,7 @@ humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true moka.workspace = true +object-store.workspace = true prometheus.workspace = true prost.workspace = true rand.workspace = true @@ -71,6 +73,7 @@ typetag.workspace = true [dev-dependencies] chrono.workspace = true common-procedure = { workspace = true, features = ["testing"] } +common-test-util.workspace = true common-wal = { workspace = true, features = ["testing"] } datatypes.workspace = true hyper = { version = "0.14", features = ["full"] } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index e3f23c64af..d646299f1e 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -812,6 +812,68 @@ pub enum Error { #[snafu(source)] error: common_time::error::Error, }, + #[snafu(display("Invalid file path: {}", file_path))] + InvalidFilePath { + #[snafu(implicit)] + location: Location, + file_path: String, + }, + + #[snafu(display("Failed to serialize flexbuffers"))] + SerializeFlexbuffers { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: flexbuffers::SerializationError, + }, + + #[snafu(display("Failed to deserialize flexbuffers"))] + DeserializeFlexbuffers { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: flexbuffers::DeserializationError, + }, + + #[snafu(display("Failed to read flexbuffers"))] + ReadFlexbuffers { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: flexbuffers::ReaderError, + }, + + #[snafu(display("Invalid file name: {}", reason))] + InvalidFileName { + #[snafu(implicit)] + location: Location, + reason: String, + }, + + #[snafu(display("Invalid file extension: {}", reason))] + InvalidFileExtension { + #[snafu(implicit)] + location: Location, + reason: String, + }, + + #[snafu(display("Failed to write object, file path: {}", file_path))] + WriteObject { + #[snafu(implicit)] + location: Location, + file_path: String, + #[snafu(source)] + error: object_store::Error, + }, + + #[snafu(display("Failed to read object, file path: {}", file_path))] + ReadObject { + #[snafu(implicit)] + location: Location, + file_path: String, + #[snafu(source)] + error: object_store::Error, + }, } pub type Result = std::result::Result; @@ -834,6 +896,7 @@ impl ErrorExt for Error { ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected, Unsupported { .. } => StatusCode::Unsupported, + WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable, SerdeJson { .. } | ParseOption { .. } @@ -867,7 +930,10 @@ impl ErrorExt for Error { | FromUtf8 { .. } | MetadataCorruption { .. } | ParseWalOptions { .. } - | KafkaGetOffset { .. } => StatusCode::Unexpected, + | KafkaGetOffset { .. } + | ReadFlexbuffers { .. } + | SerializeFlexbuffers { .. } + | DeserializeFlexbuffers { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal, @@ -884,7 +950,10 @@ impl ErrorExt for Error { | InvalidSetDatabaseOption { .. } | InvalidUnsetDatabaseOption { .. } | InvalidTopicNamePrefix { .. } - | InvalidTimeZone { .. } => StatusCode::InvalidArguments, + | InvalidTimeZone { .. } + | InvalidFileExtension { .. } + | InvalidFileName { .. } + | InvalidFilePath { .. } => StatusCode::InvalidArguments, InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments, FlowNotFound { .. } => StatusCode::FlowNotFound, diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 747d1149c4..ea2736225d 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -38,6 +38,11 @@ pub mod txn; pub mod util; pub type KvBackendRef = Arc + Send + Sync>; +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] +pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv"; +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] +pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1; + #[async_trait] pub trait KvBackend: TxnService where diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 7bfbd78f9c..8cdcf35c9d 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -41,6 +41,7 @@ pub mod region_keeper; pub mod region_registry; pub mod rpc; pub mod sequence; +pub mod snapshot; pub mod state_store; #[cfg(any(test, feature = "testing"))] pub mod test_util; diff --git a/src/common/meta/src/snapshot.rs b/src/common/meta/src/snapshot.rs new file mode 100644 index 0000000000..9110f09277 --- /dev/null +++ b/src/common/meta/src/snapshot.rs @@ -0,0 +1,380 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod file; + +use std::fmt::{Display, Formatter}; +use std::path::{Path, PathBuf}; +use std::time::Instant; + +use common_telemetry::info; +use file::{Metadata, MetadataContent}; +use futures::TryStreamExt; +use object_store::ObjectStore; +use snafu::{OptionExt, ResultExt}; +use strum::Display; + +use crate::error::{ + Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu, + Result, WriteObjectSnafu, +}; +use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::rpc::store::{BatchPutRequest, RangeRequest}; +use crate::rpc::KeyValue; +use crate::snapshot::file::{Document, KeyValue as FileKeyValue}; + +/// The format of the backup file. +#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)] +pub enum FileFormat { + #[strum(serialize = "fb")] + FlexBuffers, +} + +impl TryFrom<&str> for FileFormat { + type Error = String; + + fn try_from(value: &str) -> std::result::Result { + match value.to_lowercase().as_str() { + "fb" => Ok(FileFormat::FlexBuffers), + _ => Err(format!("Invalid file format: {}", value)), + } + } +} + +#[derive(Debug, PartialEq, Eq, Display)] +#[strum(serialize_all = "lowercase")] +pub enum DataType { + Metadata, +} + +impl TryFrom<&str> for DataType { + type Error = String; + + fn try_from(value: &str) -> std::result::Result { + match value.to_lowercase().as_str() { + "metadata" => Ok(DataType::Metadata), + _ => Err(format!("Invalid data type: {}", value)), + } + } +} + +#[derive(Debug, PartialEq, Eq)] +pub struct FileExtension { + format: FileFormat, + data_type: DataType, +} + +impl FileExtension { + pub fn new(format: FileFormat, data_type: DataType) -> Self { + Self { format, data_type } + } +} + +impl Display for FileExtension { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.data_type, self.format) + } +} + +impl TryFrom<&str> for FileExtension { + type Error = Error; + + fn try_from(value: &str) -> Result { + let parts = value.split(".").collect::>(); + if parts.len() != 2 { + return InvalidFileExtensionSnafu { + reason: format!( + "Extension should be in the format of ., got: {}", + value + ), + } + .fail(); + } + + let data_type = DataType::try_from(parts[0]) + .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?; + let format = FileFormat::try_from(parts[1]) + .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?; + Ok(FileExtension { format, data_type }) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub struct FileName { + name: String, + extension: FileExtension, +} + +impl Display for FileName { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.name, self.extension) + } +} + +impl TryFrom<&str> for FileName { + type Error = Error; + + fn try_from(value: &str) -> Result { + let Some((name, extension)) = value.split_once(".") else { + return InvalidFileNameSnafu { + reason: format!( + "The file name should be in the format of ., got: {}", + value + ), + } + .fail(); + }; + let extension = FileExtension::try_from(extension)?; + Ok(Self { + name: name.to_string(), + extension, + }) + } +} + +impl FileName { + fn new(name: String, extension: FileExtension) -> Self { + Self { name, extension } + } +} + +/// The manager of the metadata snapshot. +/// +/// It manages the metadata snapshot, including dumping and restoring. +pub struct MetadataSnapshotManager { + kv_backend: KvBackendRef, + object_store: ObjectStore, +} + +/// The maximum size of the request to put metadata, use 1MiB by default. +const MAX_REQUEST_SIZE: usize = 1024 * 1024; + +impl MetadataSnapshotManager { + pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self { + Self { + kv_backend, + object_store, + } + } + + /// Restores the metadata from the backup file to the metadata store. + pub async fn restore(&self, file_path: &str) -> Result { + let path = Path::new(file_path); + + let file_name = path + .file_name() + .and_then(|s| s.to_str()) + .context(InvalidFilePathSnafu { file_path })?; + + let filename = FileName::try_from(file_name)?; + let data = self + .object_store + .read(file_path) + .await + .context(ReadObjectSnafu { file_path })?; + let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?; + let metadata_content = document.into_metadata_content()?; + let mut req = BatchPutRequest::default(); + let mut total_request_size = 0; + let mut count = 0; + let now = Instant::now(); + for FileKeyValue { key, value } in metadata_content.into_iter() { + count += 1; + let key_size = key.len(); + let value_size = value.len(); + if total_request_size + key_size + value_size > MAX_REQUEST_SIZE { + self.kv_backend.batch_put(req).await?; + req = BatchPutRequest::default(); + total_request_size = 0; + } + req.kvs.push(KeyValue { key, value }); + total_request_size += key_size + value_size; + } + if !req.kvs.is_empty() { + self.kv_backend.batch_put(req).await?; + } + + info!( + "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}", + file_path, + count, + now.elapsed() + ); + Ok(count) + } + + pub async fn check_target_source_clean(&self) -> Result { + let req = RangeRequest::new().with_range(vec![0], vec![0]); + let mut stream = Box::pin( + PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(), + ); + let v = stream.as_mut().try_next().await?; + Ok(v.is_none()) + } + + /// Dumps the metadata to the backup file. + pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> { + let format = FileFormat::FlexBuffers; + let filename = FileName::new( + filename_str.to_string(), + FileExtension { + format, + data_type: DataType::Metadata, + }, + ); + let file_path_buf = [path, filename.to_string().as_str()] + .iter() + .collect::(); + let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu { + reason: format!("Invalid file path: {}, filename: {}", path, filename_str), + })?; + let now = Instant::now(); + let req = RangeRequest::new().with_range(vec![0], vec![0]); + let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| { + Ok(FileKeyValue { + key: kv.key, + value: kv.value, + }) + }) + .into_stream(); + let keyvalues = stream.try_collect::>().await?; + let num_keyvalues = keyvalues.len(); + let document = Document::new( + Metadata::new(), + file::Content::Metadata(MetadataContent::new(keyvalues)), + ); + let bytes = document.to_bytes(&format)?; + let r = self + .object_store + .write(file_path, bytes) + .await + .context(WriteObjectSnafu { file_path })?; + info!( + "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}", + file_path, + num_keyvalues, + r.content_length(), + now.elapsed() + ); + + Ok((filename.to_string(), num_keyvalues as u64)) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use common_test_util::temp_dir::{create_temp_dir, TempDir}; + use object_store::services::Fs; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + use crate::rpc::store::PutRequest; + + #[test] + fn test_file_name() { + let file_name = FileName::try_from("test.metadata.fb").unwrap(); + assert_eq!(file_name.name, "test"); + assert_eq!(file_name.extension.format, FileFormat::FlexBuffers); + assert_eq!(file_name.extension.data_type, DataType::Metadata); + assert_eq!(file_name.to_string(), "test.metadata.fb"); + + let invalid_file_name = FileName::try_from("test.metadata").unwrap_err(); + assert_eq!( + invalid_file_name.to_string(), + "Invalid file extension: Extension should be in the format of ., got: metadata" + ); + + let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err(); + assert_eq!( + invalid_file_extension.to_string(), + "Invalid file extension: Invalid file format: hello" + ); + } + + fn test_env( + prefix: &str, + ) -> ( + TempDir, + Arc>, + MetadataSnapshotManager, + ) { + let temp_dir = create_temp_dir(prefix); + let kv_backend = Arc::new(MemoryKvBackend::default()); + let temp_path = temp_dir.path(); + let data_path = temp_path.join("data").as_path().display().to_string(); + let builder = Fs::default().root(&data_path); + let object_store = ObjectStore::new(builder).unwrap().finish(); + let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store); + (temp_dir, kv_backend, manager) + } + + #[tokio::test] + async fn test_dump_and_restore() { + common_telemetry::init_default_ut_logging(); + let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore"); + let temp_path = temp_dir.path(); + + for i in 0..10 { + kv_backend + .put( + PutRequest::new() + .with_key(format!("test_{}", i).as_bytes().to_vec()) + .with_value(format!("value_{}", i).as_bytes().to_vec()), + ) + .await + .unwrap(); + } + let dump_path = temp_path.join("snapshot"); + manager + .dump( + &dump_path.as_path().display().to_string(), + "metadata_snapshot", + ) + .await + .unwrap(); + // Clean up the kv backend + kv_backend.clear(); + + let restore_path = dump_path + .join("metadata_snapshot.metadata.fb") + .as_path() + .display() + .to_string(); + manager.restore(&restore_path).await.unwrap(); + + for i in 0..10 { + let key = format!("test_{}", i); + let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap(); + assert_eq!(value.value, format!("value_{}", i).as_bytes()); + } + } + + #[tokio::test] + async fn test_restore_from_nonexistent_file() { + let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file"); + let restore_path = temp_dir + .path() + .join("nonexistent.metadata.fb") + .as_path() + .display() + .to_string(); + let err = manager.restore(&restore_path).await.unwrap_err(); + assert_matches!(err, Error::ReadObject { .. }) + } +} diff --git a/src/common/meta/src/snapshot/file.rs b/src/common/meta/src/snapshot/file.rs new file mode 100644 index 0000000000..a17254a48d --- /dev/null +++ b/src/common/meta/src/snapshot/file.rs @@ -0,0 +1,145 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_time::util::current_time_millis; +use flexbuffers::{FlexbufferSerializer, Reader}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{ + DeserializeFlexbuffersSnafu, ReadFlexbuffersSnafu, Result, SerializeFlexbuffersSnafu, +}; +use crate::snapshot::FileFormat; + +/// The layout of the backup file. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub(crate) struct Document { + metadata: Metadata, + content: Content, +} + +impl Document { + /// Creates a new document. + pub fn new(metadata: Metadata, content: Content) -> Self { + Self { metadata, content } + } + + fn serialize_to_flexbuffer(&self) -> Result> { + let mut builder = FlexbufferSerializer::new(); + self.serialize(&mut builder) + .context(SerializeFlexbuffersSnafu)?; + Ok(builder.take_buffer()) + } + + /// Converts the [`Document`] to a bytes. + pub(crate) fn to_bytes(&self, format: &FileFormat) -> Result> { + match format { + FileFormat::FlexBuffers => self.serialize_to_flexbuffer(), + } + } + + fn deserialize_from_flexbuffer(data: &[u8]) -> Result { + let reader = Reader::get_root(data).context(ReadFlexbuffersSnafu)?; + Document::deserialize(reader).context(DeserializeFlexbuffersSnafu) + } + + /// Deserializes the [`Document`] from a bytes. + pub(crate) fn from_slice(format: &FileFormat, data: &[u8]) -> Result { + match format { + FileFormat::FlexBuffers => Self::deserialize_from_flexbuffer(data), + } + } + + /// Converts the [`Document`] to a [`MetadataContent`]. + pub(crate) fn into_metadata_content(self) -> Result { + match self.content { + Content::Metadata(metadata) => Ok(metadata), + } + } +} + +/// The metadata of the backup file. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub(crate) struct Metadata { + // UNIX_EPOCH in milliseconds. + created_timestamp_mills: i64, +} + +impl Metadata { + /// Create a new metadata. + /// + /// The `created_timestamp_mills` will be the current time in milliseconds. + pub fn new() -> Self { + Self { + created_timestamp_mills: current_time_millis(), + } + } +} + +/// The content of the backup file. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub(crate) enum Content { + Metadata(MetadataContent), +} + +/// The content of the backup file. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub(crate) struct MetadataContent { + values: Vec, +} + +impl MetadataContent { + /// Create a new metadata content. + pub fn new(values: impl IntoIterator) -> Self { + Self { + values: values.into_iter().collect(), + } + } + + /// Returns an iterator over the key-value pairs. + pub fn into_iter(self) -> impl Iterator { + self.values.into_iter() + } +} + +/// The key-value pair of the backup file. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub(crate) struct KeyValue { + pub key: Vec, + pub value: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_document() { + let document = Document::new( + Metadata::new(), + Content::Metadata(MetadataContent::new(vec![KeyValue { + key: b"key".to_vec(), + value: b"value".to_vec(), + }])), + ); + + let bytes = document.to_bytes(&FileFormat::FlexBuffers).unwrap(); + let document_deserialized = Document::from_slice(&FileFormat::FlexBuffers, &bytes).unwrap(); + assert_eq!( + document.metadata.created_timestamp_mills, + document_deserialized.metadata.created_timestamp_mills + ); + assert_eq!(document.content, document_deserialized.content); + } +} diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 237af0612b..15e7d8d11f 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -257,7 +257,7 @@ pub async fn metasrv_builder( (Some(kv_backend), _) => (kv_backend, None), (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None), (None, BackendImpl::EtcdStore) => { - let etcd_client = create_etcd_client(opts).await?; + let etcd_client = create_etcd_client(&opts.store_addrs).await?; let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops); let election = EtcdElection::with_etcd_client( &opts.server_addr, @@ -270,7 +270,7 @@ pub async fn metasrv_builder( } #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { - let pool = create_postgres_pool(opts).await?; + let pool = create_postgres_pool(&opts.store_addrs).await?; let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops) .await .context(error::KvBackendSnafu)?; @@ -290,7 +290,7 @@ pub async fn metasrv_builder( } #[cfg(feature = "mysql_kvbackend")] (None, BackendImpl::MysqlStore) => { - let pool = create_mysql_pool(opts).await?; + let pool = create_mysql_pool(&opts.store_addrs).await?; let kv_backend = MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops) .await @@ -360,9 +360,8 @@ pub async fn metasrv_builder( .plugins(plugins)) } -async fn create_etcd_client(opts: &MetasrvOptions) -> Result { - let etcd_endpoints = opts - .store_addrs +pub async fn create_etcd_client(store_addrs: &[String]) -> Result { + let etcd_endpoints = store_addrs .iter() .map(|x| x.trim()) .filter(|x| !x.is_empty()) @@ -393,13 +392,10 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result Result { - let postgres_url = opts - .store_addrs - .first() - .context(error::InvalidArgumentsSnafu { - err_msg: "empty store addrs", - })?; +pub async fn create_postgres_pool(store_addrs: &[String]) -> Result { + let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu { + err_msg: "empty store addrs", + })?; let mut cfg = Config::new(); cfg.url = Some(postgres_url.to_string()); let pool = cfg @@ -409,13 +405,10 @@ async fn create_postgres_pool(opts: &MetasrvOptions) -> Result Result { - let mysql_url = opts - .store_addrs - .first() - .context(error::InvalidArgumentsSnafu { - err_msg: "empty store addrs", - })?; +async fn setup_mysql_options(store_addrs: &[String]) -> Result { + let mysql_url = store_addrs.first().context(error::InvalidArgumentsSnafu { + err_msg: "empty store addrs", + })?; // Avoid `SET` commands in sqlx let opts: MySqlConnectOptions = mysql_url .parse() @@ -429,8 +422,8 @@ async fn setup_mysql_options(opts: &MetasrvOptions) -> Result Result { - let opts = setup_mysql_options(opts).await?; +pub async fn create_mysql_pool(store_addrs: &[String]) -> Result { + let opts = setup_mysql_options(store_addrs).await?; let pool = MySqlPool::connect_with(opts) .await .context(error::CreateMySqlPoolSnafu)?; @@ -439,7 +432,7 @@ async fn create_mysql_pool(opts: &MetasrvOptions) -> Result { #[cfg(feature = "mysql_kvbackend")] async fn create_mysql_client(opts: &MetasrvOptions) -> Result { - let opts = setup_mysql_options(opts).await?; + let opts = setup_mysql_options(&opts.store_addrs).await?; let client = MySqlConnection::connect_with(&opts) .await .context(error::ConnectMySqlSnafu)?; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 8153b739f9..6a1d8111d5 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -29,7 +29,10 @@ use common_meta::ddl::ProcedureExecutorRef; use common_meta::distributed_time_constants; use common_meta::key::maintenance::MaintenanceModeManagerRef; use common_meta::key::TableMetadataManagerRef; -use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; +use common_meta::kv_backend::{ + KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, DEFAULT_META_ELECTION_LOCK_ID, + DEFAULT_META_TABLE_NAME, +}; use common_meta::leadership_notifier::{ LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef, }; @@ -75,11 +78,6 @@ pub const TABLE_ID_SEQ: &str = "table_id"; pub const FLOW_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "./greptimedb_data/metasrv"; -#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] -pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv"; -#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] -pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1; - // The datastores that implements metadata kvbackend. #[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)] #[serde(rename_all = "snake_case")] diff --git a/src/plugins/src/cli.rs b/src/plugins/src/cli.rs index 79f5c64aa0..be96bd820f 100644 --- a/src/plugins/src/cli.rs +++ b/src/plugins/src/cli.rs @@ -13,7 +13,10 @@ // limitations under the License. use clap::Parser; -use cli::{BenchTableMetadataCommand, ExportCommand, ImportCommand, Tool}; +use cli::{ + BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand, + MetaSnapshotCommand, Tool, +}; use common_error::ext::BoxedError; #[derive(Parser)] @@ -22,6 +25,8 @@ pub enum SubCommand { Bench(BenchTableMetadataCommand), Export(ExportCommand), Import(ImportCommand), + MetaSnapshot(MetaSnapshotCommand), + MetaRestore(MetaRestoreCommand), } impl SubCommand { @@ -31,6 +36,8 @@ impl SubCommand { SubCommand::Bench(cmd) => cmd.build().await, SubCommand::Export(cmd) => cmd.build().await, SubCommand::Import(cmd) => cmd.build().await, + SubCommand::MetaSnapshot(cmd) => cmd.build().await, + SubCommand::MetaRestore(cmd) => cmd.build().await, } } }