diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b0601088cf..e0f2c673ff 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -20,6 +20,7 @@ use std::{fs, path}; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use catalog::CatalogManagerRef; use catalog::information_schema::InformationExtensionRef; use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder}; use catalog::process_manager::ProcessManager; @@ -28,7 +29,8 @@ use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::{Configurable, metadata_store_dir}; use common_error::ext::BoxedError; -use common_meta::cache::LayeredCacheRegistryBuilder; +use common_meta::DatanodeId; +use common_meta::cache::{LayeredCacheRegistryBuilder, LayeredCacheRegistryRef}; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; @@ -53,8 +55,8 @@ use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; use datanode::region_server::RegionServer; use flow::{ - FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker, - GrpcQueryHandlerWithBoxedError, + FlowDualEngineRef, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, + FrontendInvoker, GrpcQueryHandlerWithBoxedError, }; use frontend::frontend::Frontend; use frontend::instance::StandaloneDatanodeManager; @@ -124,8 +126,8 @@ pub struct Instance { frontend: Frontend, flownode: FlownodeInstance, procedure_manager: ProcedureManagerRef, - wal_provider: WalProviderRef, leader_services_controller: Box, + leader_services_context: LeaderServicesContext, // Keep the logging guard to prevent the worker from being dropped. _guard: Vec, } @@ -159,11 +161,7 @@ impl App for Instance { self.datanode.start_telemetry(); self.leader_services_controller - .start( - self.procedure_manager.clone(), - self.wal_provider.clone(), - self.datanode.region_server(), - ) + .start(self.leader_services_context.clone()) .await?; plugins::start_frontend_plugins(self.frontend.instance.plugins().clone()) @@ -379,6 +377,8 @@ impl StartCommand { opts.grpc.detect_server_addr(); let fe_opts = opts.frontend_options(); let dn_opts = opts.datanode_options(); + let node_id = dn_opts.node_id; + let init_regions_parallelism = dn_opts.init_regions_parallelism; plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts) .await @@ -491,21 +491,18 @@ impl StartCommand { .await .map_err(BoxedError::new) .context(error::OtherSnafu)?; + let flow_engine = flownode.flow_engine(); // set the ref to query for the local flow state { information_extension - .set_flow_engine(flownode.flow_engine()) + .set_flow_engine(flow_engine.clone()) .await; } let node_manager = creator .node_manager_creator - .create( - &kv_backend, - datanode.region_server(), - flownode.flow_engine(), - ) + .create(&kv_backend, datanode.region_server(), flow_engine.clone()) .await?; let table_id_allocator = creator.table_id_allocator_creator.create(&kv_backend); @@ -596,7 +593,7 @@ impl StartCommand { .await; // set the frontend invoker for flownode - let flow_streaming_engine = flownode.flow_engine().streaming_engine(); + let flow_streaming_engine = flow_engine.streaming_engine(); // flow server need to be able to use frontend to write insert requests back let invoker = FrontendInvoker::build_from( flow_streaming_engine.clone(), @@ -620,14 +617,27 @@ impl StartCommand { servers, heartbeat_task: None, }; + let leader_services_context = LeaderServicesContext { + procedure_manager: procedure_manager.clone(), + wal_provider: wal_provider.clone(), + region_server: datanode.region_server(), + kv_backend: kv_backend.clone(), + cache_registry: layered_cache_registry, + catalog_manager, + flow_engine, + frontend_client, + node_id, + init_regions_parallelism, + plugin_options: plugin_opts, + }; let instance = Instance { datanode, frontend, flownode, procedure_manager, - wal_provider, leader_services_controller: creator.leader_services_controller, + leader_services_context, _guard: vec![], }; let result = InstanceCreatorResult { @@ -743,16 +753,11 @@ impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator { #[async_trait] pub trait StandaloneLeaderServicesController: Send + Sync { - /// Starts services that manage standalone metadata or WAL state. + /// Starts leader services that manage standalone metadata or WAL state. /// /// The default implementation starts the procedure manager and WAL provider /// during instance startup. - async fn start( - &self, - procedure_manager: ProcedureManagerRef, - wal_provider: WalProviderRef, - region_server: RegionServer, - ) -> Result<()>; + async fn start(&self, context: LeaderServicesContext) -> Result<()>; /// Stops services started by [`StandaloneLeaderServicesController::start`]. async fn stop( @@ -762,21 +767,42 @@ pub trait StandaloneLeaderServicesController: Send + Sync { ) -> Result<()>; } +#[derive(Clone)] +/// Additional runtime handles for custom leader-service controllers. +/// +/// The default standalone startup only needs to start/stop the procedure +/// manager and WAL provider. Some embedders need to do more work around +/// leader-service startup, for example reconciling metadata-backed runtime +/// state before publishing writable leadership. Grouping those handles here +/// keeps `Instance` small and avoids expanding +/// [`StandaloneLeaderServicesController::start`] every time a custom lifecycle +/// needs one more standalone component. +pub struct LeaderServicesContext { + pub procedure_manager: ProcedureManagerRef, + pub wal_provider: WalProviderRef, + pub region_server: RegionServer, + pub kv_backend: KvBackendRef, + pub cache_registry: LayeredCacheRegistryRef, + pub catalog_manager: CatalogManagerRef, + pub flow_engine: FlowDualEngineRef, + pub frontend_client: Arc, + pub node_id: Option, + pub init_regions_parallelism: usize, + pub plugin_options: Vec, +} + pub struct DefaultStandaloneLeaderServicesController; #[async_trait] impl StandaloneLeaderServicesController for DefaultStandaloneLeaderServicesController { - async fn start( - &self, - procedure_manager: ProcedureManagerRef, - wal_provider: WalProviderRef, - _region_server: RegionServer, - ) -> Result<()> { - procedure_manager + async fn start(&self, context: LeaderServicesContext) -> Result<()> { + context + .procedure_manager .start() .await .context(error::StartProcedureManagerSnafu)?; - wal_provider + context + .wal_provider .start() .await .context(error::StartWalProviderSnafu) diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs index e3a3e13a76..e3a1a50adc 100644 --- a/src/common/meta/src/cache/container.rs +++ b/src/common/meta/src/cache/container.rs @@ -196,8 +196,8 @@ where #[async_trait::async_trait] impl CacheInvalidator for CacheContainer where - K: Send + Sync, - V: Send + Sync, + K: Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, { async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> { let idents = caches @@ -211,6 +211,12 @@ where Ok(()) } + + fn invalidate_all(&self) -> Result<()> { + self.inc_version(); + self.cache.invalidate_all(); + Ok(()) + } } impl CacheContainer diff --git a/src/common/meta/src/cache/registry.rs b/src/common/meta/src/cache/registry.rs index d541525f98..b7ee82b6e5 100644 --- a/src/common/meta/src/cache/registry.rs +++ b/src/common/meta/src/cache/registry.rs @@ -67,6 +67,13 @@ impl CacheInvalidator for LayeredCacheRegistry { } results.into_iter().collect::>>().map(|_| ()) } + + fn invalidate_all(&self) -> Result<()> { + for registry in &self.layers { + registry.invalidate_all()?; + } + Ok(()) + } } impl LayeredCacheRegistry { @@ -124,6 +131,13 @@ impl CacheInvalidator for CacheRegistry { .collect::>>()?; Ok(()) } + + fn invalidate_all(&self) -> Result<()> { + for invalidator in &self.indexes { + invalidator.invalidate_all()?; + } + Ok(()) + } } impl CacheRegistry { @@ -149,6 +163,8 @@ mod tests { use crate::cache::registry::CacheRegistryBuilder; use crate::cache::*; + use crate::cache_invalidator::{CacheInvalidator, Context}; + use crate::error::Result; use crate::instruction::CacheIdent; fn always_true_filter(_: &CacheIdent) -> bool { @@ -259,4 +275,91 @@ mod tests { .unwrap(); assert_eq!(cache.name(), "string_cache"); } + + #[tokio::test] + async fn test_registry_invalidate_all() { + let invalidator: Invalidator<_, String, CacheIdent> = + Box::new(|_, _| Box::pin(async { Ok(()) })); + let i32_cache = Arc::new(test_i32_cache("i32_cache", invalidator)); + let invalidator: Invalidator<_, String, CacheIdent> = + Box::new(|_, _| Box::pin(async { Ok(()) })); + let string_cache = Arc::new(test_cache("string_cache", invalidator)); + + i32_cache.get(1).await.unwrap(); + string_cache.get_by_ref("foo").await.unwrap(); + assert!(i32_cache.contains_key(&1)); + assert!(string_cache.contains_key("foo")); + + let registry = CacheRegistryBuilder::default() + .add_cache(i32_cache.clone()) + .add_cache(string_cache.clone()) + .build(); + + registry.invalidate_all().unwrap(); + + assert!(!i32_cache.contains_key(&1)); + assert!(!string_cache.contains_key("foo")); + } + + struct LayerOrderInvalidator { + expected_order: i32, + order: Arc, + } + + #[async_trait::async_trait] + impl CacheInvalidator for LayerOrderInvalidator { + async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> { + Ok(()) + } + + fn invalidate_all(&self) -> Result<()> { + let previous = self.order.fetch_add(1, Ordering::Relaxed); + assert_eq!(self.expected_order, previous); + Ok(()) + } + } + + #[tokio::test] + async fn test_layered_registry_invalidate_all() { + let order = Arc::new(AtomicI32::new(0)); + let invalidator: Invalidator<_, String, CacheIdent> = + Box::new(|_, _| Box::pin(async { Ok(()) })); + let first_layer_cache = Arc::new(test_cache("first_layer_cache", invalidator)); + let first_layer_order = Arc::new(LayerOrderInvalidator { + expected_order: 0, + order: order.clone(), + }); + let first_layer = CacheRegistryBuilder::default() + .add_cache(first_layer_order) + .add_cache(first_layer_cache.clone()) + .build(); + + let invalidator: Invalidator<_, String, CacheIdent> = + Box::new(|_, _| Box::pin(async { Ok(()) })); + let second_layer_cache = Arc::new(test_i32_cache("second_layer_cache", invalidator)); + let second_layer_order = Arc::new(LayerOrderInvalidator { + expected_order: 1, + order: order.clone(), + }); + let second_layer = CacheRegistryBuilder::default() + .add_cache(second_layer_order) + .add_cache(second_layer_cache.clone()) + .build(); + + first_layer_cache.get_by_ref("foo").await.unwrap(); + second_layer_cache.get(1).await.unwrap(); + assert!(first_layer_cache.contains_key("foo")); + assert!(second_layer_cache.contains_key(&1)); + + let registry = LayeredCacheRegistryBuilder::default() + .add_cache_registry(first_layer) + .add_cache_registry(second_layer) + .build(); + + registry.invalidate_all().unwrap(); + + assert_eq!(2, order.load(Ordering::Relaxed)); + assert!(!first_layer_cache.contains_key("foo")); + assert!(!second_layer_cache.contains_key(&1)); + } } diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index ffc3dd1c9a..4fe0699ba5 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -55,6 +55,13 @@ pub struct Context { pub trait CacheInvalidator: Send + Sync { async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>; + /// Invalidates every cache entry owned by this invalidator. + /// + /// This method is required so each implementer explicitly decides how + /// full-cache invalidation should behave. Implementations that intentionally + /// do nothing must document why a no-op is safe. + fn invalidate_all(&self) -> Result<()>; + fn name(&self) -> &'static str { std::any::type_name::() } @@ -69,6 +76,11 @@ impl CacheInvalidator for DummyCacheInvalidator { async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> { Ok(()) } + + fn invalidate_all(&self) -> Result<()> { + // Dummy invalidator owns no cache state, so there is nothing to clear. + Ok(()) + } } #[async_trait::async_trait] @@ -157,4 +169,11 @@ where } Ok(()) } + + fn invalidate_all(&self) -> Result<()> { + // KvCacheInvalidator only knows how to invalidate explicit metadata + // keys. There is no safe generic way to enumerate or clear the backend + // keyspace, so full invalidation is intentionally a no-op here. + Ok(()) + } } diff --git a/src/datanode/src/utils.rs b/src/datanode/src/utils.rs index 488ddacdf0..c5cd008c28 100644 --- a/src/datanode/src/utils.rs +++ b/src/datanode/src/utils.rs @@ -29,10 +29,28 @@ use tracing::info; use crate::error::{GetMetadataSnafu, Result}; /// The requests to open regions. -pub(crate) struct RegionOpenRequests { - pub leader_regions: Vec<(RegionId, RegionOpenRequest)>, +pub struct RegionOpenRequests { + pub(crate) leader_regions: Vec<(RegionId, RegionOpenRequest)>, #[cfg(feature = "enterprise")] - pub follower_regions: Vec<(RegionId, RegionOpenRequest)>, + pub(crate) follower_regions: Vec<(RegionId, RegionOpenRequest)>, +} + +impl RegionOpenRequests { + /// Splits the request set into leader and follower regions. + #[allow(clippy::type_complexity)] + pub fn into_parts( + self, + ) -> ( + Vec<(RegionId, RegionOpenRequest)>, + Vec<(RegionId, RegionOpenRequest)>, + ) { + let leader_regions = self.leader_regions; + #[cfg(feature = "enterprise")] + let follower_regions = self.follower_regions; + #[cfg(not(feature = "enterprise"))] + let follower_regions = Vec::new(); + (leader_regions, follower_regions) + } } fn group_region_by_topic( @@ -58,7 +76,8 @@ fn get_replay_checkpoint( }) } -pub(crate) async fn build_region_open_requests( +/// Builds region-open requests from persisted metadata. +pub async fn build_region_open_requests( node_id: DatanodeId, kv_backend: KvBackendRef, ) -> Result { diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 53a3265d7d..f4ca149f1a 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -465,6 +465,11 @@ impl FlowDualEngine { Ok(()) } + /// Reconciles in-memory flow tasks from persisted metadata. + pub async fn reconcile_flows_from_metadata(&self) -> Result<(), Error> { + self.check_flow_consistent(true, true).await + } + /// TODO(discord9): also add a `exists` api using flow metadata manager's `exists` method async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result { self.flow_metadata_manager diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index b594d65f48..f6ec0b4fc9 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -84,4 +84,11 @@ impl CacheInvalidator for MetasrvCacheInvalidator { let instruction = Instruction::InvalidateCaches(caches.to_vec()); self.broadcast(ctx, instruction).await } + + fn invalidate_all(&self) -> MetaResult<()> { + // MetasrvCacheInvalidator only broadcasts concrete cache identifiers to + // remote nodes. The heartbeat instruction protocol has no global + // invalidate-all message, so there is no safe broadcast to send here. + Ok(()) + } }