From 07b2ea096bb21e0743bfaca3024acf1ea2062959 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 20 Apr 2025 08:13:35 +0000 Subject: [PATCH] feat(standalone): support to dump/restore metadata --- src/cmd/src/error.rs | 9 +++ src/cmd/src/standalone.rs | 11 ++++ src/common/function/src/admin.rs | 4 ++ .../function/src/admin/metadata_snaphost.rs | 56 +++++++++++++++++++ src/common/function/src/handlers.rs | 10 ++++ src/common/function/src/state.rs | 25 ++++++++- src/common/macro/src/admin_fn.rs | 4 ++ src/common/meta/src/snapshot.rs | 18 ++++-- src/common/query/src/error.rs | 17 +++++- src/datanode/src/datanode.rs | 1 + src/flow/src/server.rs | 1 + src/flow/src/test_utils.rs | 1 + src/flow/src/transform.rs | 1 + src/frontend/src/instance/builder.rs | 19 +++++++ src/operator/src/lib.rs | 1 + src/operator/src/metadata.rs | 53 ++++++++++++++++++ src/query/src/datafusion.rs | 1 + src/query/src/optimizer/constant_term.rs | 1 + src/query/src/query_engine.rs | 9 ++- src/query/src/query_engine/context.rs | 1 + .../src/query_engine/default_serializer.rs | 1 + src/query/src/query_engine/state.rs | 5 +- src/query/src/range_select/plan_rewrite.rs | 1 + src/query/src/tests.rs | 1 + src/query/src/tests/query_engine_test.rs | 2 + src/query/src/tests/time_range_filter_test.rs | 1 + src/servers/tests/mod.rs | 1 + 27 files changed, 244 insertions(+), 11 deletions(-) create mode 100644 src/common/function/src/admin/metadata_snaphost.rs create mode 100644 src/operator/src/metadata.rs diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index a671290503..af25b4ade5 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -78,6 +78,13 @@ pub enum Error { source: datanode::error::Error, }, + #[snafu(display("Failed to build object storage manager"))] + BuildObjectStorageManager { + #[snafu(implicit)] + location: Location, + source: datanode::error::Error, + }, + #[snafu(display("Failed to shutdown datanode"))] ShutdownDatanode { #[snafu(implicit)] @@ -328,6 +335,8 @@ impl ErrorExt for Error { source.status_code() } + Error::BuildObjectStorageManager { source, .. } => source.status_code(), + Error::MissingConfig { .. } | Error::LoadLayeredConfig { .. } | Error::IllegalConfig { .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 320a2849ed..2964d83e6b 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -44,6 +44,7 @@ use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; +use common_meta::snapshot::MetadataSnapshotManager; use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef}; use common_procedure::{ProcedureInfo, ProcedureManagerRef}; use common_telemetry::info; @@ -497,6 +498,10 @@ impl StartCommand { .build(), ); + let object_store_manager = DatanodeBuilder::build_object_store_manager(&dn_opts.storage) + .await + .context(error::BuildObjectStorageManagerSnafu)?; + let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone) .with_kv_backend(kv_backend.clone()) .with_cache_registry(layered_cache_registry.clone()) @@ -591,6 +596,11 @@ impl StartCommand { ) .await?; + let metadata_snapshot_manager = MetadataSnapshotManager::new( + kv_backend.clone(), + object_store_manager.default_object_store().clone(), + ); + let fe_instance = FrontendBuilder::new( fe_opts.clone(), kv_backend.clone(), @@ -601,6 +611,7 @@ impl StartCommand { StatementStatistics::new(opts.logging.slow_query.clone()), ) .with_plugin(plugins.clone()) + .with_metadata_snapshot_manager(metadata_snapshot_manager) .try_build() .await .context(error::StartFrontendSnafu)?; diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs index c06b28e7d5..b45ab2642e 100644 --- a/src/common/function/src/admin.rs +++ b/src/common/function/src/admin.rs @@ -15,6 +15,7 @@ mod add_region_follower; mod flush_compact_region; mod flush_compact_table; +mod metadata_snaphost; mod migrate_region; mod remove_region_follower; @@ -23,6 +24,7 @@ use std::sync::Arc; use add_region_follower::AddRegionFollowerFunction; use flush_compact_region::{CompactRegionFunction, FlushRegionFunction}; use flush_compact_table::{CompactTableFunction, FlushTableFunction}; +use metadata_snaphost::{DumpMetadataFunction, RestoreMetadataFunction}; use migrate_region::MigrateRegionFunction; use remove_region_follower::RemoveRegionFollowerFunction; @@ -43,5 +45,7 @@ impl AdminFunction { registry.register_async(Arc::new(FlushTableFunction)); registry.register_async(Arc::new(CompactTableFunction)); registry.register_async(Arc::new(FlushFlowFunction)); + registry.register_async(Arc::new(DumpMetadataFunction)); + registry.register_async(Arc::new(RestoreMetadataFunction)); } } diff --git a/src/common/function/src/admin/metadata_snaphost.rs b/src/common/function/src/admin/metadata_snaphost.rs new file mode 100644 index 0000000000..c8626e86be --- /dev/null +++ b/src/common/function/src/admin/metadata_snaphost.rs @@ -0,0 +1,56 @@ +use common_macro::admin_fn; +use common_query::error::{MissingMetadataSnapshotHandlerSnafu, Result}; +use common_query::prelude::{Signature, Volatility}; +use datatypes::prelude::*; +use session::context::QueryContextRef; + +use crate::handlers::MetadataSnapshotHandlerRef; + +const METADATA_DIR: &str = "/snaphost/"; +const METADATA_FILE_NAME: &str = "dump_metadata"; +const METADATA_FILE_EXTENSION: &str = "metadata.fb"; + +#[admin_fn( + name = DumpMetadataFunction, + display_name = dump_metadata, + sig_fn = dump_signature, + ret = string +)] +pub(crate) async fn dump_metadata( + metadata_snapshot_handler: &MetadataSnapshotHandlerRef, + _query_ctx: &QueryContextRef, + _params: &[ValueRef<'_>], +) -> Result { + let filename = metadata_snapshot_handler + .dump(METADATA_DIR, METADATA_FILE_NAME) + .await?; + Ok(Value::from(filename)) +} + +fn dump_signature() -> Signature { + Signature::uniform(0, vec![], Volatility::Immutable) +} + +#[admin_fn( + name = RestoreMetadataFunction, + display_name = restore_metadata, + sig_fn = restore_signature, + ret = uint64, +)] +pub(crate) async fn restore_metadata( + metadata_snapshot_handler: &MetadataSnapshotHandlerRef, + _query_ctx: &QueryContextRef, + _params: &[ValueRef<'_>], +) -> Result { + let num_keyvalues = metadata_snapshot_handler + .restore( + METADATA_DIR, + &format!("{METADATA_FILE_NAME}.{METADATA_FILE_EXTENSION}"), + ) + .await?; + Ok(Value::from(num_keyvalues)) +} + +fn restore_signature() -> Signature { + Signature::uniform(0, vec![], Volatility::Immutable) +} diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index bcb6ce5460..f434d89ffc 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -89,8 +89,18 @@ pub trait FlowServiceHandler: Send + Sync { ) -> Result; } +/// This metadata snapshot handler is only use for dump and restore metadata for now. +#[async_trait] +pub trait MetadataSnapshotHandler: Send + Sync { + async fn dump(&self, path: &str, filename: &str) -> Result; + + async fn restore(&self, path: &str, filename: &str) -> Result; +} + pub type TableMutationHandlerRef = Arc; pub type ProcedureServiceHandlerRef = Arc; pub type FlowServiceHandlerRef = Arc; + +pub type MetadataSnapshotHandlerRef = Arc; diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index 211f7e1438..4fc2da7c1c 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef}; +use crate::handlers::{ + FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef, + TableMutationHandlerRef, +}; /// Shared state for SQL functions. /// The handlers in state may be `None` in cli command-line or test cases. @@ -24,6 +27,8 @@ pub struct FunctionState { pub procedure_service_handler: Option, // The flownode handler pub flow_service_handler: Option, + // The metadata snapshot handler + pub metadata_snapshot_handler: Option, } impl FunctionState { @@ -48,10 +53,14 @@ impl FunctionState { CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest, }; - use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler}; + use crate::handlers::{ + FlowServiceHandler, MetadataSnapshotHandler, ProcedureServiceHandler, + TableMutationHandler, + }; struct MockProcedureServiceHandler; struct MockTableMutationHandler; struct MockFlowServiceHandler; + struct MockMetadataServiceHandler; const ROWS: usize = 42; #[async_trait] @@ -150,10 +159,22 @@ impl FunctionState { } } + #[async_trait] + impl MetadataSnapshotHandler for MockMetadataServiceHandler { + async fn dump(&self, _path: &str, _filename: &str) -> Result { + Ok("test_filename".to_string()) + } + + async fn restore(&self, _path: &str, _filename: &str) -> Result { + Ok(100) + } + } + Self { table_mutation_handler: Some(Arc::new(MockTableMutationHandler)), procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)), flow_service_handler: Some(Arc::new(MockFlowServiceHandler)), + metadata_snapshot_handler: Some(Arc::new(MockMetadataServiceHandler)), } } } diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs index 902bede814..6a09002216 100644 --- a/src/common/macro/src/admin_fn.rs +++ b/src/common/macro/src/admin_fn.rs @@ -179,6 +179,10 @@ fn build_struct( Ident::new("flow_service_handler", handler_type.span()), Ident::new("MissingFlowServiceHandlerSnafu", handler_type.span()), ), + "MetadataSnapshotHandlerRef" => ( + Ident::new("metadata_snapshot_handler", handler_type.span()), + Ident::new("MissingMetadataSnapshotHandlerSnafu", handler_type.span()), + ), handler => ok!(error!( handler_type.span(), format!("Unknown handler type: {handler}") diff --git a/src/common/meta/src/snapshot.rs b/src/common/meta/src/snapshot.rs index bb3290fe75..2723f86ddb 100644 --- a/src/common/meta/src/snapshot.rs +++ b/src/common/meta/src/snapshot.rs @@ -36,7 +36,7 @@ use crate::snapshot::file::{Document, KeyValue as FileKeyValue}; /// The format of the backup file. #[derive(Debug, PartialEq, Eq, Display, Clone, Copy)] -pub(crate) enum FileFormat { +pub enum FileFormat { #[strum(serialize = "fb")] FlexBuffers, } @@ -54,7 +54,7 @@ impl TryFrom<&str> for FileFormat { #[derive(Debug, PartialEq, Eq, Display)] #[strum(serialize_all = "lowercase")] -pub(crate) enum DataType { +pub enum DataType { Metadata, } @@ -70,11 +70,17 @@ impl TryFrom<&str> for DataType { } #[derive(Debug, PartialEq, Eq)] -pub(crate) struct FileExtension { +pub struct FileExtension { format: FileFormat, data_type: DataType, } +impl FileExtension { + pub fn new(format: FileFormat, data_type: DataType) -> Self { + Self { format, data_type } + } +} + impl Display for FileExtension { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}.{}", self.data_type, self.format) @@ -105,7 +111,7 @@ impl TryFrom<&str> for FileExtension { } #[derive(Debug, PartialEq, Eq)] -pub(crate) struct FileName { +pub struct FileName { name: String, extension: FileExtension, } @@ -207,7 +213,7 @@ impl MetadataSnapshotManager { } /// Dumps the metadata to the backup file. - pub async fn dump(&self, path: &str, filename: &str) -> Result { + pub async fn dump(&self, path: &str, filename: &str) -> Result<(String, u64)> { let format = FileFormat::FlexBuffers; let filename = FileName::new( filename.to_string(), @@ -248,7 +254,7 @@ impl MetadataSnapshotManager { now.elapsed() ); - Ok(num_keyvalues as u64) + Ok((filename.to_string(), num_keyvalues as u64)) } } diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index a6d2997e95..ab2baa4abe 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -162,6 +162,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to do metadata snapshot"))] + MetadataSnapshot { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to do procedure task"))] ProcedureService { source: BoxedError, @@ -187,6 +194,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Missing MetadataSnapshotHandler, not expected"))] + MissingMetadataSnapshotHandler { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid function args: {}", err_msg))] InvalidFuncArgs { err_msg: String, @@ -251,6 +264,7 @@ impl ErrorExt for Error { Error::MissingTableMutationHandler { .. } | Error::MissingProcedureServiceHandler { .. } | Error::MissingFlowServiceHandler { .. } + | Error::MissingMetadataSnapshotHandler { .. } | Error::RegisterUdf { .. } => StatusCode::Unexpected, Error::UnsupportedInputDataType { .. } @@ -262,7 +276,8 @@ impl ErrorExt for Error { Error::DecodePlan { source, .. } | Error::Execute { source, .. } | Error::ProcedureService { source, .. } - | Error::TableMutation { source, .. } => source.status_code(), + | Error::TableMutation { source, .. } + | Error::MetadataSnapshot { source, .. } => source.status_code(), Error::PermissionDenied { .. } => StatusCode::PermissionDenied, } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 4b1e720032..1234de1f11 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -357,6 +357,7 @@ impl DatanodeBuilder { None, None, None, + None, false, self.plugins.clone(), opts.query.clone(), diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 53712ffb67..03d905f9ee 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -334,6 +334,7 @@ impl FlownodeBuilder { None, None, None, + None, false, Default::default(), self.opts.query.clone(), diff --git a/src/flow/src/test_utils.rs b/src/flow/src/test_utils.rs index ecaabae32d..b60bee4e6e 100644 --- a/src/flow/src/test_utils.rs +++ b/src/flow/src/test_utils.rs @@ -153,6 +153,7 @@ pub fn create_test_query_engine() -> Arc { None, None, None, + None, false, QueryOptions::default(), ); diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 04c7f40e68..8756d0bd30 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -270,6 +270,7 @@ mod test { None, None, None, + None, false, QueryOptions::default(), ); diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index ffbfeabca1..17d1c5fe05 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -24,9 +24,11 @@ use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; +use common_meta::snapshot::MetadataSnapshotManager; use operator::delete::Deleter; use operator::flow::FlowServiceOperator; use operator::insert::Inserter; +use operator::metadata::MetadataSnapshotOperator; use operator::procedure::ProcedureServiceOperator; use operator::request::Requester; use operator::statement::{StatementExecutor, StatementExecutorRef}; @@ -55,6 +57,7 @@ pub struct FrontendBuilder { plugins: Option, procedure_executor: ProcedureExecutorRef, stats: StatementStatistics, + metadata_snapshot_manager: Option, } impl FrontendBuilder { @@ -77,6 +80,17 @@ impl FrontendBuilder { plugins: None, procedure_executor, stats, + metadata_snapshot_manager: None, + } + } + + pub fn with_metadata_snapshot_manager( + self, + metadata_snapshot_manager: MetadataSnapshotManager, + ) -> Self { + Self { + metadata_snapshot_manager: Some(metadata_snapshot_manager), + ..self } } @@ -158,12 +172,17 @@ impl FrontendBuilder { let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone()); + let metadata_snapshot_operator = self + .metadata_snapshot_manager + .map(|manager| Arc::new(MetadataSnapshotOperator::new(manager)) as _); + let query_engine = QueryEngineFactory::new_with_plugins( self.catalog_manager.clone(), Some(region_query_handler.clone()), Some(table_mutation_handler), Some(procedure_service_handler), Some(Arc::new(flow_service)), + metadata_snapshot_operator, true, plugins.clone(), self.options.query.clone(), diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs index ae00ef40dd..085c1574f2 100644 --- a/src/operator/src/lib.rs +++ b/src/operator/src/lib.rs @@ -20,6 +20,7 @@ pub mod error; pub mod expr_helper; pub mod flow; pub mod insert; +pub mod metadata; pub mod metrics; pub mod procedure; pub mod region_req_factory; diff --git a/src/operator/src/metadata.rs b/src/operator/src/metadata.rs new file mode 100644 index 0000000000..927b31723c --- /dev/null +++ b/src/operator/src/metadata.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use common_error::ext::BoxedError; +use common_function::handlers::MetadataSnapshotHandler; +use common_meta::snapshot::MetadataSnapshotManager; +use common_query::error as query_error; +use common_query::error::Result as QueryResult; +use snafu::ResultExt; + +/// The operator of the metadata snapshot. +pub struct MetadataSnapshotOperator { + operator: MetadataSnapshotManager, +} + +impl MetadataSnapshotOperator { + pub fn new(operator: MetadataSnapshotManager) -> Self { + Self { operator } + } +} + +#[async_trait] +impl MetadataSnapshotHandler for MetadataSnapshotOperator { + async fn dump(&self, path: &str, filename: &str) -> QueryResult { + self.operator + .dump(path, filename) + .await + .map_err(BoxedError::new) + .map(|(file, _)| file) + .context(query_error::MetadataSnapshotSnafu) + } + + async fn restore(&self, path: &str, filename: &str) -> QueryResult { + let filepath = format!("{}{}", path, filename); + self.operator + .restore(&filepath) + .await + .map_err(BoxedError::new) + .context(query_error::MetadataSnapshotSnafu) + } +} diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index db4207fd8a..fffb14aab4 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -588,6 +588,7 @@ mod tests { None, None, None, + None, false, QueryOptions::default(), ) diff --git a/src/query/src/optimizer/constant_term.rs b/src/query/src/optimizer/constant_term.rs index 60e5b76d9d..9c6fc790f5 100644 --- a/src/query/src/optimizer/constant_term.rs +++ b/src/query/src/optimizer/constant_term.rs @@ -295,6 +295,7 @@ mod tests { None, None, None, + None, false, Default::default(), ) diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 8b0c091054..477db1bd98 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -25,7 +25,8 @@ use common_base::Plugins; use common_function::function::FunctionRef; use common_function::function_registry::FUNCTION_REGISTRY; use common_function::handlers::{ - FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef, + FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef, + TableMutationHandlerRef, }; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::Output; @@ -100,12 +101,14 @@ pub struct QueryEngineFactory { } impl QueryEngineFactory { + #[allow(clippy::too_many_arguments)] pub fn new( catalog_manager: CatalogManagerRef, region_query_handler: Option, table_mutation_handler: Option, procedure_service_handler: Option, flow_service_handler: Option, + metadata_snapshot_handler: Option, with_dist_planner: bool, options: QueryOptions, ) -> Self { @@ -115,6 +118,7 @@ impl QueryEngineFactory { table_mutation_handler, procedure_service_handler, flow_service_handler, + metadata_snapshot_handler, with_dist_planner, Default::default(), options, @@ -128,6 +132,7 @@ impl QueryEngineFactory { table_mutation_handler: Option, procedure_service_handler: Option, flow_service_handler: Option, + metadata_snapshot_handler: Option, with_dist_planner: bool, plugins: Plugins, options: QueryOptions, @@ -138,6 +143,7 @@ impl QueryEngineFactory { table_mutation_handler, procedure_service_handler, flow_service_handler, + metadata_snapshot_handler, with_dist_planner, plugins.clone(), options, @@ -178,6 +184,7 @@ mod tests { None, None, None, + None, false, QueryOptions::default(), ); diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index df20a70a42..c967fc38bd 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -84,6 +84,7 @@ impl QueryEngineContext { None, None, None, + None, false, Plugins::default(), QueryOptions::default(), diff --git a/src/query/src/query_engine/default_serializer.rs b/src/query/src/query_engine/default_serializer.rs index c3feed1d55..e9cb338f85 100644 --- a/src/query/src/query_engine/default_serializer.rs +++ b/src/query/src/query_engine/default_serializer.rs @@ -184,6 +184,7 @@ mod tests { None, None, None, + None, false, QueryOptions::default(), ); diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index d55ab471f9..77fbc31731 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -21,7 +21,8 @@ use catalog::CatalogManagerRef; use common_base::Plugins; use common_function::function::FunctionRef; use common_function::handlers::{ - FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef, + FlowServiceHandlerRef, MetadataSnapshotHandlerRef, ProcedureServiceHandlerRef, + TableMutationHandlerRef, }; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::state::FunctionState; @@ -91,6 +92,7 @@ impl QueryEngineState { table_mutation_handler: Option, procedure_service_handler: Option, flow_service_handler: Option, + metadata_snapshot_handler: Option, with_dist_planner: bool, plugins: Plugins, options: QueryOptionsNew, @@ -181,6 +183,7 @@ impl QueryEngineState { table_mutation_handler, procedure_service_handler, flow_service_handler, + metadata_snapshot_handler, }), aggregate_functions: Arc::new(RwLock::new(HashMap::new())), extension_rules, diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index 5e0f223663..45f9ceb4a1 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -670,6 +670,7 @@ mod test { None, None, None, + None, false, QueryOptions::default(), ) diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs index 7c004e5229..addb48865a 100644 --- a/src/query/src/tests.rs +++ b/src/query/src/tests.rs @@ -53,6 +53,7 @@ pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef { None, None, None, + None, false, QueryOptions::default(), ) diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 07bac1363a..a9dce42c3e 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -50,6 +50,7 @@ async fn test_datafusion_query_engine() -> Result<()> { None, None, None, + None, false, QueryOptionsNew::default(), ); @@ -137,6 +138,7 @@ async fn test_query_validate() -> Result<()> { None, None, None, + None, false, plugins, QueryOptionsNew::default(), diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 84bdd8cb18..8828865df7 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -109,6 +109,7 @@ fn create_test_engine() -> TimeRangeTester { None, None, None, + None, false, QueryOptions::default(), ) diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 43aeb362fa..e044f00822 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -177,6 +177,7 @@ fn create_testing_instance(table: TableRef) -> DummyInstance { None, None, None, + None, false, QueryOptions::default(), )