mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 00:19:58 +00:00
feat: add CLI tool to export metadata (#6150)
* chore: add tool to export db meta * chore: add meta restore command * chore: fmt code * chore: remove useless error * chore: support key prefix * chore: add clean check for meta restore * chore: add more log for meta restore * chore: resolve s3 and local file root in command meta-snapshot * chore: remove the pg mysql features from the build script as they are already in the default feature * chore: fix by pr comment
This commit is contained in:
@@ -52,7 +52,7 @@ runs:
|
||||
uses: ./.github/actions/build-greptime-binary
|
||||
with:
|
||||
base-image: ubuntu
|
||||
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
|
||||
features: servers/dashboard
|
||||
cargo-profile: ${{ inputs.cargo-profile }}
|
||||
artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
|
||||
version: ${{ inputs.version }}
|
||||
@@ -70,7 +70,7 @@ runs:
|
||||
if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64.
|
||||
with:
|
||||
base-image: centos
|
||||
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
|
||||
features: servers/dashboard
|
||||
cargo-profile: ${{ inputs.cargo-profile }}
|
||||
artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }}
|
||||
version: ${{ inputs.version }}
|
||||
|
||||
77
Cargo.lock
generated
77
Cargo.lock
generated
@@ -1852,8 +1852,9 @@ dependencies = [
|
||||
"futures",
|
||||
"humantime",
|
||||
"meta-client",
|
||||
"meta-srv",
|
||||
"nu-ansi-term",
|
||||
"opendal 0.51.2",
|
||||
"object-store",
|
||||
"query",
|
||||
"rand 0.9.0",
|
||||
"reqwest",
|
||||
@@ -2322,6 +2323,7 @@ dependencies = [
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
"common-wal",
|
||||
"common-workload",
|
||||
@@ -2332,6 +2334,7 @@ dependencies = [
|
||||
"deadpool-postgres",
|
||||
"derive_builder 0.20.1",
|
||||
"etcd-client",
|
||||
"flexbuffers",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"hex",
|
||||
@@ -2340,6 +2343,7 @@ dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"lazy_static",
|
||||
"moka",
|
||||
"object-store",
|
||||
"prometheus",
|
||||
"prost 0.13.5",
|
||||
"rand 0.9.0",
|
||||
@@ -4260,6 +4264,19 @@ dependencies = [
|
||||
"miniz_oxide",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flexbuffers"
|
||||
version = "25.2.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "935627e7bc8f083035d9faad09ffaed9128f73fb1f74a8798f115749c43378e8"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"byteorder",
|
||||
"num_enum 0.5.11",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "float-cmp"
|
||||
version = "0.10.0"
|
||||
@@ -7575,13 +7592,34 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum"
|
||||
version = "0.5.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
|
||||
dependencies = [
|
||||
"num_enum_derive 0.5.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179"
|
||||
dependencies = [
|
||||
"num_enum_derive",
|
||||
"num_enum_derive 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum_derive"
|
||||
version = "0.5.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
|
||||
dependencies = [
|
||||
"proc-macro-crate 1.3.1",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7635,7 +7673,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"md5",
|
||||
"moka",
|
||||
"opendal 0.52.0",
|
||||
"opendal",
|
||||
"prometheus",
|
||||
"tokio",
|
||||
"uuid",
|
||||
@@ -7674,7 +7712,7 @@ dependencies = [
|
||||
"futures",
|
||||
"futures-util",
|
||||
"object_store",
|
||||
"opendal 0.52.0",
|
||||
"opendal",
|
||||
"pin-project",
|
||||
"tokio",
|
||||
]
|
||||
@@ -7700,35 +7738,6 @@ version = "11.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9"
|
||||
|
||||
[[package]]
|
||||
name = "opendal"
|
||||
version = "0.51.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b1063ea459fa9e94584115743b06330f437902dd1d9f692b863ef1875a20548"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"backon",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crc32c",
|
||||
"futures",
|
||||
"getrandom 0.2.15",
|
||||
"http 1.1.0",
|
||||
"log",
|
||||
"md-5",
|
||||
"once_cell",
|
||||
"percent-encoding",
|
||||
"quick-xml 0.36.2",
|
||||
"reqsign",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opendal"
|
||||
version = "0.52.0"
|
||||
@@ -8080,7 +8089,7 @@ dependencies = [
|
||||
"arrow 53.4.1",
|
||||
"arrow-ipc 53.4.1",
|
||||
"lazy_static",
|
||||
"num_enum",
|
||||
"num_enum 0.7.3",
|
||||
"opentelemetry-proto 0.27.0",
|
||||
"paste",
|
||||
"prost 0.13.5",
|
||||
|
||||
@@ -43,11 +43,9 @@ etcd-client.workspace = true
|
||||
futures.workspace = true
|
||||
humantime.workspace = true
|
||||
meta-client.workspace = true
|
||||
meta-srv.workspace = true
|
||||
nu-ansi-term = "0.46"
|
||||
opendal = { version = "0.51.1", features = [
|
||||
"services-fs",
|
||||
"services-s3",
|
||||
] }
|
||||
object-store.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace = true
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::any::Any;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use object_store::Error as ObjectStoreError;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
@@ -225,7 +226,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: opendal::Error,
|
||||
error: ObjectStoreError,
|
||||
},
|
||||
#[snafu(display("S3 config need be set"))]
|
||||
S3ConfigNotSet {
|
||||
@@ -237,6 +238,12 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("KV backend not set: {}", backend))]
|
||||
KvBackendNotSet {
|
||||
backend: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -273,8 +280,9 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::Other { source, .. } => source.status_code(),
|
||||
Error::OpenDal { .. } => StatusCode::Internal,
|
||||
Error::S3ConfigNotSet { .. } => StatusCode::InvalidArguments,
|
||||
Error::OutputDirNotSet { .. } => StatusCode::InvalidArguments,
|
||||
Error::S3ConfigNotSet { .. }
|
||||
| Error::OutputDirNotSet { .. }
|
||||
| Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::BuildRuntime { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
@@ -21,8 +21,8 @@ use async_trait::async_trait;
|
||||
use clap::{Parser, ValueEnum};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use opendal::layers::LoggingLayer;
|
||||
use opendal::{services, Operator};
|
||||
use object_store::layers::LoggingLayer;
|
||||
use object_store::{services, ObjectStore};
|
||||
use serde_json::Value;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::sync::Semaphore;
|
||||
@@ -470,7 +470,7 @@ impl Export {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build_operator(&self) -> Result<Operator> {
|
||||
async fn build_operator(&self) -> Result<ObjectStore> {
|
||||
if self.s3 {
|
||||
self.build_s3_operator().await
|
||||
} else {
|
||||
@@ -479,11 +479,11 @@ impl Export {
|
||||
}
|
||||
|
||||
/// build operator with preference for file system
|
||||
async fn build_prefer_fs_operator(&self) -> Result<Operator> {
|
||||
async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
|
||||
// is under s3 mode and s3_ddl_dir is set, use it as root
|
||||
if self.s3 && self.s3_ddl_local_dir.is_some() {
|
||||
let root = self.s3_ddl_local_dir.as_ref().unwrap().clone();
|
||||
let op = Operator::new(services::Fs::default().root(&root))
|
||||
let op = ObjectStore::new(services::Fs::default().root(&root))
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
@@ -495,7 +495,7 @@ impl Export {
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_s3_operator(&self) -> Result<Operator> {
|
||||
async fn build_s3_operator(&self) -> Result<ObjectStore> {
|
||||
let mut builder = services::S3::default().bucket(
|
||||
self.s3_bucket
|
||||
.as_ref()
|
||||
@@ -522,20 +522,20 @@ impl Export {
|
||||
builder = builder.secret_access_key(secret_key);
|
||||
}
|
||||
|
||||
let op = Operator::new(builder)
|
||||
let op = ObjectStore::new(builder)
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
Ok(op)
|
||||
}
|
||||
|
||||
async fn build_fs_operator(&self) -> Result<Operator> {
|
||||
async fn build_fs_operator(&self) -> Result<ObjectStore> {
|
||||
let root = self
|
||||
.output_dir
|
||||
.as_ref()
|
||||
.context(OutputDirNotSetSnafu)?
|
||||
.clone();
|
||||
let op = Operator::new(services::Fs::default().root(&root))
|
||||
let op = ObjectStore::new(services::Fs::default().root(&root))
|
||||
.context(OpenDalSnafu)?
|
||||
.layer(LoggingLayer::default())
|
||||
.finish();
|
||||
@@ -642,11 +642,14 @@ impl Export {
|
||||
|
||||
async fn write_to_storage(
|
||||
&self,
|
||||
op: &Operator,
|
||||
op: &ObjectStore,
|
||||
file_path: &str,
|
||||
content: Vec<u8>,
|
||||
) -> Result<()> {
|
||||
op.write(file_path, content).await.context(OpenDalSnafu)
|
||||
op.write(file_path, content)
|
||||
.await
|
||||
.context(OpenDalSnafu)
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
fn get_storage_params(&self, schema: &str) -> (String, String) {
|
||||
|
||||
@@ -17,6 +17,7 @@ mod database;
|
||||
pub mod error;
|
||||
mod export;
|
||||
mod import;
|
||||
mod meta_snapshot;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
@@ -27,6 +28,7 @@ use error::Result;
|
||||
pub use crate::bench::BenchTableMetadataCommand;
|
||||
pub use crate::export::ExportCommand;
|
||||
pub use crate::import::ImportCommand;
|
||||
pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
|
||||
|
||||
#[async_trait]
|
||||
pub trait Tool: Send + Sync {
|
||||
|
||||
318
src/cli/src/meta_snapshot.rs
Normal file
318
src/cli/src/meta_snapshot.rs
Normal file
@@ -0,0 +1,318 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::kv_backend::chroot::ChrootKvBackend;
|
||||
use common_meta::kv_backend::etcd::EtcdStore;
|
||||
use common_meta::kv_backend::rds::{MySqlStore, PgStore};
|
||||
use common_meta::kv_backend::{KvBackendRef, DEFAULT_META_TABLE_NAME};
|
||||
use common_meta::snapshot::MetadataSnapshotManager;
|
||||
use meta_srv::bootstrap::{create_etcd_client, create_mysql_pool, create_postgres_pool};
|
||||
use meta_srv::metasrv::BackendImpl;
|
||||
use object_store::services::{Fs, S3};
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
|
||||
use crate::Tool;
|
||||
#[derive(Debug, Default, Parser)]
|
||||
struct MetaConnection {
|
||||
/// The endpoint of store. one of etcd, pg or mysql.
|
||||
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
|
||||
store_addrs: Vec<String>,
|
||||
/// The database backend.
|
||||
#[clap(long, value_enum)]
|
||||
backend: Option<BackendImpl>,
|
||||
#[clap(long, default_value = "")]
|
||||
store_key_prefix: String,
|
||||
#[clap(long,default_value = DEFAULT_META_TABLE_NAME)]
|
||||
meta_table_name: String,
|
||||
#[clap(long, default_value = "128")]
|
||||
max_txn_ops: usize,
|
||||
}
|
||||
|
||||
impl MetaConnection {
|
||||
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
|
||||
let max_txn_ops = self.max_txn_ops;
|
||||
let table_name = &self.meta_table_name;
|
||||
let store_addrs = &self.store_addrs;
|
||||
if store_addrs.is_empty() {
|
||||
KvBackendNotSetSnafu { backend: "all" }
|
||||
.fail()
|
||||
.map_err(BoxedError::new)
|
||||
} else {
|
||||
let kvbackend = match self.backend {
|
||||
Some(BackendImpl::EtcdStore) => {
|
||||
let etcd_client = create_etcd_client(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
|
||||
}
|
||||
Some(BackendImpl::PostgresStore) => {
|
||||
let pool = create_postgres_pool(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(PgStore::with_pg_pool(pool, table_name, max_txn_ops)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
Some(BackendImpl::MysqlStore) => {
|
||||
let pool = create_mysql_pool(store_addrs)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(MySqlStore::with_mysql_pool(pool, table_name, max_txn_ops)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
}
|
||||
_ => KvBackendNotSetSnafu { backend: "all" }
|
||||
.fail()
|
||||
.map_err(BoxedError::new),
|
||||
};
|
||||
if self.store_key_prefix.is_empty() {
|
||||
kvbackend
|
||||
} else {
|
||||
let chroot_kvbackend =
|
||||
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
|
||||
Ok(Arc::new(chroot_kvbackend))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(qtang): Abstract a generic s3 config for export import meta snapshot restore
|
||||
#[derive(Debug, Default, Parser)]
|
||||
struct S3Config {
|
||||
/// whether to use s3 as the output directory. default is false.
|
||||
#[clap(long, default_value = "false")]
|
||||
s3: bool,
|
||||
/// The s3 bucket name.
|
||||
#[clap(long)]
|
||||
s3_bucket: Option<String>,
|
||||
/// The s3 region.
|
||||
#[clap(long)]
|
||||
s3_region: Option<String>,
|
||||
/// The s3 access key.
|
||||
#[clap(long)]
|
||||
s3_access_key: Option<SecretString>,
|
||||
/// The s3 secret key.
|
||||
#[clap(long)]
|
||||
s3_secret_key: Option<SecretString>,
|
||||
/// The s3 endpoint. we will automatically use the default s3 decided by the region if not set.
|
||||
#[clap(long)]
|
||||
s3_endpoint: Option<String>,
|
||||
}
|
||||
|
||||
impl S3Config {
|
||||
pub fn build(&self, root: &str) -> Result<Option<ObjectStore>, BoxedError> {
|
||||
if !self.s3 {
|
||||
Ok(None)
|
||||
} else {
|
||||
if self.s3_region.is_none()
|
||||
|| self.s3_access_key.is_none()
|
||||
|| self.s3_secret_key.is_none()
|
||||
|| self.s3_bucket.is_none()
|
||||
{
|
||||
return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new);
|
||||
}
|
||||
// Safety, unwrap is safe because we have checked the options above.
|
||||
let mut config = S3::default()
|
||||
.bucket(self.s3_bucket.as_ref().unwrap())
|
||||
.region(self.s3_region.as_ref().unwrap())
|
||||
.access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret())
|
||||
.secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret());
|
||||
|
||||
if !root.is_empty() && root != "." {
|
||||
config = config.root(root);
|
||||
}
|
||||
|
||||
if let Some(endpoint) = &self.s3_endpoint {
|
||||
config = config.endpoint(endpoint);
|
||||
}
|
||||
Ok(Some(
|
||||
ObjectStore::new(config)
|
||||
.context(OpenDalSnafu)
|
||||
.map_err(BoxedError::new)?
|
||||
.finish(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Export metadata snapshot tool.
|
||||
/// This tool is used to export metadata snapshot from etcd, pg or mysql.
|
||||
/// It will dump the metadata snapshot to local file or s3 bucket.
|
||||
/// The snapshot file will be in binary format.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaSnapshotCommand {
|
||||
/// The connection to the metadata store.
|
||||
#[clap(flatten)]
|
||||
connection: MetaConnection,
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
/// The name of the target snapshot file. we will add the file extension automatically.
|
||||
#[clap(long, default_value = "metadata_snapshot")]
|
||||
file_name: String,
|
||||
/// The directory to store the snapshot file.
|
||||
/// if target output is s3 bucket, this is the root directory in the bucket.
|
||||
/// if target output is local file, this is the local directory.
|
||||
#[clap(long, default_value = "")]
|
||||
output_dir: String,
|
||||
}
|
||||
|
||||
fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError> {
|
||||
let root = if root.is_empty() { "." } else { root };
|
||||
let object_store = ObjectStore::new(Fs::default().root(root))
|
||||
.context(OpenDalSnafu)
|
||||
.map_err(BoxedError::new)?
|
||||
.finish();
|
||||
Ok(object_store)
|
||||
}
|
||||
|
||||
impl MetaSnapshotCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.connection.build().await?;
|
||||
let output_dir = &self.output_dir;
|
||||
let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
let tool = MetaSnapshotTool {
|
||||
inner: MetadataSnapshotManager::new(kvbackend, store),
|
||||
target_file: self.file_name.clone(),
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
} else {
|
||||
let object_store = create_local_file_object_store(output_dir)?;
|
||||
let tool = MetaSnapshotTool {
|
||||
inner: MetadataSnapshotManager::new(kvbackend, object_store),
|
||||
target_file: self.file_name.clone(),
|
||||
};
|
||||
Ok(Box::new(tool))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MetaSnapshotTool {
|
||||
inner: MetadataSnapshotManager,
|
||||
target_file: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MetaSnapshotTool {
|
||||
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
|
||||
self.inner
|
||||
.dump("", &self.target_file)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Restore metadata snapshot tool.
|
||||
/// This tool is used to restore metadata snapshot from etcd, pg or mysql.
|
||||
/// It will restore the metadata snapshot from local file or s3 bucket.
|
||||
#[derive(Debug, Default, Parser)]
|
||||
pub struct MetaRestoreCommand {
|
||||
/// The connection to the metadata store.
|
||||
#[clap(flatten)]
|
||||
connection: MetaConnection,
|
||||
/// The s3 config.
|
||||
#[clap(flatten)]
|
||||
s3_config: S3Config,
|
||||
/// The name of the target snapshot file.
|
||||
#[clap(long, default_value = "metadata_snapshot.metadata.fb")]
|
||||
file_name: String,
|
||||
/// The directory to store the snapshot file.
|
||||
#[clap(long, default_value = ".")]
|
||||
input_dir: String,
|
||||
#[clap(long, default_value = "false")]
|
||||
force: bool,
|
||||
}
|
||||
|
||||
impl MetaRestoreCommand {
|
||||
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
|
||||
let kvbackend = self.connection.build().await?;
|
||||
let input_dir = &self.input_dir;
|
||||
let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
|
||||
if let Some(store) = object_store {
|
||||
let tool = MetaRestoreTool::new(
|
||||
MetadataSnapshotManager::new(kvbackend, store),
|
||||
self.file_name.clone(),
|
||||
self.force,
|
||||
);
|
||||
Ok(Box::new(tool))
|
||||
} else {
|
||||
let object_store = create_local_file_object_store(input_dir)?;
|
||||
let tool = MetaRestoreTool::new(
|
||||
MetadataSnapshotManager::new(kvbackend, object_store),
|
||||
self.file_name.clone(),
|
||||
self.force,
|
||||
);
|
||||
Ok(Box::new(tool))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MetaRestoreTool {
|
||||
inner: MetadataSnapshotManager,
|
||||
source_file: String,
|
||||
force: bool,
|
||||
}
|
||||
|
||||
impl MetaRestoreTool {
|
||||
pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
source_file,
|
||||
force,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MetaRestoreTool {
|
||||
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
|
||||
let clean = self
|
||||
.inner
|
||||
.check_target_source_clean()
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
if clean {
|
||||
common_telemetry::info!(
|
||||
"The target source is clean, we will restore the metadata snapshot."
|
||||
);
|
||||
self.inner
|
||||
.restore(&self.source_file)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(())
|
||||
} else if !self.force {
|
||||
common_telemetry::warn!(
|
||||
"The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
|
||||
);
|
||||
Ok(())
|
||||
} else {
|
||||
common_telemetry::info!("The target source is not clean, We will restore the metadata snapshot with --force.");
|
||||
self.inner
|
||||
.restore(&self.source_file)
|
||||
.await
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,7 @@ name = "greptime"
|
||||
path = "src/bin/greptime.rs"
|
||||
|
||||
[features]
|
||||
default = ["servers/pprof", "servers/mem-prof"]
|
||||
default = ["servers/pprof", "servers/mem-prof", "meta-srv/pg_kvbackend", "meta-srv/mysql_kvbackend"]
|
||||
tokio-console = ["common-telemetry/tokio-console"]
|
||||
|
||||
[lints]
|
||||
|
||||
@@ -42,6 +42,7 @@ deadpool = { workspace = true, optional = true }
|
||||
deadpool-postgres = { workspace = true, optional = true }
|
||||
derive_builder.workspace = true
|
||||
etcd-client.workspace = true
|
||||
flexbuffers = "25.2"
|
||||
futures.workspace = true
|
||||
futures-util.workspace = true
|
||||
hex.workspace = true
|
||||
@@ -49,6 +50,7 @@ humantime-serde.workspace = true
|
||||
itertools.workspace = true
|
||||
lazy_static.workspace = true
|
||||
moka.workspace = true
|
||||
object-store.workspace = true
|
||||
prometheus.workspace = true
|
||||
prost.workspace = true
|
||||
rand.workspace = true
|
||||
@@ -71,6 +73,7 @@ typetag.workspace = true
|
||||
[dev-dependencies]
|
||||
chrono.workspace = true
|
||||
common-procedure = { workspace = true, features = ["testing"] }
|
||||
common-test-util.workspace = true
|
||||
common-wal = { workspace = true, features = ["testing"] }
|
||||
datatypes.workspace = true
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
|
||||
@@ -812,6 +812,68 @@ pub enum Error {
|
||||
#[snafu(source)]
|
||||
error: common_time::error::Error,
|
||||
},
|
||||
#[snafu(display("Invalid file path: {}", file_path))]
|
||||
InvalidFilePath {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
file_path: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to serialize flexbuffers"))]
|
||||
SerializeFlexbuffers {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: flexbuffers::SerializationError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to deserialize flexbuffers"))]
|
||||
DeserializeFlexbuffers {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: flexbuffers::DeserializationError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read flexbuffers"))]
|
||||
ReadFlexbuffers {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: flexbuffers::ReaderError,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid file name: {}", reason))]
|
||||
InvalidFileName {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
reason: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid file extension: {}", reason))]
|
||||
InvalidFileExtension {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
reason: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to write object, file path: {}", file_path))]
|
||||
WriteObject {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
file_path: String,
|
||||
#[snafu(source)]
|
||||
error: object_store::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read object, file path: {}", file_path))]
|
||||
ReadObject {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
file_path: String,
|
||||
#[snafu(source)]
|
||||
error: object_store::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -834,6 +896,7 @@ impl ErrorExt for Error {
|
||||
ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
|
||||
|
||||
Unsupported { .. } => StatusCode::Unsupported,
|
||||
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
SerdeJson { .. }
|
||||
| ParseOption { .. }
|
||||
@@ -867,7 +930,10 @@ impl ErrorExt for Error {
|
||||
| FromUtf8 { .. }
|
||||
| MetadataCorruption { .. }
|
||||
| ParseWalOptions { .. }
|
||||
| KafkaGetOffset { .. } => StatusCode::Unexpected,
|
||||
| KafkaGetOffset { .. }
|
||||
| ReadFlexbuffers { .. }
|
||||
| SerializeFlexbuffers { .. }
|
||||
| DeserializeFlexbuffers { .. } => StatusCode::Unexpected,
|
||||
|
||||
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
|
||||
|
||||
@@ -884,7 +950,10 @@ impl ErrorExt for Error {
|
||||
| InvalidSetDatabaseOption { .. }
|
||||
| InvalidUnsetDatabaseOption { .. }
|
||||
| InvalidTopicNamePrefix { .. }
|
||||
| InvalidTimeZone { .. } => StatusCode::InvalidArguments,
|
||||
| InvalidTimeZone { .. }
|
||||
| InvalidFileExtension { .. }
|
||||
| InvalidFileName { .. }
|
||||
| InvalidFilePath { .. } => StatusCode::InvalidArguments,
|
||||
InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
FlowNotFound { .. } => StatusCode::FlowNotFound,
|
||||
|
||||
@@ -38,6 +38,11 @@ pub mod txn;
|
||||
pub mod util;
|
||||
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
|
||||
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
|
||||
|
||||
#[async_trait]
|
||||
pub trait KvBackend: TxnService
|
||||
where
|
||||
|
||||
@@ -41,6 +41,7 @@ pub mod region_keeper;
|
||||
pub mod region_registry;
|
||||
pub mod rpc;
|
||||
pub mod sequence;
|
||||
pub mod snapshot;
|
||||
pub mod state_store;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
|
||||
380
src/common/meta/src/snapshot.rs
Normal file
380
src/common/meta/src/snapshot.rs
Normal file
@@ -0,0 +1,380 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod file;
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
|
||||
use common_telemetry::info;
|
||||
use file::{Metadata, MetadataContent};
|
||||
use futures::TryStreamExt;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use strum::Display;
|
||||
|
||||
use crate::error::{
|
||||
Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
|
||||
Result, WriteObjectSnafu,
|
||||
};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
|
||||
use crate::rpc::store::{BatchPutRequest, RangeRequest};
|
||||
use crate::rpc::KeyValue;
|
||||
use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
|
||||
|
||||
/// The format of the backup file.
|
||||
#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
|
||||
pub enum FileFormat {
|
||||
#[strum(serialize = "fb")]
|
||||
FlexBuffers,
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for FileFormat {
|
||||
type Error = String;
|
||||
|
||||
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
|
||||
match value.to_lowercase().as_str() {
|
||||
"fb" => Ok(FileFormat::FlexBuffers),
|
||||
_ => Err(format!("Invalid file format: {}", value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Display)]
|
||||
#[strum(serialize_all = "lowercase")]
|
||||
pub enum DataType {
|
||||
Metadata,
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for DataType {
|
||||
type Error = String;
|
||||
|
||||
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
|
||||
match value.to_lowercase().as_str() {
|
||||
"metadata" => Ok(DataType::Metadata),
|
||||
_ => Err(format!("Invalid data type: {}", value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct FileExtension {
|
||||
format: FileFormat,
|
||||
data_type: DataType,
|
||||
}
|
||||
|
||||
impl FileExtension {
|
||||
pub fn new(format: FileFormat, data_type: DataType) -> Self {
|
||||
Self { format, data_type }
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for FileExtension {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}.{}", self.data_type, self.format)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for FileExtension {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self> {
|
||||
let parts = value.split(".").collect::<Vec<&str>>();
|
||||
if parts.len() != 2 {
|
||||
return InvalidFileExtensionSnafu {
|
||||
reason: format!(
|
||||
"Extension should be in the format of <datatype>.<format>, got: {}",
|
||||
value
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let data_type = DataType::try_from(parts[0])
|
||||
.map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
|
||||
let format = FileFormat::try_from(parts[1])
|
||||
.map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
|
||||
Ok(FileExtension { format, data_type })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct FileName {
|
||||
name: String,
|
||||
extension: FileExtension,
|
||||
}
|
||||
|
||||
impl Display for FileName {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}.{}", self.name, self.extension)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for FileName {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self> {
|
||||
let Some((name, extension)) = value.split_once(".") else {
|
||||
return InvalidFileNameSnafu {
|
||||
reason: format!(
|
||||
"The file name should be in the format of <name>.<extension>, got: {}",
|
||||
value
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let extension = FileExtension::try_from(extension)?;
|
||||
Ok(Self {
|
||||
name: name.to_string(),
|
||||
extension,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl FileName {
|
||||
fn new(name: String, extension: FileExtension) -> Self {
|
||||
Self { name, extension }
|
||||
}
|
||||
}
|
||||
|
||||
/// The manager of the metadata snapshot.
|
||||
///
|
||||
/// It manages the metadata snapshot, including dumping and restoring.
|
||||
pub struct MetadataSnapshotManager {
|
||||
kv_backend: KvBackendRef,
|
||||
object_store: ObjectStore,
|
||||
}
|
||||
|
||||
/// The maximum size of the request to put metadata, use 1MiB by default.
|
||||
const MAX_REQUEST_SIZE: usize = 1024 * 1024;
|
||||
|
||||
impl MetadataSnapshotManager {
|
||||
pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
|
||||
Self {
|
||||
kv_backend,
|
||||
object_store,
|
||||
}
|
||||
}
|
||||
|
||||
/// Restores the metadata from the backup file to the metadata store.
|
||||
pub async fn restore(&self, file_path: &str) -> Result<u64> {
|
||||
let path = Path::new(file_path);
|
||||
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|s| s.to_str())
|
||||
.context(InvalidFilePathSnafu { file_path })?;
|
||||
|
||||
let filename = FileName::try_from(file_name)?;
|
||||
let data = self
|
||||
.object_store
|
||||
.read(file_path)
|
||||
.await
|
||||
.context(ReadObjectSnafu { file_path })?;
|
||||
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
|
||||
let metadata_content = document.into_metadata_content()?;
|
||||
let mut req = BatchPutRequest::default();
|
||||
let mut total_request_size = 0;
|
||||
let mut count = 0;
|
||||
let now = Instant::now();
|
||||
for FileKeyValue { key, value } in metadata_content.into_iter() {
|
||||
count += 1;
|
||||
let key_size = key.len();
|
||||
let value_size = value.len();
|
||||
if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
|
||||
self.kv_backend.batch_put(req).await?;
|
||||
req = BatchPutRequest::default();
|
||||
total_request_size = 0;
|
||||
}
|
||||
req.kvs.push(KeyValue { key, value });
|
||||
total_request_size += key_size + value_size;
|
||||
}
|
||||
if !req.kvs.is_empty() {
|
||||
self.kv_backend.batch_put(req).await?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
|
||||
file_path,
|
||||
count,
|
||||
now.elapsed()
|
||||
);
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
pub async fn check_target_source_clean(&self) -> Result<bool> {
|
||||
let req = RangeRequest::new().with_range(vec![0], vec![0]);
|
||||
let mut stream = Box::pin(
|
||||
PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
|
||||
);
|
||||
let v = stream.as_mut().try_next().await?;
|
||||
Ok(v.is_none())
|
||||
}
|
||||
|
||||
/// Dumps the metadata to the backup file.
|
||||
pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> {
|
||||
let format = FileFormat::FlexBuffers;
|
||||
let filename = FileName::new(
|
||||
filename_str.to_string(),
|
||||
FileExtension {
|
||||
format,
|
||||
data_type: DataType::Metadata,
|
||||
},
|
||||
);
|
||||
let file_path_buf = [path, filename.to_string().as_str()]
|
||||
.iter()
|
||||
.collect::<PathBuf>();
|
||||
let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
|
||||
reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
|
||||
})?;
|
||||
let now = Instant::now();
|
||||
let req = RangeRequest::new().with_range(vec![0], vec![0]);
|
||||
let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
|
||||
Ok(FileKeyValue {
|
||||
key: kv.key,
|
||||
value: kv.value,
|
||||
})
|
||||
})
|
||||
.into_stream();
|
||||
let keyvalues = stream.try_collect::<Vec<_>>().await?;
|
||||
let num_keyvalues = keyvalues.len();
|
||||
let document = Document::new(
|
||||
Metadata::new(),
|
||||
file::Content::Metadata(MetadataContent::new(keyvalues)),
|
||||
);
|
||||
let bytes = document.to_bytes(&format)?;
|
||||
let r = self
|
||||
.object_store
|
||||
.write(file_path, bytes)
|
||||
.await
|
||||
.context(WriteObjectSnafu { file_path })?;
|
||||
info!(
|
||||
"Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
|
||||
file_path,
|
||||
num_keyvalues,
|
||||
r.content_length(),
|
||||
now.elapsed()
|
||||
);
|
||||
|
||||
Ok((filename.to_string(), num_keyvalues as u64))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||
use object_store::services::Fs;
|
||||
|
||||
use super::*;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::kv_backend::KvBackend;
|
||||
use crate::rpc::store::PutRequest;
|
||||
|
||||
#[test]
|
||||
fn test_file_name() {
|
||||
let file_name = FileName::try_from("test.metadata.fb").unwrap();
|
||||
assert_eq!(file_name.name, "test");
|
||||
assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
|
||||
assert_eq!(file_name.extension.data_type, DataType::Metadata);
|
||||
assert_eq!(file_name.to_string(), "test.metadata.fb");
|
||||
|
||||
let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
|
||||
assert_eq!(
|
||||
invalid_file_name.to_string(),
|
||||
"Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
|
||||
);
|
||||
|
||||
let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
|
||||
assert_eq!(
|
||||
invalid_file_extension.to_string(),
|
||||
"Invalid file extension: Invalid file format: hello"
|
||||
);
|
||||
}
|
||||
|
||||
fn test_env(
|
||||
prefix: &str,
|
||||
) -> (
|
||||
TempDir,
|
||||
Arc<MemoryKvBackend<Error>>,
|
||||
MetadataSnapshotManager,
|
||||
) {
|
||||
let temp_dir = create_temp_dir(prefix);
|
||||
let kv_backend = Arc::new(MemoryKvBackend::default());
|
||||
let temp_path = temp_dir.path();
|
||||
let data_path = temp_path.join("data").as_path().display().to_string();
|
||||
let builder = Fs::default().root(&data_path);
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
|
||||
(temp_dir, kv_backend, manager)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dump_and_restore() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
|
||||
let temp_path = temp_dir.path();
|
||||
|
||||
for i in 0..10 {
|
||||
kv_backend
|
||||
.put(
|
||||
PutRequest::new()
|
||||
.with_key(format!("test_{}", i).as_bytes().to_vec())
|
||||
.with_value(format!("value_{}", i).as_bytes().to_vec()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let dump_path = temp_path.join("snapshot");
|
||||
manager
|
||||
.dump(
|
||||
&dump_path.as_path().display().to_string(),
|
||||
"metadata_snapshot",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Clean up the kv backend
|
||||
kv_backend.clear();
|
||||
|
||||
let restore_path = dump_path
|
||||
.join("metadata_snapshot.metadata.fb")
|
||||
.as_path()
|
||||
.display()
|
||||
.to_string();
|
||||
manager.restore(&restore_path).await.unwrap();
|
||||
|
||||
for i in 0..10 {
|
||||
let key = format!("test_{}", i);
|
||||
let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
|
||||
assert_eq!(value.value, format!("value_{}", i).as_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_restore_from_nonexistent_file() {
|
||||
let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
|
||||
let restore_path = temp_dir
|
||||
.path()
|
||||
.join("nonexistent.metadata.fb")
|
||||
.as_path()
|
||||
.display()
|
||||
.to_string();
|
||||
let err = manager.restore(&restore_path).await.unwrap_err();
|
||||
assert_matches!(err, Error::ReadObject { .. })
|
||||
}
|
||||
}
|
||||
145
src/common/meta/src/snapshot/file.rs
Normal file
145
src/common/meta/src/snapshot/file.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_time::util::current_time_millis;
|
||||
use flexbuffers::{FlexbufferSerializer, Reader};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{
|
||||
DeserializeFlexbuffersSnafu, ReadFlexbuffersSnafu, Result, SerializeFlexbuffersSnafu,
|
||||
};
|
||||
use crate::snapshot::FileFormat;
|
||||
|
||||
/// The layout of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct Document {
|
||||
metadata: Metadata,
|
||||
content: Content,
|
||||
}
|
||||
|
||||
impl Document {
|
||||
/// Creates a new document.
|
||||
pub fn new(metadata: Metadata, content: Content) -> Self {
|
||||
Self { metadata, content }
|
||||
}
|
||||
|
||||
fn serialize_to_flexbuffer(&self) -> Result<Vec<u8>> {
|
||||
let mut builder = FlexbufferSerializer::new();
|
||||
self.serialize(&mut builder)
|
||||
.context(SerializeFlexbuffersSnafu)?;
|
||||
Ok(builder.take_buffer())
|
||||
}
|
||||
|
||||
/// Converts the [`Document`] to a bytes.
|
||||
pub(crate) fn to_bytes(&self, format: &FileFormat) -> Result<Vec<u8>> {
|
||||
match format {
|
||||
FileFormat::FlexBuffers => self.serialize_to_flexbuffer(),
|
||||
}
|
||||
}
|
||||
|
||||
fn deserialize_from_flexbuffer(data: &[u8]) -> Result<Self> {
|
||||
let reader = Reader::get_root(data).context(ReadFlexbuffersSnafu)?;
|
||||
Document::deserialize(reader).context(DeserializeFlexbuffersSnafu)
|
||||
}
|
||||
|
||||
/// Deserializes the [`Document`] from a bytes.
|
||||
pub(crate) fn from_slice(format: &FileFormat, data: &[u8]) -> Result<Self> {
|
||||
match format {
|
||||
FileFormat::FlexBuffers => Self::deserialize_from_flexbuffer(data),
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts the [`Document`] to a [`MetadataContent`].
|
||||
pub(crate) fn into_metadata_content(self) -> Result<MetadataContent> {
|
||||
match self.content {
|
||||
Content::Metadata(metadata) => Ok(metadata),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The metadata of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct Metadata {
|
||||
// UNIX_EPOCH in milliseconds.
|
||||
created_timestamp_mills: i64,
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
/// Create a new metadata.
|
||||
///
|
||||
/// The `created_timestamp_mills` will be the current time in milliseconds.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
created_timestamp_mills: current_time_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The content of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) enum Content {
|
||||
Metadata(MetadataContent),
|
||||
}
|
||||
|
||||
/// The content of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct MetadataContent {
|
||||
values: Vec<KeyValue>,
|
||||
}
|
||||
|
||||
impl MetadataContent {
|
||||
/// Create a new metadata content.
|
||||
pub fn new(values: impl IntoIterator<Item = KeyValue>) -> Self {
|
||||
Self {
|
||||
values: values.into_iter().collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an iterator over the key-value pairs.
|
||||
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
|
||||
self.values.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
/// The key-value pair of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct KeyValue {
|
||||
pub key: Vec<u8>,
|
||||
pub value: Vec<u8>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_document() {
|
||||
let document = Document::new(
|
||||
Metadata::new(),
|
||||
Content::Metadata(MetadataContent::new(vec![KeyValue {
|
||||
key: b"key".to_vec(),
|
||||
value: b"value".to_vec(),
|
||||
}])),
|
||||
);
|
||||
|
||||
let bytes = document.to_bytes(&FileFormat::FlexBuffers).unwrap();
|
||||
let document_deserialized = Document::from_slice(&FileFormat::FlexBuffers, &bytes).unwrap();
|
||||
assert_eq!(
|
||||
document.metadata.created_timestamp_mills,
|
||||
document_deserialized.metadata.created_timestamp_mills
|
||||
);
|
||||
assert_eq!(document.content, document_deserialized.content);
|
||||
}
|
||||
}
|
||||
@@ -257,7 +257,7 @@ pub async fn metasrv_builder(
|
||||
(Some(kv_backend), _) => (kv_backend, None),
|
||||
(None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
|
||||
(None, BackendImpl::EtcdStore) => {
|
||||
let etcd_client = create_etcd_client(opts).await?;
|
||||
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
|
||||
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
|
||||
let election = EtcdElection::with_etcd_client(
|
||||
&opts.server_addr,
|
||||
@@ -270,7 +270,7 @@ pub async fn metasrv_builder(
|
||||
}
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
(None, BackendImpl::PostgresStore) => {
|
||||
let pool = create_postgres_pool(opts).await?;
|
||||
let pool = create_postgres_pool(&opts.store_addrs).await?;
|
||||
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
@@ -290,7 +290,7 @@ pub async fn metasrv_builder(
|
||||
}
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
(None, BackendImpl::MysqlStore) => {
|
||||
let pool = create_mysql_pool(opts).await?;
|
||||
let pool = create_mysql_pool(&opts.store_addrs).await?;
|
||||
let kv_backend =
|
||||
MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
|
||||
.await
|
||||
@@ -360,9 +360,8 @@ pub async fn metasrv_builder(
|
||||
.plugins(plugins))
|
||||
}
|
||||
|
||||
async fn create_etcd_client(opts: &MetasrvOptions) -> Result<Client> {
|
||||
let etcd_endpoints = opts
|
||||
.store_addrs
|
||||
pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
|
||||
let etcd_endpoints = store_addrs
|
||||
.iter()
|
||||
.map(|x| x.trim())
|
||||
.filter(|x| !x.is_empty())
|
||||
@@ -393,13 +392,10 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres:
|
||||
}
|
||||
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres::Pool> {
|
||||
let postgres_url = opts
|
||||
.store_addrs
|
||||
.first()
|
||||
.context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
pub async fn create_postgres_pool(store_addrs: &[String]) -> Result<deadpool_postgres::Pool> {
|
||||
let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(postgres_url.to_string());
|
||||
let pool = cfg
|
||||
@@ -409,13 +405,10 @@ async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres
|
||||
}
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
async fn setup_mysql_options(opts: &MetasrvOptions) -> Result<MySqlConnectOptions> {
|
||||
let mysql_url = opts
|
||||
.store_addrs
|
||||
.first()
|
||||
.context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
async fn setup_mysql_options(store_addrs: &[String]) -> Result<MySqlConnectOptions> {
|
||||
let mysql_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
|
||||
err_msg: "empty store addrs",
|
||||
})?;
|
||||
// Avoid `SET` commands in sqlx
|
||||
let opts: MySqlConnectOptions = mysql_url
|
||||
.parse()
|
||||
@@ -429,8 +422,8 @@ async fn setup_mysql_options(opts: &MetasrvOptions) -> Result<MySqlConnectOption
|
||||
}
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
async fn create_mysql_pool(opts: &MetasrvOptions) -> Result<MySqlPool> {
|
||||
let opts = setup_mysql_options(opts).await?;
|
||||
pub async fn create_mysql_pool(store_addrs: &[String]) -> Result<MySqlPool> {
|
||||
let opts = setup_mysql_options(store_addrs).await?;
|
||||
let pool = MySqlPool::connect_with(opts)
|
||||
.await
|
||||
.context(error::CreateMySqlPoolSnafu)?;
|
||||
@@ -439,7 +432,7 @@ async fn create_mysql_pool(opts: &MetasrvOptions) -> Result<MySqlPool> {
|
||||
|
||||
#[cfg(feature = "mysql_kvbackend")]
|
||||
async fn create_mysql_client(opts: &MetasrvOptions) -> Result<MySqlConnection> {
|
||||
let opts = setup_mysql_options(opts).await?;
|
||||
let opts = setup_mysql_options(&opts.store_addrs).await?;
|
||||
let client = MySqlConnection::connect_with(&opts)
|
||||
.await
|
||||
.context(error::ConnectMySqlSnafu)?;
|
||||
|
||||
@@ -29,7 +29,10 @@ use common_meta::ddl::ProcedureExecutorRef;
|
||||
use common_meta::distributed_time_constants;
|
||||
use common_meta::key::maintenance::MaintenanceModeManagerRef;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
|
||||
use common_meta::kv_backend::{
|
||||
KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, DEFAULT_META_ELECTION_LOCK_ID,
|
||||
DEFAULT_META_TABLE_NAME,
|
||||
};
|
||||
use common_meta::leadership_notifier::{
|
||||
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
|
||||
};
|
||||
@@ -75,11 +78,6 @@ pub const TABLE_ID_SEQ: &str = "table_id";
|
||||
pub const FLOW_ID_SEQ: &str = "flow_id";
|
||||
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";
|
||||
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
|
||||
|
||||
// The datastores that implements metadata kvbackend.
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
|
||||
@@ -13,7 +13,10 @@
|
||||
// limitations under the License.
|
||||
|
||||
use clap::Parser;
|
||||
use cli::{BenchTableMetadataCommand, ExportCommand, ImportCommand, Tool};
|
||||
use cli::{
|
||||
BenchTableMetadataCommand, ExportCommand, ImportCommand, MetaRestoreCommand,
|
||||
MetaSnapshotCommand, Tool,
|
||||
};
|
||||
use common_error::ext::BoxedError;
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -22,6 +25,8 @@ pub enum SubCommand {
|
||||
Bench(BenchTableMetadataCommand),
|
||||
Export(ExportCommand),
|
||||
Import(ImportCommand),
|
||||
MetaSnapshot(MetaSnapshotCommand),
|
||||
MetaRestore(MetaRestoreCommand),
|
||||
}
|
||||
|
||||
impl SubCommand {
|
||||
@@ -31,6 +36,8 @@ impl SubCommand {
|
||||
SubCommand::Bench(cmd) => cmd.build().await,
|
||||
SubCommand::Export(cmd) => cmd.build().await,
|
||||
SubCommand::Import(cmd) => cmd.build().await,
|
||||
SubCommand::MetaSnapshot(cmd) => cmd.build().await,
|
||||
SubCommand::MetaRestore(cmd) => cmd.build().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user