mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
2 Commits
create-tab
...
feat/metad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
07b2ea096b | ||
|
|
d55d9addf2 |
37
Cargo.lock
generated
37
Cargo.lock
generated
@@ -2177,6 +2177,7 @@ dependencies = [
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
"common-wal",
|
||||
"datafusion-common",
|
||||
@@ -2186,6 +2187,7 @@ dependencies = [
|
||||
"deadpool-postgres",
|
||||
"derive_builder 0.20.1",
|
||||
"etcd-client",
|
||||
"flexbuffers",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"hex",
|
||||
@@ -2194,6 +2196,7 @@ dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"lazy_static",
|
||||
"moka",
|
||||
"object-store",
|
||||
"prometheus",
|
||||
"prost 0.13.5",
|
||||
"rand 0.9.0",
|
||||
@@ -4135,6 +4138,19 @@ dependencies = [
|
||||
"miniz_oxide",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flexbuffers"
|
||||
version = "25.2.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "935627e7bc8f083035d9faad09ffaed9128f73fb1f74a8798f115749c43378e8"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"byteorder",
|
||||
"num_enum",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "float-cmp"
|
||||
version = "0.10.0"
|
||||
@@ -7546,6 +7562,27 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum"
|
||||
version = "0.5.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
|
||||
dependencies = [
|
||||
"num_enum_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum_derive"
|
||||
version = "0.5.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
|
||||
dependencies = [
|
||||
"proc-macro-crate 1.3.1",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_threads"
|
||||
version = "0.1.7"
|
||||
|
||||
@@ -78,6 +78,13 @@ pub enum Error {
|
||||
source: datanode::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build object storage manager"))]
|
||||
BuildObjectStorageManager {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: datanode::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to shutdown datanode"))]
|
||||
ShutdownDatanode {
|
||||
#[snafu(implicit)]
|
||||
@@ -328,6 +335,8 @@ impl ErrorExt for Error {
|
||||
source.status_code()
|
||||
}
|
||||
|
||||
Error::BuildObjectStorageManager { source, .. } => source.status_code(),
|
||||
|
||||
Error::MissingConfig { .. }
|
||||
| Error::LoadLayeredConfig { .. }
|
||||
| Error::IllegalConfig { .. }
|
||||
|
||||
@@ -44,6 +44,7 @@ use common_meta::peer::Peer;
|
||||
use common_meta::region_keeper::MemoryRegionKeeper;
|
||||
use common_meta::region_registry::LeaderRegionRegistry;
|
||||
use common_meta::sequence::SequenceBuilder;
|
||||
use common_meta::snapshot::MetadataSnapshotManager;
|
||||
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
|
||||
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
|
||||
use common_telemetry::info;
|
||||
@@ -497,6 +498,10 @@ impl StartCommand {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let object_store_manager = DatanodeBuilder::build_object_store_manager(&dn_opts.storage)
|
||||
.await
|
||||
.context(error::BuildObjectStorageManagerSnafu)?;
|
||||
|
||||
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
|
||||
.with_kv_backend(kv_backend.clone())
|
||||
.with_cache_registry(layered_cache_registry.clone())
|
||||
@@ -591,6 +596,11 @@ impl StartCommand {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let metadata_snapshot_manager = MetadataSnapshotManager::new(
|
||||
kv_backend.clone(),
|
||||
object_store_manager.default_object_store().clone(),
|
||||
);
|
||||
|
||||
let fe_instance = FrontendBuilder::new(
|
||||
fe_opts.clone(),
|
||||
kv_backend.clone(),
|
||||
@@ -601,6 +611,7 @@ impl StartCommand {
|
||||
StatementStatistics::new(opts.logging.slow_query.clone()),
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.with_metadata_snapshot_manager(metadata_snapshot_manager)
|
||||
.try_build()
|
||||
.await
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
mod add_region_follower;
|
||||
mod flush_compact_region;
|
||||
mod flush_compact_table;
|
||||
mod metadata_snaphost;
|
||||
mod migrate_region;
|
||||
mod remove_region_follower;
|
||||
|
||||
@@ -23,6 +24,7 @@ use std::sync::Arc;
|
||||
use add_region_follower::AddRegionFollowerFunction;
|
||||
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
|
||||
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use metadata_snaphost::{DumpMetadataFunction, RestoreMetadataFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||
|
||||
@@ -43,5 +45,7 @@ impl AdminFunction {
|
||||
registry.register_async(Arc::new(FlushTableFunction));
|
||||
registry.register_async(Arc::new(CompactTableFunction));
|
||||
registry.register_async(Arc::new(FlushFlowFunction));
|
||||
registry.register_async(Arc::new(DumpMetadataFunction));
|
||||
registry.register_async(Arc::new(RestoreMetadataFunction));
|
||||
}
|
||||
}
|
||||
|
||||
56
src/common/function/src/admin/metadata_snaphost.rs
Normal file
56
src/common/function/src/admin/metadata_snaphost.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
use common_macro::admin_fn;
|
||||
use common_query::error::{MissingMetadataSnapshotHandlerSnafu, Result};
|
||||
use common_query::prelude::{Signature, Volatility};
|
||||
use datatypes::prelude::*;
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
use crate::handlers::MetadataSnapshotHandlerRef;
|
||||
|
||||
const METADATA_DIR: &str = "/snaphost/";
|
||||
const METADATA_FILE_NAME: &str = "dump_metadata";
|
||||
const METADATA_FILE_EXTENSION: &str = "metadata.fb";
|
||||
|
||||
#[admin_fn(
|
||||
name = DumpMetadataFunction,
|
||||
display_name = dump_metadata,
|
||||
sig_fn = dump_signature,
|
||||
ret = string
|
||||
)]
|
||||
pub(crate) async fn dump_metadata(
|
||||
metadata_snapshot_handler: &MetadataSnapshotHandlerRef,
|
||||
_query_ctx: &QueryContextRef,
|
||||
_params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let filename = metadata_snapshot_handler
|
||||
.dump(METADATA_DIR, METADATA_FILE_NAME)
|
||||
.await?;
|
||||
Ok(Value::from(filename))
|
||||
}
|
||||
|
||||
fn dump_signature() -> Signature {
|
||||
Signature::uniform(0, vec![], Volatility::Immutable)
|
||||
}
|
||||
|
||||
#[admin_fn(
|
||||
name = RestoreMetadataFunction,
|
||||
display_name = restore_metadata,
|
||||
sig_fn = restore_signature,
|
||||
ret = uint64,
|
||||
)]
|
||||
pub(crate) async fn restore_metadata(
|
||||
metadata_snapshot_handler: &MetadataSnapshotHandlerRef,
|
||||
_query_ctx: &QueryContextRef,
|
||||
_params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let num_keyvalues = metadata_snapshot_handler
|
||||
.restore(
|
||||
METADATA_DIR,
|
||||
&format!("{METADATA_FILE_NAME}.{METADATA_FILE_EXTENSION}"),
|
||||
)
|
||||
.await?;
|
||||
Ok(Value::from(num_keyvalues))
|
||||
}
|
||||
|
||||
fn restore_signature() -> Signature {
|
||||
Signature::uniform(0, vec![], Volatility::Immutable)
|
||||
}
|
||||
@@ -89,8 +89,18 @@ pub trait FlowServiceHandler: Send + Sync {
|
||||
) -> Result<api::v1::flow::FlowResponse>;
|
||||
}
|
||||
|
||||
/// This metadata snapshot handler is only use for dump and restore metadata for now.
|
||||
#[async_trait]
|
||||
pub trait MetadataSnapshotHandler: Send + Sync {
|
||||
async fn dump(&self, path: &str, filename: &str) -> Result<String>;
|
||||
|
||||
async fn restore(&self, path: &str, filename: &str) -> Result<u64>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
|
||||
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;
|
||||
|
||||
pub type FlowServiceHandlerRef = Arc<dyn FlowServiceHandler>;
|
||||
|
||||
pub type MetadataSnapshotHandlerRef = Arc<dyn MetadataSnapshotHandler>;
|
||||
|
||||
@@ -12,7 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
|
||||
use crate::handlers::{
|
||||
FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef,
|
||||
TableMutationHandlerRef,
|
||||
};
|
||||
|
||||
/// Shared state for SQL functions.
|
||||
/// The handlers in state may be `None` in cli command-line or test cases.
|
||||
@@ -24,6 +27,8 @@ pub struct FunctionState {
|
||||
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
|
||||
// The flownode handler
|
||||
pub flow_service_handler: Option<FlowServiceHandlerRef>,
|
||||
// The metadata snapshot handler
|
||||
pub metadata_snapshot_handler: Option<MetadataSnapshotHandlerRef>,
|
||||
}
|
||||
|
||||
impl FunctionState {
|
||||
@@ -48,10 +53,14 @@ impl FunctionState {
|
||||
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
|
||||
};
|
||||
|
||||
use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
|
||||
use crate::handlers::{
|
||||
FlowServiceHandler, MetadataSnapshotHandler, ProcedureServiceHandler,
|
||||
TableMutationHandler,
|
||||
};
|
||||
struct MockProcedureServiceHandler;
|
||||
struct MockTableMutationHandler;
|
||||
struct MockFlowServiceHandler;
|
||||
struct MockMetadataServiceHandler;
|
||||
const ROWS: usize = 42;
|
||||
|
||||
#[async_trait]
|
||||
@@ -150,10 +159,22 @@ impl FunctionState {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MetadataSnapshotHandler for MockMetadataServiceHandler {
|
||||
async fn dump(&self, _path: &str, _filename: &str) -> Result<String> {
|
||||
Ok("test_filename".to_string())
|
||||
}
|
||||
|
||||
async fn restore(&self, _path: &str, _filename: &str) -> Result<u64> {
|
||||
Ok(100)
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
|
||||
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
|
||||
flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
|
||||
metadata_snapshot_handler: Some(Arc::new(MockMetadataServiceHandler)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,6 +179,10 @@ fn build_struct(
|
||||
Ident::new("flow_service_handler", handler_type.span()),
|
||||
Ident::new("MissingFlowServiceHandlerSnafu", handler_type.span()),
|
||||
),
|
||||
"MetadataSnapshotHandlerRef" => (
|
||||
Ident::new("metadata_snapshot_handler", handler_type.span()),
|
||||
Ident::new("MissingMetadataSnapshotHandlerSnafu", handler_type.span()),
|
||||
),
|
||||
handler => ok!(error!(
|
||||
handler_type.span(),
|
||||
format!("Unknown handler type: {handler}")
|
||||
|
||||
@@ -41,6 +41,7 @@ deadpool = { workspace = true, optional = true }
|
||||
deadpool-postgres = { workspace = true, optional = true }
|
||||
derive_builder.workspace = true
|
||||
etcd-client.workspace = true
|
||||
flexbuffers = "25.2"
|
||||
futures.workspace = true
|
||||
futures-util.workspace = true
|
||||
hex.workspace = true
|
||||
@@ -48,6 +49,7 @@ humantime-serde.workspace = true
|
||||
itertools.workspace = true
|
||||
lazy_static.workspace = true
|
||||
moka.workspace = true
|
||||
object-store.workspace = true
|
||||
prometheus.workspace = true
|
||||
prost.workspace = true
|
||||
rand.workspace = true
|
||||
@@ -70,6 +72,7 @@ typetag.workspace = true
|
||||
[dev-dependencies]
|
||||
chrono.workspace = true
|
||||
common-procedure = { workspace = true, features = ["testing"] }
|
||||
common-test-util.workspace = true
|
||||
common-wal = { workspace = true, features = ["testing"] }
|
||||
datatypes.workspace = true
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
|
||||
@@ -783,6 +783,76 @@ pub enum Error {
|
||||
#[snafu(source)]
|
||||
source: common_procedure::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid file path: {}", file_path))]
|
||||
InvalidFilePath {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
file_path: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to serialize flexbuffers"))]
|
||||
SerializeFlexbuffers {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: flexbuffers::SerializationError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to deserialize flexbuffers"))]
|
||||
DeserializeFlexbuffers {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: flexbuffers::DeserializationError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read flexbuffers"))]
|
||||
ReadFlexbuffers {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: flexbuffers::ReaderError,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid file name: {}", reason))]
|
||||
InvalidFileName {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
reason: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid file extension: {}", reason))]
|
||||
InvalidFileExtension {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
reason: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid file context: {}", reason))]
|
||||
InvalidFileContext {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
reason: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to write object, file path: {}", file_path))]
|
||||
WriteObject {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
file_path: String,
|
||||
#[snafu(source)]
|
||||
error: object_store::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read object, file path: {}", file_path))]
|
||||
ReadObject {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
file_path: String,
|
||||
#[snafu(source)]
|
||||
error: object_store::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -801,6 +871,8 @@ impl ErrorExt for Error {
|
||||
| SerializeToJson { .. }
|
||||
| DeserializeFromJson { .. } => StatusCode::Internal,
|
||||
|
||||
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
NoLeader { .. } => StatusCode::TableUnavailable,
|
||||
ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
|
||||
|
||||
@@ -837,7 +909,11 @@ impl ErrorExt for Error {
|
||||
| ProcedureOutput { .. }
|
||||
| FromUtf8 { .. }
|
||||
| MetadataCorruption { .. }
|
||||
| ParseWalOptions { .. } => StatusCode::Unexpected,
|
||||
| ParseWalOptions { .. }
|
||||
| ReadFlexbuffers { .. }
|
||||
| SerializeFlexbuffers { .. }
|
||||
| DeserializeFlexbuffers { .. }
|
||||
| InvalidFileContext { .. } => StatusCode::Unexpected,
|
||||
|
||||
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
|
||||
|
||||
@@ -853,7 +929,10 @@ impl ErrorExt for Error {
|
||||
| TlsConfig { .. }
|
||||
| InvalidSetDatabaseOption { .. }
|
||||
| InvalidUnsetDatabaseOption { .. }
|
||||
| InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments,
|
||||
| InvalidTopicNamePrefix { .. }
|
||||
| InvalidFileExtension { .. }
|
||||
| InvalidFileName { .. }
|
||||
| InvalidFilePath { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
FlowNotFound { .. } => StatusCode::FlowNotFound,
|
||||
FlowRouteNotFound { .. } => StatusCode::Unexpected,
|
||||
|
||||
@@ -43,6 +43,7 @@ pub mod region_keeper;
|
||||
pub mod region_registry;
|
||||
pub mod rpc;
|
||||
pub mod sequence;
|
||||
pub mod snapshot;
|
||||
pub mod state_store;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
|
||||
365
src/common/meta/src/snapshot.rs
Normal file
365
src/common/meta/src/snapshot.rs
Normal file
@@ -0,0 +1,365 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod file;
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::time::Instant;
|
||||
|
||||
use common_telemetry::info;
|
||||
use file::{Metadata, MetadataContent};
|
||||
use futures::TryStreamExt;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use strum::Display;
|
||||
|
||||
use crate::error::{
|
||||
Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
|
||||
Result, WriteObjectSnafu,
|
||||
};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
|
||||
use crate::rpc::store::{BatchPutRequest, RangeRequest};
|
||||
use crate::rpc::KeyValue;
|
||||
use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
|
||||
|
||||
/// The format of the backup file.
|
||||
#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
|
||||
pub enum FileFormat {
|
||||
#[strum(serialize = "fb")]
|
||||
FlexBuffers,
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for FileFormat {
|
||||
type Error = String;
|
||||
|
||||
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
|
||||
match value.to_lowercase().as_str() {
|
||||
"fb" => Ok(FileFormat::FlexBuffers),
|
||||
_ => Err(format!("Invalid file format: {}", value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Display)]
|
||||
#[strum(serialize_all = "lowercase")]
|
||||
pub enum DataType {
|
||||
Metadata,
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for DataType {
|
||||
type Error = String;
|
||||
|
||||
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
|
||||
match value.to_lowercase().as_str() {
|
||||
"metadata" => Ok(DataType::Metadata),
|
||||
_ => Err(format!("Invalid data type: {}", value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct FileExtension {
|
||||
format: FileFormat,
|
||||
data_type: DataType,
|
||||
}
|
||||
|
||||
impl FileExtension {
|
||||
pub fn new(format: FileFormat, data_type: DataType) -> Self {
|
||||
Self { format, data_type }
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for FileExtension {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}.{}", self.data_type, self.format)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for FileExtension {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self> {
|
||||
let parts = value.split(".").collect::<Vec<&str>>();
|
||||
if parts.len() != 2 {
|
||||
return InvalidFileExtensionSnafu {
|
||||
reason: format!(
|
||||
"Extension should be in the format of <datatype>.<format>, got: {}",
|
||||
value
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let data_type = DataType::try_from(parts[0])
|
||||
.map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
|
||||
let format = FileFormat::try_from(parts[1])
|
||||
.map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
|
||||
Ok(FileExtension { format, data_type })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct FileName {
|
||||
name: String,
|
||||
extension: FileExtension,
|
||||
}
|
||||
|
||||
impl Display for FileName {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}.{}", self.name, self.extension)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for FileName {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self> {
|
||||
let Some((name, extension)) = value.split_once(".") else {
|
||||
return InvalidFileNameSnafu {
|
||||
reason: format!(
|
||||
"The file name should be in the format of <name>.<extension>, got: {}",
|
||||
value
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let extension = FileExtension::try_from(extension)?;
|
||||
Ok(Self {
|
||||
name: name.to_string(),
|
||||
extension,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl FileName {
|
||||
fn new(name: String, extension: FileExtension) -> Self {
|
||||
Self { name, extension }
|
||||
}
|
||||
}
|
||||
|
||||
/// The manager of the metadata snapshot.
|
||||
///
|
||||
/// It manages the metadata snapshot, including dumping and restoring.
|
||||
pub struct MetadataSnapshotManager {
|
||||
kv_backend: KvBackendRef,
|
||||
object_store: ObjectStore,
|
||||
}
|
||||
|
||||
/// The maximum size of the request to put metadata, use 1MiB by default.
|
||||
const MAX_REQUEST_SIZE: usize = 1024 * 1024;
|
||||
|
||||
impl MetadataSnapshotManager {
|
||||
pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
|
||||
Self {
|
||||
kv_backend,
|
||||
object_store,
|
||||
}
|
||||
}
|
||||
|
||||
/// Restores the metadata from the backup file to the metadata store.
|
||||
pub async fn restore(&self, file_path: &str) -> Result<u64> {
|
||||
let filename = FileName::try_from(
|
||||
file_path
|
||||
.rsplit("/")
|
||||
.next()
|
||||
.context(InvalidFilePathSnafu { file_path })?,
|
||||
)?;
|
||||
let data = self
|
||||
.object_store
|
||||
.read(file_path)
|
||||
.await
|
||||
.context(ReadObjectSnafu { file_path })?;
|
||||
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
|
||||
let metadata_content = document.into_metadata_content()?;
|
||||
let mut req = BatchPutRequest::default();
|
||||
let mut total_request_size = 0;
|
||||
let mut count = 0;
|
||||
let now = Instant::now();
|
||||
for FileKeyValue { key, value } in metadata_content.into_iter() {
|
||||
count += 1;
|
||||
let key_size = key.len();
|
||||
let value_size = value.len();
|
||||
if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
|
||||
self.kv_backend.batch_put(req).await?;
|
||||
req = BatchPutRequest::default();
|
||||
total_request_size = 0;
|
||||
}
|
||||
req.kvs.push(KeyValue { key, value });
|
||||
total_request_size += key_size + value_size;
|
||||
}
|
||||
if !req.kvs.is_empty() {
|
||||
self.kv_backend.batch_put(req).await?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
|
||||
file_path,
|
||||
count,
|
||||
now.elapsed()
|
||||
);
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Dumps the metadata to the backup file.
|
||||
pub async fn dump(&self, path: &str, filename: &str) -> Result<(String, u64)> {
|
||||
let format = FileFormat::FlexBuffers;
|
||||
let filename = FileName::new(
|
||||
filename.to_string(),
|
||||
FileExtension {
|
||||
format,
|
||||
data_type: DataType::Metadata,
|
||||
},
|
||||
);
|
||||
let file_path = format!("{}/{}", path.trim_end_matches('/'), filename);
|
||||
let now = Instant::now();
|
||||
let req = RangeRequest::new().with_range(vec![0], vec![0]);
|
||||
let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
|
||||
Ok(FileKeyValue {
|
||||
key: kv.key,
|
||||
value: kv.value,
|
||||
})
|
||||
})
|
||||
.into_stream();
|
||||
let keyvalues = stream.try_collect::<Vec<_>>().await?;
|
||||
let num_keyvalues = keyvalues.len();
|
||||
let document = Document::new(
|
||||
Metadata::new(),
|
||||
file::Content::Metadata(MetadataContent::new(keyvalues)),
|
||||
);
|
||||
let bytes = document.to_bytes(&format)?;
|
||||
let r = self
|
||||
.object_store
|
||||
.write(&file_path, bytes)
|
||||
.await
|
||||
.context(WriteObjectSnafu {
|
||||
file_path: &file_path,
|
||||
})?;
|
||||
info!(
|
||||
"Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
|
||||
file_path,
|
||||
num_keyvalues,
|
||||
r.content_length(),
|
||||
now.elapsed()
|
||||
);
|
||||
|
||||
Ok((filename.to_string(), num_keyvalues as u64))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||
use object_store::services::Fs;
|
||||
|
||||
use super::*;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::kv_backend::KvBackend;
|
||||
use crate::rpc::store::PutRequest;
|
||||
|
||||
#[test]
|
||||
fn test_file_name() {
|
||||
let file_name = FileName::try_from("test.metadata.fb").unwrap();
|
||||
assert_eq!(file_name.name, "test");
|
||||
assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
|
||||
assert_eq!(file_name.extension.data_type, DataType::Metadata);
|
||||
assert_eq!(file_name.to_string(), "test.metadata.fb");
|
||||
|
||||
let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
|
||||
assert_eq!(
|
||||
invalid_file_name.to_string(),
|
||||
"Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
|
||||
);
|
||||
|
||||
let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
|
||||
assert_eq!(
|
||||
invalid_file_extension.to_string(),
|
||||
"Invalid file extension: Invalid file format: hello"
|
||||
);
|
||||
}
|
||||
|
||||
fn test_env(
|
||||
prefix: &str,
|
||||
) -> (
|
||||
TempDir,
|
||||
Arc<MemoryKvBackend<Error>>,
|
||||
MetadataSnapshotManager,
|
||||
) {
|
||||
let temp_dir = create_temp_dir(prefix);
|
||||
let kv_backend = Arc::new(MemoryKvBackend::default());
|
||||
let temp_path = temp_dir.path();
|
||||
let data_path = temp_path.join("data").as_path().display().to_string();
|
||||
let builder = Fs::default().root(&data_path);
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
|
||||
(temp_dir, kv_backend, manager)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dump_and_restore() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
|
||||
let temp_path = temp_dir.path();
|
||||
|
||||
for i in 0..10 {
|
||||
kv_backend
|
||||
.put(
|
||||
PutRequest::new()
|
||||
.with_key(format!("test_{}", i).as_bytes().to_vec())
|
||||
.with_value(format!("value_{}", i).as_bytes().to_vec()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let dump_path = temp_path.join("snapshot");
|
||||
manager
|
||||
.dump(
|
||||
&dump_path.as_path().display().to_string(),
|
||||
"metadata_snapshot",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Clean up the kv backend
|
||||
kv_backend.clear();
|
||||
|
||||
let restore_path = dump_path
|
||||
.join("metadata_snapshot.metadata.fb")
|
||||
.as_path()
|
||||
.display()
|
||||
.to_string();
|
||||
manager.restore(&restore_path).await.unwrap();
|
||||
|
||||
for i in 0..10 {
|
||||
let key = format!("test_{}", i);
|
||||
let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
|
||||
assert_eq!(value.value, format!("value_{}", i).as_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_restore_from_nonexistent_file() {
|
||||
let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
|
||||
let restore_path = temp_dir
|
||||
.path()
|
||||
.join("nonexistent.metadata.fb")
|
||||
.as_path()
|
||||
.display()
|
||||
.to_string();
|
||||
let err = manager.restore(&restore_path).await.unwrap_err();
|
||||
assert_matches!(err, Error::ReadObject { .. })
|
||||
}
|
||||
}
|
||||
145
src/common/meta/src/snapshot/file.rs
Normal file
145
src/common/meta/src/snapshot/file.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_time::util::current_time_millis;
|
||||
use flexbuffers::{FlexbufferSerializer, Reader};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{
|
||||
DeserializeFlexbuffersSnafu, ReadFlexbuffersSnafu, Result, SerializeFlexbuffersSnafu,
|
||||
};
|
||||
use crate::snapshot::FileFormat;
|
||||
|
||||
/// The layout of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct Document {
|
||||
metadata: Metadata,
|
||||
content: Content,
|
||||
}
|
||||
|
||||
impl Document {
|
||||
/// Creates a new document.
|
||||
pub fn new(metadata: Metadata, content: Content) -> Self {
|
||||
Self { metadata, content }
|
||||
}
|
||||
|
||||
fn serialize_to_flexbuffer(&self) -> Result<Vec<u8>> {
|
||||
let mut builder = FlexbufferSerializer::new();
|
||||
self.serialize(&mut builder)
|
||||
.context(SerializeFlexbuffersSnafu)?;
|
||||
Ok(builder.take_buffer())
|
||||
}
|
||||
|
||||
/// Converts the [`Document`] to a bytes.
|
||||
pub(crate) fn to_bytes(&self, format: &FileFormat) -> Result<Vec<u8>> {
|
||||
match format {
|
||||
FileFormat::FlexBuffers => self.serialize_to_flexbuffer(),
|
||||
}
|
||||
}
|
||||
|
||||
fn deserialize_from_flexbuffer(data: &[u8]) -> Result<Self> {
|
||||
let reader = Reader::get_root(data).context(ReadFlexbuffersSnafu)?;
|
||||
Document::deserialize(reader).context(DeserializeFlexbuffersSnafu)
|
||||
}
|
||||
|
||||
/// Deserializes the [`Document`] from a bytes.
|
||||
pub(crate) fn from_slice(format: &FileFormat, data: &[u8]) -> Result<Self> {
|
||||
match format {
|
||||
FileFormat::FlexBuffers => Self::deserialize_from_flexbuffer(data),
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts the [`Document`] to a [`MetadataContent`].
|
||||
pub(crate) fn into_metadata_content(self) -> Result<MetadataContent> {
|
||||
match self.content {
|
||||
Content::Metadata(metadata) => Ok(metadata),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The metadata of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct Metadata {
|
||||
// UNIX_EPOCH in milliseconds.
|
||||
created_timestamp_mills: i64,
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
/// Create a new metadata.
|
||||
///
|
||||
/// The `created_timestamp_mills` will be the current time in milliseconds.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
created_timestamp_mills: current_time_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The content of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) enum Content {
|
||||
Metadata(MetadataContent),
|
||||
}
|
||||
|
||||
/// The content of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct MetadataContent {
|
||||
values: Vec<KeyValue>,
|
||||
}
|
||||
|
||||
impl MetadataContent {
|
||||
/// Create a new metadata content.
|
||||
pub fn new(values: impl IntoIterator<Item = KeyValue>) -> Self {
|
||||
Self {
|
||||
values: values.into_iter().collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an iterator over the key-value pairs.
|
||||
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
|
||||
self.values.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
/// The key-value pair of the backup file.
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub(crate) struct KeyValue {
|
||||
pub key: Vec<u8>,
|
||||
pub value: Vec<u8>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_document() {
|
||||
let document = Document::new(
|
||||
Metadata::new(),
|
||||
Content::Metadata(MetadataContent::new(vec![KeyValue {
|
||||
key: b"key".to_vec(),
|
||||
value: b"value".to_vec(),
|
||||
}])),
|
||||
);
|
||||
|
||||
let bytes = document.to_bytes(&FileFormat::FlexBuffers).unwrap();
|
||||
let document_deserialized = Document::from_slice(&FileFormat::FlexBuffers, &bytes).unwrap();
|
||||
assert_eq!(
|
||||
document.metadata.created_timestamp_mills,
|
||||
document_deserialized.metadata.created_timestamp_mills
|
||||
);
|
||||
assert_eq!(document.content, document_deserialized.content);
|
||||
}
|
||||
}
|
||||
@@ -162,6 +162,13 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to do metadata snapshot"))]
|
||||
MetadataSnapshot {
|
||||
source: BoxedError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to do procedure task"))]
|
||||
ProcedureService {
|
||||
source: BoxedError,
|
||||
@@ -187,6 +194,12 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Missing MetadataSnapshotHandler, not expected"))]
|
||||
MissingMetadataSnapshotHandler {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid function args: {}", err_msg))]
|
||||
InvalidFuncArgs {
|
||||
err_msg: String,
|
||||
@@ -251,6 +264,7 @@ impl ErrorExt for Error {
|
||||
Error::MissingTableMutationHandler { .. }
|
||||
| Error::MissingProcedureServiceHandler { .. }
|
||||
| Error::MissingFlowServiceHandler { .. }
|
||||
| Error::MissingMetadataSnapshotHandler { .. }
|
||||
| Error::RegisterUdf { .. } => StatusCode::Unexpected,
|
||||
|
||||
Error::UnsupportedInputDataType { .. }
|
||||
@@ -262,7 +276,8 @@ impl ErrorExt for Error {
|
||||
Error::DecodePlan { source, .. }
|
||||
| Error::Execute { source, .. }
|
||||
| Error::ProcedureService { source, .. }
|
||||
| Error::TableMutation { source, .. } => source.status_code(),
|
||||
| Error::TableMutation { source, .. }
|
||||
| Error::MetadataSnapshot { source, .. } => source.status_code(),
|
||||
|
||||
Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
|
||||
}
|
||||
|
||||
@@ -357,6 +357,7 @@ impl DatanodeBuilder {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
self.plugins.clone(),
|
||||
opts.query.clone(),
|
||||
|
||||
@@ -334,6 +334,7 @@ impl FlownodeBuilder {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
Default::default(),
|
||||
self.opts.query.clone(),
|
||||
|
||||
@@ -153,6 +153,7 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
);
|
||||
|
||||
@@ -270,6 +270,7 @@ mod test {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
);
|
||||
|
||||
@@ -24,9 +24,11 @@ use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::node_manager::NodeManagerRef;
|
||||
use common_meta::snapshot::MetadataSnapshotManager;
|
||||
use operator::delete::Deleter;
|
||||
use operator::flow::FlowServiceOperator;
|
||||
use operator::insert::Inserter;
|
||||
use operator::metadata::MetadataSnapshotOperator;
|
||||
use operator::procedure::ProcedureServiceOperator;
|
||||
use operator::request::Requester;
|
||||
use operator::statement::{StatementExecutor, StatementExecutorRef};
|
||||
@@ -55,6 +57,7 @@ pub struct FrontendBuilder {
|
||||
plugins: Option<Plugins>,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
stats: StatementStatistics,
|
||||
metadata_snapshot_manager: Option<MetadataSnapshotManager>,
|
||||
}
|
||||
|
||||
impl FrontendBuilder {
|
||||
@@ -77,6 +80,17 @@ impl FrontendBuilder {
|
||||
plugins: None,
|
||||
procedure_executor,
|
||||
stats,
|
||||
metadata_snapshot_manager: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_metadata_snapshot_manager(
|
||||
self,
|
||||
metadata_snapshot_manager: MetadataSnapshotManager,
|
||||
) -> Self {
|
||||
Self {
|
||||
metadata_snapshot_manager: Some(metadata_snapshot_manager),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,12 +172,17 @@ impl FrontendBuilder {
|
||||
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
|
||||
let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
|
||||
|
||||
let metadata_snapshot_operator = self
|
||||
.metadata_snapshot_manager
|
||||
.map(|manager| Arc::new(MetadataSnapshotOperator::new(manager)) as _);
|
||||
|
||||
let query_engine = QueryEngineFactory::new_with_plugins(
|
||||
self.catalog_manager.clone(),
|
||||
Some(region_query_handler.clone()),
|
||||
Some(table_mutation_handler),
|
||||
Some(procedure_service_handler),
|
||||
Some(Arc::new(flow_service)),
|
||||
metadata_snapshot_operator,
|
||||
true,
|
||||
plugins.clone(),
|
||||
self.options.query.clone(),
|
||||
|
||||
@@ -20,6 +20,7 @@ pub mod error;
|
||||
pub mod expr_helper;
|
||||
pub mod flow;
|
||||
pub mod insert;
|
||||
pub mod metadata;
|
||||
pub mod metrics;
|
||||
pub mod procedure;
|
||||
pub mod region_req_factory;
|
||||
|
||||
53
src/operator/src/metadata.rs
Normal file
53
src/operator/src/metadata.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::handlers::MetadataSnapshotHandler;
|
||||
use common_meta::snapshot::MetadataSnapshotManager;
|
||||
use common_query::error as query_error;
|
||||
use common_query::error::Result as QueryResult;
|
||||
use snafu::ResultExt;
|
||||
|
||||
/// The operator of the metadata snapshot.
|
||||
pub struct MetadataSnapshotOperator {
|
||||
operator: MetadataSnapshotManager,
|
||||
}
|
||||
|
||||
impl MetadataSnapshotOperator {
|
||||
pub fn new(operator: MetadataSnapshotManager) -> Self {
|
||||
Self { operator }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MetadataSnapshotHandler for MetadataSnapshotOperator {
|
||||
async fn dump(&self, path: &str, filename: &str) -> QueryResult<String> {
|
||||
self.operator
|
||||
.dump(path, filename)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.map(|(file, _)| file)
|
||||
.context(query_error::MetadataSnapshotSnafu)
|
||||
}
|
||||
|
||||
async fn restore(&self, path: &str, filename: &str) -> QueryResult<u64> {
|
||||
let filepath = format!("{}{}", path, filename);
|
||||
self.operator
|
||||
.restore(&filepath)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(query_error::MetadataSnapshotSnafu)
|
||||
}
|
||||
}
|
||||
@@ -588,6 +588,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
)
|
||||
|
||||
@@ -295,6 +295,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
Default::default(),
|
||||
)
|
||||
|
||||
@@ -25,7 +25,8 @@ use common_base::Plugins;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::function_registry::FUNCTION_REGISTRY;
|
||||
use common_function::handlers::{
|
||||
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
|
||||
FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef,
|
||||
TableMutationHandlerRef,
|
||||
};
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_query::Output;
|
||||
@@ -100,12 +101,14 @@ pub struct QueryEngineFactory {
|
||||
}
|
||||
|
||||
impl QueryEngineFactory {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
catalog_manager: CatalogManagerRef,
|
||||
region_query_handler: Option<RegionQueryHandlerRef>,
|
||||
table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
|
||||
flow_service_handler: Option<FlowServiceHandlerRef>,
|
||||
metadata_snapshot_handler: Option<MetadataSnapshotHandlerRef>,
|
||||
with_dist_planner: bool,
|
||||
options: QueryOptions,
|
||||
) -> Self {
|
||||
@@ -115,6 +118,7 @@ impl QueryEngineFactory {
|
||||
table_mutation_handler,
|
||||
procedure_service_handler,
|
||||
flow_service_handler,
|
||||
metadata_snapshot_handler,
|
||||
with_dist_planner,
|
||||
Default::default(),
|
||||
options,
|
||||
@@ -128,6 +132,7 @@ impl QueryEngineFactory {
|
||||
table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
|
||||
flow_service_handler: Option<FlowServiceHandlerRef>,
|
||||
metadata_snapshot_handler: Option<MetadataSnapshotHandlerRef>,
|
||||
with_dist_planner: bool,
|
||||
plugins: Plugins,
|
||||
options: QueryOptions,
|
||||
@@ -138,6 +143,7 @@ impl QueryEngineFactory {
|
||||
table_mutation_handler,
|
||||
procedure_service_handler,
|
||||
flow_service_handler,
|
||||
metadata_snapshot_handler,
|
||||
with_dist_planner,
|
||||
plugins.clone(),
|
||||
options,
|
||||
@@ -178,6 +184,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
);
|
||||
|
||||
@@ -84,6 +84,7 @@ impl QueryEngineContext {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
Plugins::default(),
|
||||
QueryOptions::default(),
|
||||
|
||||
@@ -184,6 +184,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
);
|
||||
|
||||
@@ -21,7 +21,8 @@ use catalog::CatalogManagerRef;
|
||||
use common_base::Plugins;
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::handlers::{
|
||||
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
|
||||
FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef,
|
||||
TableMutationHandlerRef,
|
||||
};
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_function::state::FunctionState;
|
||||
@@ -91,6 +92,7 @@ impl QueryEngineState {
|
||||
table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
|
||||
flow_service_handler: Option<FlowServiceHandlerRef>,
|
||||
metadata_snapshot_handler: Option<MetadataSnapshotHandlerRef>,
|
||||
with_dist_planner: bool,
|
||||
plugins: Plugins,
|
||||
options: QueryOptionsNew,
|
||||
@@ -181,6 +183,7 @@ impl QueryEngineState {
|
||||
table_mutation_handler,
|
||||
procedure_service_handler,
|
||||
flow_service_handler,
|
||||
metadata_snapshot_handler,
|
||||
}),
|
||||
aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
|
||||
extension_rules,
|
||||
|
||||
@@ -670,6 +670,7 @@ mod test {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
)
|
||||
|
||||
@@ -53,6 +53,7 @@ pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
)
|
||||
|
||||
@@ -50,6 +50,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptionsNew::default(),
|
||||
);
|
||||
@@ -137,6 +138,7 @@ async fn test_query_validate() -> Result<()> {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
plugins,
|
||||
QueryOptionsNew::default(),
|
||||
|
||||
@@ -109,6 +109,7 @@ fn create_test_engine() -> TimeRangeTester {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
)
|
||||
|
||||
@@ -177,6 +177,7 @@ fn create_testing_instance(table: TableRef) -> DummyInstance {
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
QueryOptions::default(),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user