From d55d9addf2d2cbedb8b2b0d33f9221d31f91fd15 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 20 Apr 2025 06:32:56 +0000 Subject: [PATCH] feat: introduce `MetadataSnaphostManager` --- Cargo.lock | 37 +++ src/common/meta/Cargo.toml | 3 + src/common/meta/src/error.rs | 83 ++++++- src/common/meta/src/lib.rs | 1 + src/common/meta/src/snapshot.rs | 359 +++++++++++++++++++++++++++ src/common/meta/src/snapshot/file.rs | 145 +++++++++++ 6 files changed, 626 insertions(+), 2 deletions(-) create mode 100644 src/common/meta/src/snapshot.rs create mode 100644 src/common/meta/src/snapshot/file.rs diff --git a/Cargo.lock b/Cargo.lock index 520b33c929..729771af96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2177,6 +2177,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-telemetry", + "common-test-util", "common-time", "common-wal", "datafusion-common", @@ -2186,6 +2187,7 @@ dependencies = [ "deadpool-postgres", "derive_builder 0.20.1", "etcd-client", + "flexbuffers", "futures", "futures-util", "hex", @@ -2194,6 +2196,7 @@ dependencies = [ "itertools 0.14.0", "lazy_static", "moka", + "object-store", "prometheus", "prost 0.13.5", "rand 0.9.0", @@ -4135,6 +4138,19 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flexbuffers" +version = "25.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "935627e7bc8f083035d9faad09ffaed9128f73fb1f74a8798f115749c43378e8" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "num_enum", + "serde", + "serde_derive", +] + [[package]] name = "float-cmp" version = "0.10.0" @@ -7546,6 +7562,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate 1.3.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "num_threads" version = "0.1.7" diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index d83956903e..d99380dc26 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -41,6 +41,7 @@ deadpool = { workspace = true, optional = true } deadpool-postgres = { workspace = true, optional = true } derive_builder.workspace = true etcd-client.workspace = true +flexbuffers = "25.2" futures.workspace = true futures-util.workspace = true hex.workspace = true @@ -48,6 +49,7 @@ humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true moka.workspace = true +object-store.workspace = true prometheus.workspace = true prost.workspace = true rand.workspace = true @@ -70,6 +72,7 @@ typetag.workspace = true [dev-dependencies] chrono.workspace = true common-procedure = { workspace = true, features = ["testing"] } +common-test-util.workspace = true common-wal = { workspace = true, features = ["testing"] } datatypes.workspace = true hyper = { version = "0.14", features = ["full"] } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 1bdb3d0857..4e843a8193 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -783,6 +783,76 @@ pub enum Error { #[snafu(source)] source: common_procedure::error::Error, }, + + #[snafu(display("Invalid file path: {}", 
file_path))] + InvalidFilePath { + #[snafu(implicit)] + location: Location, + file_path: String, + }, + + #[snafu(display("Failed to serialize flexbuffers"))] + SerializeFlexbuffers { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: flexbuffers::SerializationError, + }, + + #[snafu(display("Failed to deserialize flexbuffers"))] + DeserializeFlexbuffers { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: flexbuffers::DeserializationError, + }, + + #[snafu(display("Failed to read flexbuffers"))] + ReadFlexbuffers { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: flexbuffers::ReaderError, + }, + + #[snafu(display("Invalid file name: {}", reason))] + InvalidFileName { + #[snafu(implicit)] + location: Location, + reason: String, + }, + + #[snafu(display("Invalid file extension: {}", reason))] + InvalidFileExtension { + #[snafu(implicit)] + location: Location, + reason: String, + }, + + #[snafu(display("Invalid file context: {}", reason))] + InvalidFileContext { + #[snafu(implicit)] + location: Location, + reason: String, + }, + + #[snafu(display("Failed to write object, file path: {}", file_path))] + WriteObject { + #[snafu(implicit)] + location: Location, + file_path: String, + #[snafu(source)] + error: object_store::Error, + }, + + #[snafu(display("Failed to read object, file path: {}", file_path))] + ReadObject { + #[snafu(implicit)] + location: Location, + file_path: String, + #[snafu(source)] + error: object_store::Error, + }, } pub type Result = std::result::Result; @@ -801,6 +871,8 @@ impl ErrorExt for Error { | SerializeToJson { .. } | DeserializeFromJson { .. } => StatusCode::Internal, + WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable, + NoLeader { .. } => StatusCode::TableUnavailable, ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected, @@ -837,7 +909,11 @@ impl ErrorExt for Error { | ProcedureOutput { .. } | FromUtf8 { .. } | MetadataCorruption { .. } - | ParseWalOptions { .. } => StatusCode::Unexpected, + | ParseWalOptions { .. } + | ReadFlexbuffers { .. } + | SerializeFlexbuffers { .. } + | DeserializeFlexbuffers { .. } + | InvalidFileContext { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal, @@ -853,7 +929,10 @@ impl ErrorExt for Error { | TlsConfig { .. } | InvalidSetDatabaseOption { .. } | InvalidUnsetDatabaseOption { .. } - | InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments, + | InvalidTopicNamePrefix { .. } + | InvalidFileExtension { .. } + | InvalidFileName { .. } + | InvalidFilePath { .. } => StatusCode::InvalidArguments, FlowNotFound { .. } => StatusCode::FlowNotFound, FlowRouteNotFound { .. } => StatusCode::Unexpected, diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index b1cc18d5e4..982326eb6d 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -43,6 +43,7 @@ pub mod region_keeper; pub mod region_registry; pub mod rpc; pub mod sequence; +pub mod snapshot; pub mod state_store; #[cfg(any(test, feature = "testing"))] pub mod test_util; diff --git a/src/common/meta/src/snapshot.rs b/src/common/meta/src/snapshot.rs new file mode 100644 index 0000000000..bb3290fe75 --- /dev/null +++ b/src/common/meta/src/snapshot.rs @@ -0,0 +1,359 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod file;
+
+use std::fmt::{Display, Formatter};
+use std::time::Instant;
+
+use common_telemetry::info;
+use file::{Metadata, MetadataContent};
+use futures::TryStreamExt;
+use object_store::ObjectStore;
+use snafu::{OptionExt, ResultExt};
+use strum::Display;
+
+use crate::error::{
+    Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
+    Result, WriteObjectSnafu,
+};
+use crate::kv_backend::KvBackendRef;
+use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
+use crate::rpc::store::{BatchPutRequest, RangeRequest};
+use crate::rpc::KeyValue;
+use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
+
+/// The format of the backup file.
+#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
+pub(crate) enum FileFormat {
+    #[strum(serialize = "fb")]
+    FlexBuffers,
+}
+
+impl TryFrom<&str> for FileFormat {
+    type Error = String;
+
+    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
+        match value.to_lowercase().as_str() {
+            "fb" => Ok(FileFormat::FlexBuffers),
+            _ => Err(format!("Invalid file format: {}", value)),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Display)]
+#[strum(serialize_all = "lowercase")]
+pub(crate) enum DataType {
+    Metadata,
+}
+
+impl TryFrom<&str> for DataType {
+    type Error = String;
+
+    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
+        match value.to_lowercase().as_str() {
+            "metadata" => Ok(DataType::Metadata),
+            _ => Err(format!("Invalid data type: {}", value)),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) struct FileExtension {
+    format: FileFormat,
+    data_type: DataType,
+}
+
+impl Display for FileExtension {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}.{}", self.data_type, self.format)
+    }
+}
+
+impl TryFrom<&str> for FileExtension {
+    type Error = Error;
+
+    fn try_from(value: &str) -> Result<Self> {
+        let parts = value.split(".").collect::<Vec<&str>>();
+        if parts.len() != 2 {
+            return InvalidFileExtensionSnafu {
+                reason: format!(
+                    "Extension should be in the format of <data_type>.<format>, got: {}",
+                    value
+                ),
+            }
+            .fail();
+        }
+
+        let data_type = DataType::try_from(parts[0])
+            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
+        let format = FileFormat::try_from(parts[1])
+            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
+        Ok(FileExtension { format, data_type })
+    }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) struct FileName {
+    name: String,
+    extension: FileExtension,
+}
+
+impl Display for FileName {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}.{}", self.name, self.extension)
+    }
+}
+
+impl TryFrom<&str> for FileName {
+    type Error = Error;
+
+    fn try_from(value: &str) -> Result<Self> {
+        let Some((name, extension)) = value.split_once(".") else {
+            return InvalidFileNameSnafu {
+                reason: format!(
+                    "The file name should be in the format of <name>.<extension>, got: {}",
+                    value
+                ),
+            }
+            .fail();
+        };
+        let extension = FileExtension::try_from(extension)?;
+        Ok(Self {
+            name: name.to_string(),
+            extension,
+        })
+    }
+}
+
+impl FileName {
+    fn new(name: String, extension: FileExtension) -> Self {
+        Self { name, extension }
+    }
+}
+
+/// The manager of the metadata snapshot.
+///
+/// It manages the metadata snapshot, including dumping and restoring.
+pub struct MetadataSnapshotManager {
+    kv_backend: KvBackendRef,
+    object_store: ObjectStore,
+}
+
+/// The maximum size of the request to put metadata, 1 MiB by default.
+const MAX_REQUEST_SIZE: usize = 1024 * 1024;
+
+impl MetadataSnapshotManager {
+    pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
+        Self {
+            kv_backend,
+            object_store,
+        }
+    }
+
+    /// Restores the metadata from the backup file to the metadata store.
+    pub async fn restore(&self, file_path: &str) -> Result<u64> {
+        let filename = FileName::try_from(
+            file_path
+                .rsplit("/")
+                .next()
+                .context(InvalidFilePathSnafu { file_path })?,
+        )?;
+        let data = self
+            .object_store
+            .read(file_path)
+            .await
+            .context(ReadObjectSnafu { file_path })?;
+        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
+        let metadata_content = document.into_metadata_content()?;
+        let mut req = BatchPutRequest::default();
+        let mut total_request_size = 0;
+        let mut count = 0;
+        let now = Instant::now();
+        for FileKeyValue { key, value } in metadata_content.into_iter() {
+            count += 1;
+            let key_size = key.len();
+            let value_size = value.len();
+            if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
+                self.kv_backend.batch_put(req).await?;
+                req = BatchPutRequest::default();
+                total_request_size = 0;
+            }
+            req.kvs.push(KeyValue { key, value });
+            total_request_size += key_size + value_size;
+        }
+        if !req.kvs.is_empty() {
+            self.kv_backend.batch_put(req).await?;
+        }
+
+        info!(
+            "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
+            file_path,
+            count,
+            now.elapsed()
+        );
+        Ok(count)
+    }
+
+    /// Dumps the metadata to the backup file.
+    pub async fn dump(&self, path: &str, filename: &str) -> Result<u64> {
+        let format = FileFormat::FlexBuffers;
+        let filename = FileName::new(
+            filename.to_string(),
+            FileExtension {
+                format,
+                data_type: DataType::Metadata,
+            },
+        );
+        let file_path = format!("{}/{}", path.trim_end_matches('/'), filename);
+        let now = Instant::now();
+        let req = RangeRequest::new().with_range(vec![0], vec![0]);
+        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
+            Ok(FileKeyValue {
+                key: kv.key,
+                value: kv.value,
+            })
+        })
+        .into_stream();
+        let keyvalues = stream.try_collect::<Vec<_>>().await?;
+        let num_keyvalues = keyvalues.len();
+        let document = Document::new(
+            Metadata::new(),
+            file::Content::Metadata(MetadataContent::new(keyvalues)),
+        );
+        let bytes = document.to_bytes(&format)?;
+        let r = self
+            .object_store
+            .write(&file_path, bytes)
+            .await
+            .context(WriteObjectSnafu {
+                file_path: &file_path,
+            })?;
+        info!(
+            "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
+            file_path,
+            num_keyvalues,
+            r.content_length(),
+            now.elapsed()
+        );
+
+        Ok(num_keyvalues as u64)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::assert_matches::assert_matches;
+    use std::sync::Arc;
+
+    use common_test_util::temp_dir::{create_temp_dir, TempDir};
+    use object_store::services::Fs;
+
+    use super::*;
+    use crate::kv_backend::memory::MemoryKvBackend;
+    use crate::kv_backend::KvBackend;
+    use crate::rpc::store::PutRequest;
+
+    #[test]
+    fn test_file_name() {
+        let file_name = FileName::try_from("test.metadata.fb").unwrap();
+        assert_eq!(file_name.name, "test");
+        assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
+        assert_eq!(file_name.extension.data_type, DataType::Metadata);
+        assert_eq!(file_name.to_string(), "test.metadata.fb");
+
+        let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
+        assert_eq!(
+            invalid_file_name.to_string(),
+            "Invalid file extension: Extension should be in the format of <data_type>.<format>, got: metadata"
+        );
+
+        let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
+        assert_eq!(
+            invalid_file_extension.to_string(),
+            "Invalid file extension: Invalid file format: hello"
+        );
+    }
+
+    fn test_env(
+        prefix: &str,
+    ) -> (
+        TempDir,
+        Arc<MemoryKvBackend<Error>>,
+        MetadataSnapshotManager,
+    ) {
+        let temp_dir = create_temp_dir(prefix);
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let temp_path = temp_dir.path();
+        let data_path = temp_path.join("data").as_path().display().to_string();
+        let builder = Fs::default().root(&data_path);
+        let object_store = ObjectStore::new(builder).unwrap().finish();
+        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
+        (temp_dir, kv_backend, manager)
+    }
+
+    #[tokio::test]
+    async fn test_dump_and_restore() {
+        common_telemetry::init_default_ut_logging();
+        let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
+        let temp_path = temp_dir.path();
+
+        for i in 0..10 {
+            kv_backend
+                .put(
+                    PutRequest::new()
+                        .with_key(format!("test_{}", i).as_bytes().to_vec())
+                        .with_value(format!("value_{}", i).as_bytes().to_vec()),
+                )
+                .await
+                .unwrap();
+        }
+        let dump_path = temp_path.join("snapshot");
+        manager
+            .dump(
+                &dump_path.as_path().display().to_string(),
+                "metadata_snapshot",
+            )
+            .await
+            .unwrap();
+        // Clean up the kv backend
+        kv_backend.clear();
+
+        let restore_path = dump_path
+            .join("metadata_snapshot.metadata.fb")
+            .as_path()
+            .display()
+            .to_string();
+        manager.restore(&restore_path).await.unwrap();
+
+        for i in 0..10 {
+            let key = format!("test_{}", i);
+            let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
+            assert_eq!(value.value, format!("value_{}", i).as_bytes());
+        }
+    }
+
+    #[tokio::test]
+    async fn test_restore_from_nonexistent_file() {
+        let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
+        let restore_path = temp_dir
+            .path()
+            .join("nonexistent.metadata.fb")
+            .as_path()
+            .display()
+            .to_string();
+        let err = manager.restore(&restore_path).await.unwrap_err();
+        assert_matches!(err, Error::ReadObject { .. })
+    }
+}
diff --git a/src/common/meta/src/snapshot/file.rs b/src/common/meta/src/snapshot/file.rs
new file mode 100644
index 0000000000..a17254a48d
--- /dev/null
+++ b/src/common/meta/src/snapshot/file.rs
@@ -0,0 +1,145 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_time::util::current_time_millis;
+use flexbuffers::{FlexbufferSerializer, Reader};
+use serde::{Deserialize, Serialize};
+use snafu::ResultExt;
+
+use crate::error::{
+    DeserializeFlexbuffersSnafu, ReadFlexbuffersSnafu, Result, SerializeFlexbuffersSnafu,
+};
+use crate::snapshot::FileFormat;
+
+/// The layout of the backup file.
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub(crate) struct Document {
+    metadata: Metadata,
+    content: Content,
+}
+
+impl Document {
+    /// Creates a new document.
+    pub fn new(metadata: Metadata, content: Content) -> Self {
+        Self { metadata, content }
+    }
+
+    fn serialize_to_flexbuffer(&self) -> Result<Vec<u8>> {
+        let mut builder = FlexbufferSerializer::new();
+        self.serialize(&mut builder)
+            .context(SerializeFlexbuffersSnafu)?;
+        Ok(builder.take_buffer())
+    }
+
+    /// Converts the [`Document`] to bytes.
+    pub(crate) fn to_bytes(&self, format: &FileFormat) -> Result<Vec<u8>> {
+        match format {
+            FileFormat::FlexBuffers => self.serialize_to_flexbuffer(),
+        }
+    }
+
+    fn deserialize_from_flexbuffer(data: &[u8]) -> Result<Self> {
+        let reader = Reader::get_root(data).context(ReadFlexbuffersSnafu)?;
+        Document::deserialize(reader).context(DeserializeFlexbuffersSnafu)
+    }
+
+    /// Deserializes the [`Document`] from bytes.
+    pub(crate) fn from_slice(format: &FileFormat, data: &[u8]) -> Result<Self> {
+        match format {
+            FileFormat::FlexBuffers => Self::deserialize_from_flexbuffer(data),
+        }
+    }
+
+    /// Converts the [`Document`] to a [`MetadataContent`].
+    pub(crate) fn into_metadata_content(self) -> Result<MetadataContent> {
+        match self.content {
+            Content::Metadata(metadata) => Ok(metadata),
+        }
+    }
+}
+
+/// The metadata of the backup file.
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub(crate) struct Metadata {
+    // UNIX_EPOCH in milliseconds.
+    created_timestamp_mills: i64,
+}
+
+impl Metadata {
+    /// Create a new metadata.
+    ///
+    /// The `created_timestamp_mills` will be the current time in milliseconds.
+    pub fn new() -> Self {
+        Self {
+            created_timestamp_mills: current_time_millis(),
+        }
+    }
+}
+
+/// The content of the backup file.
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub(crate) enum Content {
+    Metadata(MetadataContent),
+}
+
+/// The content of the backup file.
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub(crate) struct MetadataContent {
+    values: Vec<KeyValue>,
+}
+
+impl MetadataContent {
+    /// Create a new metadata content.
+    pub fn new(values: impl IntoIterator<Item = KeyValue>) -> Self {
+        Self {
+            values: values.into_iter().collect(),
+        }
+    }
+
+    /// Returns an iterator over the key-value pairs.
+    pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
+        self.values.into_iter()
+    }
+}
+
+/// The key-value pair of the backup file.
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub(crate) struct KeyValue {
+    pub key: Vec<u8>,
+    pub value: Vec<u8>,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_document() {
+        let document = Document::new(
+            Metadata::new(),
+            Content::Metadata(MetadataContent::new(vec![KeyValue {
+                key: b"key".to_vec(),
+                value: b"value".to_vec(),
+            }])),
+        );
+
+        let bytes = document.to_bytes(&FileFormat::FlexBuffers).unwrap();
+        let document_deserialized = Document::from_slice(&FileFormat::FlexBuffers, &bytes).unwrap();
+        assert_eq!(
+            document.metadata.created_timestamp_mills,
+            document_deserialized.metadata.created_timestamp_mills
+        );
+        assert_eq!(document.content, document_deserialized.content);
+    }
+}
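
Taken together, `dump` and `restore` form a full backup/restore cycle for the metadata store. Below is a minimal usage sketch based on the test setup in this patch; the `tokio::main` entry point, the `/tmp/metadata_backup` root, and the `backups` prefix are illustrative assumptions rather than anything the patch prescribes.

use std::sync::Arc;

use common_meta::error::Error;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::snapshot::MetadataSnapshotManager;
use object_store::services::Fs;
use object_store::ObjectStore;

#[tokio::main]
async fn main() {
    // An in-memory metadata store plus a filesystem-backed object store,
    // mirroring the test setup in this patch. The root directory is an
    // illustrative choice.
    let kv_backend: Arc<MemoryKvBackend<Error>> = Arc::new(MemoryKvBackend::default());
    let object_store = ObjectStore::new(Fs::default().root("/tmp/metadata_backup"))
        .unwrap()
        .finish();
    let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);

    // `dump` writes `<path>/<filename>.metadata.fb` and returns the number of
    // key-value pairs captured from the kv backend.
    let dumped = manager.dump("backups", "metadata_snapshot").await.unwrap();

    // `restore` parses the file name, reads the object back, and batch-puts the
    // key-value pairs into the kv backend (at most ~1 MiB per batch request).
    let restored = manager
        .restore("backups/metadata_snapshot.metadata.fb")
        .await
        .unwrap();
    assert_eq!(dumped, restored);
}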
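
Because `Document`, `Content`, and `MetadataContent` are `pub(crate)`, the on-disk format is easiest to see as a plain serde-over-flexbuffers round trip. The sketch below mirrors the `to_bytes`/`from_slice` code paths with stand-in types; only the flexbuffers calls match the patch, the struct shapes are illustrative.

use flexbuffers::{FlexbufferSerializer, Reader};
use serde::{Deserialize, Serialize};

// Stand-in types loosely mirroring `Document`/`MetadataContent`/`KeyValue`.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct KeyValue {
    key: Vec<u8>,
    value: Vec<u8>,
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Document {
    created_timestamp_mills: i64,
    values: Vec<KeyValue>,
}

fn main() {
    let doc = Document {
        created_timestamp_mills: 0,
        values: vec![KeyValue {
            key: b"key".to_vec(),
            value: b"value".to_vec(),
        }],
    };

    // Serialization: the same calls `Document::to_bytes` makes for `FileFormat::FlexBuffers`.
    let mut builder = FlexbufferSerializer::new();
    doc.serialize(&mut builder).unwrap();
    let bytes = builder.take_buffer();

    // Deserialization: the same calls `Document::from_slice` makes.
    let reader = Reader::get_root(bytes.as_slice()).unwrap();
    let decoded = Document::deserialize(reader).unwrap();
    assert_eq!(doc, decoded);
}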