feat(standalone): support to dump/restore metadata

This commit is contained in:
WenyXu
2025-04-20 08:13:35 +00:00
parent d55d9addf2
commit 07b2ea096b
27 changed files with 244 additions and 11 deletions

View File

@@ -78,6 +78,13 @@ pub enum Error {
source: datanode::error::Error,
},
#[snafu(display("Failed to build object storage manager"))]
BuildObjectStorageManager {
#[snafu(implicit)]
location: Location,
source: datanode::error::Error,
},
#[snafu(display("Failed to shutdown datanode"))]
ShutdownDatanode {
#[snafu(implicit)]
@@ -328,6 +335,8 @@ impl ErrorExt for Error {
source.status_code()
}
Error::BuildObjectStorageManager { source, .. } => source.status_code(),
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }

View File

@@ -44,6 +44,7 @@ use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::snapshot::MetadataSnapshotManager;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info;
@@ -497,6 +498,10 @@ impl StartCommand {
.build(),
);
let object_store_manager = DatanodeBuilder::build_object_store_manager(&dn_opts.storage)
.await
.context(error::BuildObjectStorageManagerSnafu)?;
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
.with_kv_backend(kv_backend.clone())
.with_cache_registry(layered_cache_registry.clone())
@@ -591,6 +596,11 @@ impl StartCommand {
)
.await?;
let metadata_snapshot_manager = MetadataSnapshotManager::new(
kv_backend.clone(),
object_store_manager.default_object_store().clone(),
);
let fe_instance = FrontendBuilder::new(
fe_opts.clone(),
kv_backend.clone(),
@@ -601,6 +611,7 @@ impl StartCommand {
StatementStatistics::new(opts.logging.slow_query.clone()),
)
.with_plugin(plugins.clone())
.with_metadata_snapshot_manager(metadata_snapshot_manager)
.try_build()
.await
.context(error::StartFrontendSnafu)?;

View File

@@ -15,6 +15,7 @@
mod add_region_follower;
mod flush_compact_region;
mod flush_compact_table;
mod metadata_snaphost;
mod migrate_region;
mod remove_region_follower;
@@ -23,6 +24,7 @@ use std::sync::Arc;
use add_region_follower::AddRegionFollowerFunction;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use metadata_snaphost::{DumpMetadataFunction, RestoreMetadataFunction};
use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
@@ -43,5 +45,7 @@ impl AdminFunction {
registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(DumpMetadataFunction));
registry.register_async(Arc::new(RestoreMetadataFunction));
}
}

View File

@@ -0,0 +1,56 @@
use common_macro::admin_fn;
use common_query::error::{MissingMetadataSnapshotHandlerSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::*;
use session::context::QueryContextRef;
use crate::handlers::MetadataSnapshotHandlerRef;
const METADATA_DIR: &str = "/snaphost/";
const METADATA_FILE_NAME: &str = "dump_metadata";
const METADATA_FILE_EXTENSION: &str = "metadata.fb";
#[admin_fn(
name = DumpMetadataFunction,
display_name = dump_metadata,
sig_fn = dump_signature,
ret = string
)]
pub(crate) async fn dump_metadata(
metadata_snapshot_handler: &MetadataSnapshotHandlerRef,
_query_ctx: &QueryContextRef,
_params: &[ValueRef<'_>],
) -> Result<Value> {
let filename = metadata_snapshot_handler
.dump(METADATA_DIR, METADATA_FILE_NAME)
.await?;
Ok(Value::from(filename))
}
fn dump_signature() -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
}
#[admin_fn(
name = RestoreMetadataFunction,
display_name = restore_metadata,
sig_fn = restore_signature,
ret = uint64,
)]
pub(crate) async fn restore_metadata(
metadata_snapshot_handler: &MetadataSnapshotHandlerRef,
_query_ctx: &QueryContextRef,
_params: &[ValueRef<'_>],
) -> Result<Value> {
let num_keyvalues = metadata_snapshot_handler
.restore(
METADATA_DIR,
&format!("{METADATA_FILE_NAME}.{METADATA_FILE_EXTENSION}"),
)
.await?;
Ok(Value::from(num_keyvalues))
}
fn restore_signature() -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
}

