feat: introduce MetadataSnaphostManager

This commit is contained in:
WenyXu
2025-04-20 06:32:56 +00:00
parent e817a65d75
commit d55d9addf2
6 changed files with 626 additions and 2 deletions

37
Cargo.lock generated
View File

@@ -2177,6 +2177,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-telemetry",
"common-test-util",
"common-time",
"common-wal",
"datafusion-common",
@@ -2186,6 +2187,7 @@ dependencies = [
"deadpool-postgres",
"derive_builder 0.20.1",
"etcd-client",
"flexbuffers",
"futures",
"futures-util",
"hex",
@@ -2194,6 +2196,7 @@ dependencies = [
"itertools 0.14.0",
"lazy_static",
"moka",
"object-store",
"prometheus",
"prost 0.13.5",
"rand 0.9.0",
@@ -4135,6 +4138,19 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flexbuffers"
version = "25.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "935627e7bc8f083035d9faad09ffaed9128f73fb1f74a8798f115749c43378e8"
dependencies = [
"bitflags 1.3.2",
"byteorder",
"num_enum",
"serde",
"serde_derive",
]
[[package]]
name = "float-cmp"
version = "0.10.0"
@@ -7546,6 +7562,27 @@ dependencies = [
"libc",
]
[[package]]
name = "num_enum"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "num_threads"
version = "0.1.7"

View File

@@ -41,6 +41,7 @@ deadpool = { workspace = true, optional = true }
deadpool-postgres = { workspace = true, optional = true }
derive_builder.workspace = true
etcd-client.workspace = true
flexbuffers = "25.2"
futures.workspace = true
futures-util.workspace = true
hex.workspace = true
@@ -48,6 +49,7 @@ humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
moka.workspace = true
object-store.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
@@ -70,6 +72,7 @@ typetag.workspace = true
[dev-dependencies]
chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-wal = { workspace = true, features = ["testing"] }
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }

View File

@@ -783,6 +783,76 @@ pub enum Error {
#[snafu(source)]
source: common_procedure::error::Error,
},
#[snafu(display("Invalid file path: {}", file_path))]
InvalidFilePath {
#[snafu(implicit)]
location: Location,
file_path: String,
},
#[snafu(display("Failed to serialize flexbuffers"))]
SerializeFlexbuffers {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: flexbuffers::SerializationError,
},
#[snafu(display("Failed to deserialize flexbuffers"))]
DeserializeFlexbuffers {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: flexbuffers::DeserializationError,
},
#[snafu(display("Failed to read flexbuffers"))]
ReadFlexbuffers {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: flexbuffers::ReaderError,
},
#[snafu(display("Invalid file name: {}", reason))]
InvalidFileName {
#[snafu(implicit)]
location: Location,
reason: String,
},
#[snafu(display("Invalid file extension: {}", reason))]
InvalidFileExtension {
#[snafu(implicit)]
location: Location,
reason: String,
},
#[snafu(display("Invalid file context: {}", reason))]
InvalidFileContext {
#[snafu(implicit)]
location: Location,
reason: String,
},
#[snafu(display("Failed to write object, file path: {}", file_path))]
WriteObject {
#[snafu(implicit)]
location: Location,
file_path: String,
#[snafu(source)]
error: object_store::Error,
},
#[snafu(display("Failed to read object, file path: {}", file_path))]
ReadObject {
#[snafu(implicit)]
location: Location,
file_path: String,
#[snafu(source)]
error: object_store::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -801,6 +871,8 @@ impl ErrorExt for Error {
| SerializeToJson { .. }
| DeserializeFromJson { .. } => StatusCode::Internal,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
NoLeader { .. } => StatusCode::TableUnavailable,
ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
@@ -837,7 +909,11 @@ impl ErrorExt for Error {
| ProcedureOutput { .. }
| FromUtf8 { .. }
| MetadataCorruption { .. }
| ParseWalOptions { .. } => StatusCode::Unexpected,
| ParseWalOptions { .. }
| ReadFlexbuffers { .. }
| SerializeFlexbuffers { .. }
| DeserializeFlexbuffers { .. }
| InvalidFileContext { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
@@ -853,7 +929,10 @@ impl ErrorExt for Error {
| TlsConfig { .. }
| InvalidSetDatabaseOption { .. }
| InvalidUnsetDatabaseOption { .. }
| InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments,
| InvalidTopicNamePrefix { .. }
| InvalidFileExtension { .. }
| InvalidFileName { .. }
| InvalidFilePath { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,

View File

@@ -43,6 +43,7 @@ pub mod region_keeper;
pub mod region_registry;
pub mod rpc;
pub mod sequence;
pub mod snapshot;
pub mod state_store;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -0,0 +1,359 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod file;
use std::fmt::{Display, Formatter};
use std::time::Instant;
use common_telemetry::info;
use file::{Metadata, MetadataContent};
use futures::TryStreamExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use strum::Display;
use crate::error::{
Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
Result, WriteObjectSnafu,
};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{BatchPutRequest, RangeRequest};
use crate::rpc::KeyValue;
use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
/// The format of the backup file.
#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
pub(crate) enum FileFormat {
#[strum(serialize = "fb")]
FlexBuffers,
}
impl TryFrom<&str> for FileFormat {
type Error = String;
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
match value.to_lowercase().as_str() {
"fb" => Ok(FileFormat::FlexBuffers),
_ => Err(format!("Invalid file format: {}", value)),
}
}
}
#[derive(Debug, PartialEq, Eq, Display)]
#[strum(serialize_all = "lowercase")]
pub(crate) enum DataType {
Metadata,
}
impl TryFrom<&str> for DataType {
type Error = String;
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
match value.to_lowercase().as_str() {
"metadata" => Ok(DataType::Metadata),
_ => Err(format!("Invalid data type: {}", value)),
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct FileExtension {
format: FileFormat,
data_type: DataType,
}
impl Display for FileExtension {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.data_type, self.format)
}
}
impl TryFrom<&str> for FileExtension {
type Error = Error;
fn try_from(value: &str) -> Result<Self> {
let parts = value.split(".").collect::<Vec<&str>>();
if parts.len() != 2 {
return InvalidFileExtensionSnafu {
reason: format!(
"Extension should be in the format of <datatype>.<format>, got: {}",
value
),
}
.fail();
}
let data_type = DataType::try_from(parts[0])
.map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
let format = FileFormat::try_from(parts[1])
.map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
Ok(FileExtension { format, data_type })
}
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct FileName {
name: String,
extension: FileExtension,
}
impl Display for FileName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.name, self.extension)
}
}
impl TryFrom<&str> for FileName {
type Error = Error;
fn try_from(value: &str) -> Result<Self> {
let Some((name, extension)) = value.split_once(".") else {
return InvalidFileNameSnafu {
reason: format!(
"The file name should be in the format of <name>.<extension>, got: {}",
value
),
}
.fail();
};
let extension = FileExtension::try_from(extension)?;
Ok(Self {
name: name.to_string(),
extension,
})
}
}
impl FileName {
fn new(name: String, extension: FileExtension) -> Self {
Self { name, extension }
}
}
/// The manager of the metadata snapshot.
///
/// It manages the metadata snapshot, including dumping and restoring.
pub struct MetadataSnapshotManager {
kv_backend: KvBackendRef,
object_store: ObjectStore,
}
/// The maximum size of the request to put metadata, use 1MiB by default.
const MAX_REQUEST_SIZE: usize = 1024 * 1024;
impl MetadataSnapshotManager {
pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
Self {
kv_backend,
object_store,
}
}
/// Restores the metadata from the backup file to the metadata store.
pub async fn restore(&self, file_path: &str) -> Result<u64> {
let filename = FileName::try_from(
file_path
.rsplit("/")
.next()
.context(InvalidFilePathSnafu { file_path })?,
)?;
let data = self
.object_store
.read(file_path)
.await
.context(ReadObjectSnafu { file_path })?;
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
let metadata_content = document.into_metadata_content()?;
let mut req = BatchPutRequest::default();
let mut total_request_size = 0;
let mut count = 0;
let now = Instant::now();
for FileKeyValue { key, value } in metadata_content.into_iter() {
count += 1;
let key_size = key.len();
let value_size = value.len();
if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
self.kv_backend.batch_put(req).await?;
req = BatchPutRequest::default();
total_request_size = 0;
}
req.kvs.push(KeyValue { key, value });
total_request_size += key_size + value_size;
}
if !req.kvs.is_empty() {
self.kv_backend.batch_put(req).await?;
}
info!(
"Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
file_path,
count,
now.elapsed()
);
Ok(count)
}
/// Dumps the metadata to the backup file.
pub async fn dump(&self, path: &str, filename: &str) -> Result<u64> {
let format = FileFormat::FlexBuffers;
let filename = FileName::new(
filename.to_string(),
FileExtension {
format,
data_type: DataType::Metadata,
},
);
let file_path = format!("{}/{}", path.trim_end_matches('/'), filename);
let now = Instant::now();
let req = RangeRequest::new().with_range(vec![0], vec![0]);
let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
Ok(FileKeyValue {
key: kv.key,
value: kv.value,
})
})
.into_stream();
let keyvalues = stream.try_collect::<Vec<_>>().await?;
let num_keyvalues = keyvalues.len();
let document = Document::new(
Metadata::new(),
file::Content::Metadata(MetadataContent::new(keyvalues)),
);
let bytes = document.to_bytes(&format)?;
let r = self
.object_store
.write(&file_path, bytes)
.await
.context(WriteObjectSnafu {
file_path: &file_path,
})?;
info!(
"Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
file_path,
num_keyvalues,
r.content_length(),
now.elapsed()
);
Ok(num_keyvalues as u64)
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[test]
fn test_file_name() {
let file_name = FileName::try_from("test.metadata.fb").unwrap();
assert_eq!(file_name.name, "test");
assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
assert_eq!(file_name.extension.data_type, DataType::Metadata);
assert_eq!(file_name.to_string(), "test.metadata.fb");
let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
assert_eq!(
invalid_file_name.to_string(),
"Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
);
let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
assert_eq!(
invalid_file_extension.to_string(),
"Invalid file extension: Invalid file format: hello"
);
}
fn test_env(
prefix: &str,
) -> (
TempDir,
Arc<MemoryKvBackend<Error>>,
MetadataSnapshotManager,
) {
let temp_dir = create_temp_dir(prefix);
let kv_backend = Arc::new(MemoryKvBackend::default());
let temp_path = temp_dir.path();
let data_path = temp_path.join("data").as_path().display().to_string();
let builder = Fs::default().root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
(temp_dir, kv_backend, manager)
}
#[tokio::test]
async fn test_dump_and_restore() {
common_telemetry::init_default_ut_logging();
let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
let temp_path = temp_dir.path();
for i in 0..10 {
kv_backend
.put(
PutRequest::new()
.with_key(format!("test_{}", i).as_bytes().to_vec())
.with_value(format!("value_{}", i).as_bytes().to_vec()),
)
.await
.unwrap();
}
let dump_path = temp_path.join("snapshot");
manager
.dump(
&dump_path.as_path().display().to_string(),
"metadata_snapshot",
)
.await
.unwrap();
// Clean up the kv backend
kv_backend.clear();
let restore_path = dump_path
.join("metadata_snapshot.metadata.fb")
.as_path()
.display()
.to_string();
manager.restore(&restore_path).await.unwrap();
for i in 0..10 {
let key = format!("test_{}", i);
let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
assert_eq!(value.value, format!("value_{}", i).as_bytes());
}
}
#[tokio::test]
async fn test_restore_from_nonexistent_file() {
let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
let restore_path = temp_dir
.path()
.join("nonexistent.metadata.fb")
.as_path()
.display()
.to_string();
let err = manager.restore(&restore_path).await.unwrap_err();
assert_matches!(err, Error::ReadObject { .. })
}
}

View File

@@ -0,0 +1,145 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::util::current_time_millis;
use flexbuffers::{FlexbufferSerializer, Reader};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{
DeserializeFlexbuffersSnafu, ReadFlexbuffersSnafu, Result, SerializeFlexbuffersSnafu,
};
use crate::snapshot::FileFormat;
/// The layout of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct Document {
metadata: Metadata,
content: Content,
}
impl Document {
/// Creates a new document.
pub fn new(metadata: Metadata, content: Content) -> Self {
Self { metadata, content }
}
fn serialize_to_flexbuffer(&self) -> Result<Vec<u8>> {
let mut builder = FlexbufferSerializer::new();
self.serialize(&mut builder)
.context(SerializeFlexbuffersSnafu)?;
Ok(builder.take_buffer())
}
/// Converts the [`Document`] to a bytes.
pub(crate) fn to_bytes(&self, format: &FileFormat) -> Result<Vec<u8>> {
match format {
FileFormat::FlexBuffers => self.serialize_to_flexbuffer(),
}
}
fn deserialize_from_flexbuffer(data: &[u8]) -> Result<Self> {
let reader = Reader::get_root(data).context(ReadFlexbuffersSnafu)?;
Document::deserialize(reader).context(DeserializeFlexbuffersSnafu)
}
/// Deserializes the [`Document`] from a bytes.
pub(crate) fn from_slice(format: &FileFormat, data: &[u8]) -> Result<Self> {
match format {
FileFormat::FlexBuffers => Self::deserialize_from_flexbuffer(data),
}
}
/// Converts the [`Document`] to a [`MetadataContent`].
pub(crate) fn into_metadata_content(self) -> Result<MetadataContent> {
match self.content {
Content::Metadata(metadata) => Ok(metadata),
}
}
}
/// The metadata of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct Metadata {
// UNIX_EPOCH in milliseconds.
created_timestamp_mills: i64,
}
impl Metadata {
/// Create a new metadata.
///
/// The `created_timestamp_mills` will be the current time in milliseconds.
pub fn new() -> Self {
Self {
created_timestamp_mills: current_time_millis(),
}
}
}
/// The content of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) enum Content {
Metadata(MetadataContent),
}
/// The content of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct MetadataContent {
values: Vec<KeyValue>,
}
impl MetadataContent {
/// Create a new metadata content.
pub fn new(values: impl IntoIterator<Item = KeyValue>) -> Self {
Self {
values: values.into_iter().collect(),
}
}
/// Returns an iterator over the key-value pairs.
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
self.values.into_iter()
}
}
/// The key-value pair of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct KeyValue {
pub key: Vec<u8>,
pub value: Vec<u8>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_document() {
let document = Document::new(
Metadata::new(),
Content::Metadata(MetadataContent::new(vec![KeyValue {
key: b"key".to_vec(),
value: b"value".to_vec(),
}])),
);
let bytes = document.to_bytes(&FileFormat::FlexBuffers).unwrap();
let document_deserialized = Document::from_slice(&FileFormat::FlexBuffers, &bytes).unwrap();
assert_eq!(
document.metadata.created_timestamp_mills,
document_deserialized.metadata.created_timestamp_mills
);
assert_eq!(document.content, document_deserialized.content);
}
}