View File

@@ -89,8 +89,18 @@ pub trait FlowServiceHandler: Send + Sync {
) -> Result<api::v1::flow::FlowResponse>;
}
/// This metadata snapshot handler is only use for dump and restore metadata for now.
#[async_trait]
pub trait MetadataSnapshotHandler: Send + Sync {
async fn dump(&self, path: &str, filename: &str) -> Result<String>;
async fn restore(&self, path: &str, filename: &str) -> Result<u64>;
}
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;
pub type FlowServiceHandlerRef = Arc<dyn FlowServiceHandler>;
pub type MetadataSnapshotHandlerRef = Arc<dyn MetadataSnapshotHandler>;

View File

@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
use crate::handlers::{
FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef,
TableMutationHandlerRef,
};
/// Shared state for SQL functions.
/// The handlers in state may be `None` in cli command-line or test cases.
@@ -24,6 +27,8 @@ pub struct FunctionState {
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
// The flownode handler
pub flow_service_handler: Option<FlowServiceHandlerRef>,
// The metadata snapshot handler
pub metadata_snapshot_handler: Option<MetadataSnapshotHandlerRef>,
}
impl FunctionState {
@@ -48,10 +53,14 @@ impl FunctionState {
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};
use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
use crate::handlers::{
FlowServiceHandler, MetadataSnapshotHandler, ProcedureServiceHandler,
TableMutationHandler,
};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
struct MockFlowServiceHandler;
struct MockMetadataServiceHandler;
const ROWS: usize = 42;
#[async_trait]
@@ -150,10 +159,22 @@ impl FunctionState {
}
}
#[async_trait]
impl MetadataSnapshotHandler for MockMetadataServiceHandler {
async fn dump(&self, _path: &str, _filename: &str) -> Result<String> {
Ok("test_filename".to_string())
}
async fn restore(&self, _path: &str, _filename: &str) -> Result<u64> {
Ok(100)
}
}
Self {
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
metadata_snapshot_handler: Some(Arc::new(MockMetadataServiceHandler)),
}
}
}

View File

@@ -179,6 +179,10 @@ fn build_struct(
Ident::new("flow_service_handler", handler_type.span()),
Ident::new("MissingFlowServiceHandlerSnafu", handler_type.span()),
),
"MetadataSnapshotHandlerRef" => (
Ident::new("metadata_snapshot_handler", handler_type.span()),
Ident::new("MissingMetadataSnapshotHandlerSnafu", handler_type.span()),
),
handler => ok!(error!(
handler_type.span(),
format!("Unknown handler type: {handler}")

View File

@@ -36,7 +36,7 @@ use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
/// The format of the backup file.
#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
pub(crate) enum FileFormat {
pub enum FileFormat {
#[strum(serialize = "fb")]
FlexBuffers,
}
@@ -54,7 +54,7 @@ impl TryFrom<&str> for FileFormat {
#[derive(Debug, PartialEq, Eq, Display)]
#[strum(serialize_all = "lowercase")]
pub(crate) enum DataType {
pub enum DataType {
Metadata,
}
@@ -70,11 +70,17 @@ impl TryFrom<&str> for DataType {
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct FileExtension {
pub struct FileExtension {
format: FileFormat,
data_type: DataType,
}
impl FileExtension {
pub fn new(format: FileFormat, data_type: DataType) -> Self {
Self { format, data_type }
}
}
impl Display for FileExtension {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.data_type, self.format)
@@ -105,7 +111,7 @@ impl TryFrom<&str> for FileExtension {
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct FileName {
pub struct FileName {
name: String,
extension: FileExtension,
}
@@ -207,7 +213,7 @@ impl MetadataSnapshotManager {
}
/// Dumps the metadata to the backup file.
pub async fn dump(&self, path: &str, filename: &str) -> Result<u64> {
pub async fn dump(&self, path: &str, filename: &str) -> Result<(String, u64)> {
let format = FileFormat::FlexBuffers;
let filename = FileName::new(
filename.to_string(),
@@ -248,7 +254,7 @@ impl MetadataSnapshotManager {
now.elapsed()
);
Ok(num_keyvalues as u64)
Ok((filename.to_string(), num_keyvalues as u64))
}
}

View File

@@ -162,6 +162,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to do metadata snapshot"))]
MetadataSnapshot {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to do procedure task"))]
ProcedureService {
source: BoxedError,
@@ -187,6 +194,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Missing MetadataSnapshotHandler, not expected"))]
MissingMetadataSnapshotHandler {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid function args: {}", err_msg))]
InvalidFuncArgs {
err_msg: String,
@@ -251,6 +264,7 @@ impl ErrorExt for Error {
Error::MissingTableMutationHandler { .. }
| Error::MissingProcedureServiceHandler { .. }
| Error::MissingFlowServiceHandler { .. }
| Error::MissingMetadataSnapshotHandler { .. }
| Error::RegisterUdf { .. } => StatusCode::Unexpected,
Error::UnsupportedInputDataType { .. }
@@ -262,7 +276,8 @@ impl ErrorExt for Error {
Error::DecodePlan { source, .. }
| Error::Execute { source, .. }
| Error::ProcedureService { source, .. }
| Error::TableMutation { source, .. } => source.status_code(),
| Error::TableMutation { source, .. }
| Error::MetadataSnapshot { source, .. } => source.status_code(),
Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
}

View File

@@ -357,6 +357,7 @@ impl DatanodeBuilder {
None,
None,
None,
None,
false,
self.plugins.clone(),
opts.query.clone(),

View File

@@ -334,6 +334,7 @@ impl FlownodeBuilder {
None,
None,
None,
None,
false,
Default::default(),
self.opts.query.clone(),

View File

@@ -153,6 +153,7 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
None,
None,
None,
None,
false,
QueryOptions::default(),
);

View File

@@ -270,6 +270,7 @@ mod test {
None,
None,
None,
None,
false,
QueryOptions::default(),
);

View File

@@ -24,9 +24,11 @@ use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::snapshot::MetadataSnapshotManager;
use operator::delete::Deleter;
use operator::flow::FlowServiceOperator;
use operator::insert::Inserter;
use operator::metadata::MetadataSnapshotOperator;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::statement::{StatementExecutor, StatementExecutorRef};
@@ -55,6 +57,7 @@ pub struct FrontendBuilder {
plugins: Option<Plugins>,
procedure_executor: ProcedureExecutorRef,
stats: StatementStatistics,
metadata_snapshot_manager: Option<MetadataSnapshotManager>,
}
impl FrontendBuilder {
@@ -77,6 +80,17 @@ impl FrontendBuilder {
plugins: None,
procedure_executor,
stats,
metadata_snapshot_manager: None,
}
}
pub fn with_metadata_snapshot_manager(
self,
metadata_snapshot_manager: MetadataSnapshotManager,
) -> Self {
Self {
metadata_snapshot_manager: Some(metadata_snapshot_manager),
..self
}
}
@@ -158,12 +172,17 @@ impl FrontendBuilder {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
let metadata_snapshot_operator = self
.metadata_snapshot_manager
.map(|manager| Arc::new(MetadataSnapshotOperator::new(manager)) as _);
let query_engine = QueryEngineFactory::new_with_plugins(
self.catalog_manager.clone(),
Some(region_query_handler.clone()),
Some(table_mutation_handler),
Some(procedure_service_handler),
Some(Arc::new(flow_service)),
metadata_snapshot_operator,
true,
plugins.clone(),
self.options.query.clone(),

View File

@@ -20,6 +20,7 @@ pub mod error;
pub mod expr_helper;
pub mod flow;
pub mod insert;
pub mod metadata;
pub mod metrics;
pub mod procedure;
pub mod region_req_factory;

View File

@@ -0,0 +1,53 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::handlers::MetadataSnapshotHandler;
use common_meta::snapshot::MetadataSnapshotManager;
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use snafu::ResultExt;
/// The operator of the metadata snapshot.
pub struct MetadataSnapshotOperator {
operator: MetadataSnapshotManager,
}
impl MetadataSnapshotOperator {
pub fn new(operator: MetadataSnapshotManager) -> Self {
Self { operator }
}
}
#[async_trait]
impl MetadataSnapshotHandler for MetadataSnapshotOperator {
async fn dump(&self, path: &str, filename: &str) -> QueryResult<String> {
self.operator
.dump(path, filename)
.await
.map_err(BoxedError::new)
.map(|(file, _)| file)
.context(query_error::MetadataSnapshotSnafu)
}
async fn restore(&self, path: &str, filename: &str) -> QueryResult<u64> {
let filepath = format!("{}{}", path, filename);
self.operator
.restore(&filepath)
.await
.map_err(BoxedError::new)
.context(query_error::MetadataSnapshotSnafu)
}
}

View File

@@ -588,6 +588,7 @@ mod tests {
None,
None,
None,
None,
false,
QueryOptions::default(),
)

View File

@@ -295,6 +295,7 @@ mod tests {
None,
None,
None,
None,
false,
Default::default(),
)

View File

@@ -25,7 +25,8 @@ use common_base::Plugins;
use common_function::function::FunctionRef;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef,
TableMutationHandlerRef,
};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::Output;
@@ -100,12 +101,14 @@ pub struct QueryEngineFactory {
}
impl QueryEngineFactory {
#[allow(clippy::too_many_arguments)]
pub fn new(
catalog_manager: CatalogManagerRef,
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
flow_service_handler: Option<FlowServiceHandlerRef>,
metadata_snapshot_handler: Option<MetadataSnapshotHandlerRef>,
with_dist_planner: bool,
options: QueryOptions,
) -> Self {
@@ -115,6 +118,7 @@ impl QueryEngineFactory {
table_mutation_handler,
procedure_service_handler,
flow_service_handler,
metadata_snapshot_handler,
with_dist_planner,
Default::default(),
options,
@@ -128,6 +132,7 @@ impl QueryEngineFactory {
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
flow_service_handler: Option<FlowServiceHandlerRef>,
metadata_snapshot_handler: Option<MetadataSnapshotHandlerRef>,
with_dist_planner: bool,
plugins: Plugins,
options: QueryOptions,
@@ -138,6 +143,7 @@ impl QueryEngineFactory {
table_mutation_handler,
procedure_service_handler,
flow_service_handler,
metadata_snapshot_handler,
with_dist_planner,
plugins.clone(),
options,
@@ -178,6 +184,7 @@ mod tests {
None,
None,
None,
None,
false,
QueryOptions::default(),
);

View File

@@ -84,6 +84,7 @@ impl QueryEngineContext {
None,
None,
None,
None,
false,
Plugins::default(),
QueryOptions::default(),

View File

@@ -184,6 +184,7 @@ mod tests {
None,
None,
None,
None,
false,
QueryOptions::default(),
);

View File

@@ -21,7 +21,8 @@ use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function::FunctionRef;
use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef,
TableMutationHandlerRef,
};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::state::FunctionState;
@@ -91,6 +92,7 @@ impl QueryEngineState {
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
flow_service_handler: Option<FlowServiceHandlerRef>,
metadata_snapshot_handler: Option<MetadataSnapshotHandlerRef>,
with_dist_planner: bool,
plugins: Plugins,
options: QueryOptionsNew,
@@ -181,6 +183,7 @@ impl QueryEngineState {
table_mutation_handler,
procedure_service_handler,
flow_service_handler,
metadata_snapshot_handler,
}),
aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
extension_rules,

View File

@@ -670,6 +670,7 @@ mod test {
None,
None,
None,
None,
false,
QueryOptions::default(),
)

View File

@@ -53,6 +53,7 @@ pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
None,
None,
None,
None,
false,
QueryOptions::default(),
)

View File

@@ -50,6 +50,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
None,
None,
None,
None,
false,
QueryOptionsNew::default(),
);
@@ -137,6 +138,7 @@ async fn test_query_validate() -> Result<()> {
None,
None,
None,
None,
false,
plugins,
QueryOptionsNew::default(),

View File

@@ -109,6 +109,7 @@ fn create_test_engine() -> TimeRangeTester {
None,
None,
None,
None,
false,
QueryOptions::default(),
)

View File

@@ -177,6 +177,7 @@ fn create_testing_instance(table: TableRef) -> DummyInstance {
None,
None,
None,
None,
false,
QueryOptions::default(),
)