chore: add LeaderServicesContext control to standalone (#8164)

* chore: add refresh hook

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: merge start_with_context and start

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: place reset in recover

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: revert stop changes

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: CR issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-05-28 17:23:30 +08:00
committed by GitHub
parent 123524474d
commit 17815830ed
7 changed files with 223 additions and 38 deletions

View File

@@ -20,6 +20,7 @@ use std::{fs, path};
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::CatalogManagerRef;
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder};
use catalog::process_manager::ProcessManager;
@@ -28,7 +29,8 @@ use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{Configurable, metadata_store_dir};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::DatanodeId;
use common_meta::cache::{LayeredCacheRegistryBuilder, LayeredCacheRegistryRef};
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
@@ -53,8 +55,8 @@ use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use flow::{
FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
GrpcQueryHandlerWithBoxedError,
FlowDualEngineRef, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
FrontendInvoker, GrpcQueryHandlerWithBoxedError,
};
use frontend::frontend::Frontend;
use frontend::instance::StandaloneDatanodeManager;
@@ -124,8 +126,8 @@ pub struct Instance {
frontend: Frontend,
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
leader_services_controller: Box<dyn StandaloneLeaderServicesController>,
leader_services_context: LeaderServicesContext,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -159,11 +161,7 @@ impl App for Instance {
self.datanode.start_telemetry();
self.leader_services_controller
.start(
self.procedure_manager.clone(),
self.wal_provider.clone(),
self.datanode.region_server(),
)
.start(self.leader_services_context.clone())
.await?;
plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
@@ -379,6 +377,8 @@ impl StartCommand {
opts.grpc.detect_server_addr();
let fe_opts = opts.frontend_options();
let dn_opts = opts.datanode_options();
let node_id = dn_opts.node_id;
let init_regions_parallelism = dn_opts.init_regions_parallelism;
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
.await
@@ -491,21 +491,18 @@ impl StartCommand {
.await
.map_err(BoxedError::new)
.context(error::OtherSnafu)?;
let flow_engine = flownode.flow_engine();
// set the ref to query for the local flow state
{
information_extension
.set_flow_engine(flownode.flow_engine())
.set_flow_engine(flow_engine.clone())
.await;
}
let node_manager = creator
.node_manager_creator
.create(
&kv_backend,
datanode.region_server(),
flownode.flow_engine(),
)
.create(&kv_backend, datanode.region_server(), flow_engine.clone())
.await?;
let table_id_allocator = creator.table_id_allocator_creator.create(&kv_backend);
@@ -596,7 +593,7 @@ impl StartCommand {
.await;
// set the frontend invoker for flownode
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
let flow_streaming_engine = flow_engine.streaming_engine();
// flow server need to be able to use frontend to write insert requests back
let invoker = FrontendInvoker::build_from(
flow_streaming_engine.clone(),
@@ -620,14 +617,27 @@ impl StartCommand {
servers,
heartbeat_task: None,
};
let leader_services_context = LeaderServicesContext {
procedure_manager: procedure_manager.clone(),
wal_provider: wal_provider.clone(),
region_server: datanode.region_server(),
kv_backend: kv_backend.clone(),
cache_registry: layered_cache_registry,
catalog_manager,
flow_engine,
frontend_client,
node_id,
init_regions_parallelism,
plugin_options: plugin_opts,
};
let instance = Instance {
datanode,
frontend,
flownode,
procedure_manager,
wal_provider,
leader_services_controller: creator.leader_services_controller,
leader_services_context,
_guard: vec![],
};
let result = InstanceCreatorResult {
@@ -743,16 +753,11 @@ impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator {
#[async_trait]
pub trait StandaloneLeaderServicesController: Send + Sync {
/// Starts services that manage standalone metadata or WAL state.
/// Starts leader services that manage standalone metadata or WAL state.
///
/// The default implementation starts the procedure manager and WAL provider
/// during instance startup.
async fn start(
&self,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
region_server: RegionServer,
) -> Result<()>;
async fn start(&self, context: LeaderServicesContext) -> Result<()>;
/// Stops services started by [`StandaloneLeaderServicesController::start`].
async fn stop(
@@ -762,21 +767,42 @@ pub trait StandaloneLeaderServicesController: Send + Sync {
) -> Result<()>;
}
#[derive(Clone)]
/// Additional runtime handles for custom leader-service controllers.
///
/// The default standalone startup only needs to start/stop the procedure
/// manager and WAL provider. Some embedders need to do more work around
/// leader-service startup, for example reconciling metadata-backed runtime
/// state before publishing writable leadership. Grouping those handles here
/// keeps `Instance` small and avoids expanding
/// [`StandaloneLeaderServicesController::start`] every time a custom lifecycle
/// needs one more standalone component.
pub struct LeaderServicesContext {
pub procedure_manager: ProcedureManagerRef,
pub wal_provider: WalProviderRef,
pub region_server: RegionServer,
pub kv_backend: KvBackendRef,
pub cache_registry: LayeredCacheRegistryRef,
pub catalog_manager: CatalogManagerRef,
pub flow_engine: FlowDualEngineRef,
pub frontend_client: Arc<FrontendClient>,
pub node_id: Option<DatanodeId>,
pub init_regions_parallelism: usize,
pub plugin_options: Vec<PluginOptions>,
}
pub struct DefaultStandaloneLeaderServicesController;
#[async_trait]
impl StandaloneLeaderServicesController for DefaultStandaloneLeaderServicesController {
async fn start(
&self,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
_region_server: RegionServer,
) -> Result<()> {
procedure_manager
async fn start(&self, context: LeaderServicesContext) -> Result<()> {
context
.procedure_manager
.start()
.await
.context(error::StartProcedureManagerSnafu)?;
wal_provider
context
.wal_provider
.start()
.await
.context(error::StartWalProviderSnafu)

View File

@@ -196,8 +196,8 @@ where
#[async_trait::async_trait]
impl<K, V> CacheInvalidator for CacheContainer<K, V, CacheIdent>
where
K: Send + Sync,
V: Send + Sync,
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
let idents = caches
@@ -211,6 +211,12 @@ where
Ok(())
}
fn invalidate_all(&self) -> Result<()> {
self.inc_version();
self.cache.invalidate_all();
Ok(())
}
}
impl<K, V, CacheToken> CacheContainer<K, V, CacheToken>

View File

@@ -67,6 +67,13 @@ impl CacheInvalidator for LayeredCacheRegistry {
}
results.into_iter().collect::<Result<Vec<_>>>().map(|_| ())
}
fn invalidate_all(&self) -> Result<()> {
for registry in &self.layers {
registry.invalidate_all()?;
}
Ok(())
}
}
impl LayeredCacheRegistry {
@@ -124,6 +131,13 @@ impl CacheInvalidator for CacheRegistry {
.collect::<Result<Vec<_>>>()?;
Ok(())
}
fn invalidate_all(&self) -> Result<()> {
for invalidator in &self.indexes {
invalidator.invalidate_all()?;
}
Ok(())
}
}
impl CacheRegistry {
@@ -149,6 +163,8 @@ mod tests {
use crate::cache::registry::CacheRegistryBuilder;
use crate::cache::*;
use crate::cache_invalidator::{CacheInvalidator, Context};
use crate::error::Result;
use crate::instruction::CacheIdent;
fn always_true_filter(_: &CacheIdent) -> bool {
@@ -259,4 +275,91 @@ mod tests {
.unwrap();
assert_eq!(cache.name(), "string_cache");
}
#[tokio::test]
async fn test_registry_invalidate_all() {
let invalidator: Invalidator<_, String, CacheIdent> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let i32_cache = Arc::new(test_i32_cache("i32_cache", invalidator));
let invalidator: Invalidator<_, String, CacheIdent> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let string_cache = Arc::new(test_cache("string_cache", invalidator));
i32_cache.get(1).await.unwrap();
string_cache.get_by_ref("foo").await.unwrap();
assert!(i32_cache.contains_key(&1));
assert!(string_cache.contains_key("foo"));
let registry = CacheRegistryBuilder::default()
.add_cache(i32_cache.clone())
.add_cache(string_cache.clone())
.build();
registry.invalidate_all().unwrap();
assert!(!i32_cache.contains_key(&1));
assert!(!string_cache.contains_key("foo"));
}
struct LayerOrderInvalidator {
expected_order: i32,
order: Arc<AtomicI32>,
}
#[async_trait::async_trait]
impl CacheInvalidator for LayerOrderInvalidator {
async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
Ok(())
}
fn invalidate_all(&self) -> Result<()> {
let previous = self.order.fetch_add(1, Ordering::Relaxed);
assert_eq!(self.expected_order, previous);
Ok(())
}
}
#[tokio::test]
async fn test_layered_registry_invalidate_all() {
let order = Arc::new(AtomicI32::new(0));
let invalidator: Invalidator<_, String, CacheIdent> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let first_layer_cache = Arc::new(test_cache("first_layer_cache", invalidator));
let first_layer_order = Arc::new(LayerOrderInvalidator {
expected_order: 0,
order: order.clone(),
});
let first_layer = CacheRegistryBuilder::default()
.add_cache(first_layer_order)
.add_cache(first_layer_cache.clone())
.build();
let invalidator: Invalidator<_, String, CacheIdent> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let second_layer_cache = Arc::new(test_i32_cache("second_layer_cache", invalidator));
let second_layer_order = Arc::new(LayerOrderInvalidator {
expected_order: 1,
order: order.clone(),
});
let second_layer = CacheRegistryBuilder::default()
.add_cache(second_layer_order)
.add_cache(second_layer_cache.clone())
.build();
first_layer_cache.get_by_ref("foo").await.unwrap();
second_layer_cache.get(1).await.unwrap();
assert!(first_layer_cache.contains_key("foo"));
assert!(second_layer_cache.contains_key(&1));
let registry = LayeredCacheRegistryBuilder::default()
.add_cache_registry(first_layer)
.add_cache_registry(second_layer)
.build();
registry.invalidate_all().unwrap();
assert_eq!(2, order.load(Ordering::Relaxed));
assert!(!first_layer_cache.contains_key("foo"));
assert!(!second_layer_cache.contains_key(&1));
}
}

View File

@@ -55,6 +55,13 @@ pub struct Context {
pub trait CacheInvalidator: Send + Sync {
async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
/// Invalidates every cache entry owned by this invalidator.
///
/// This method is required so each implementer explicitly decides how
/// full-cache invalidation should behave. Implementations that intentionally
/// do nothing must document why a no-op is safe.
fn invalidate_all(&self) -> Result<()>;
fn name(&self) -> &'static str {
std::any::type_name::<Self>()
}
@@ -69,6 +76,11 @@ impl CacheInvalidator for DummyCacheInvalidator {
async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
Ok(())
}
fn invalidate_all(&self) -> Result<()> {
// Dummy invalidator owns no cache state, so there is nothing to clear.
Ok(())
}
}
#[async_trait::async_trait]
@@ -157,4 +169,11 @@ where
}
Ok(())
}
fn invalidate_all(&self) -> Result<()> {
// KvCacheInvalidator only knows how to invalidate explicit metadata
// keys. There is no safe generic way to enumerate or clear the backend
// keyspace, so full invalidation is intentionally a no-op here.
Ok(())
}
}

View File

@@ -29,10 +29,28 @@ use tracing::info;
use crate::error::{GetMetadataSnafu, Result};
/// The requests to open regions.
pub(crate) struct RegionOpenRequests {
pub leader_regions: Vec<(RegionId, RegionOpenRequest)>,
pub struct RegionOpenRequests {
pub(crate) leader_regions: Vec<(RegionId, RegionOpenRequest)>,
#[cfg(feature = "enterprise")]
pub follower_regions: Vec<(RegionId, RegionOpenRequest)>,
pub(crate) follower_regions: Vec<(RegionId, RegionOpenRequest)>,
}
impl RegionOpenRequests {
/// Splits the request set into leader and follower regions.
#[allow(clippy::type_complexity)]
pub fn into_parts(
self,
) -> (
Vec<(RegionId, RegionOpenRequest)>,
Vec<(RegionId, RegionOpenRequest)>,
) {
let leader_regions = self.leader_regions;
#[cfg(feature = "enterprise")]
let follower_regions = self.follower_regions;
#[cfg(not(feature = "enterprise"))]
let follower_regions = Vec::new();
(leader_regions, follower_regions)
}
}
fn group_region_by_topic(
@@ -58,7 +76,8 @@ fn get_replay_checkpoint(
})
}
pub(crate) async fn build_region_open_requests(
/// Builds region-open requests from persisted metadata.
pub async fn build_region_open_requests(
node_id: DatanodeId,
kv_backend: KvBackendRef,
) -> Result<RegionOpenRequests> {

View File

@@ -465,6 +465,11 @@ impl FlowDualEngine {
Ok(())
}
/// Reconciles in-memory flow tasks from persisted metadata.
pub async fn reconcile_flows_from_metadata(&self) -> Result<(), Error> {
self.check_flow_consistent(true, true).await
}
/// TODO(discord9): also add a `exists` api using flow metadata manager's `exists` method
async fn flow_exist_in_metadata(&self, flow_id: FlowId) -> Result<bool, Error> {
self.flow_metadata_manager

View File

@@ -84,4 +84,11 @@ impl CacheInvalidator for MetasrvCacheInvalidator {
let instruction = Instruction::InvalidateCaches(caches.to_vec());
self.broadcast(ctx, instruction).await
}
fn invalidate_all(&self) -> MetaResult<()> {
// MetasrvCacheInvalidator only broadcasts concrete cache identifiers to
// remote nodes. The heartbeat instruction protocol has no global
// invalidate-all message, so there is no safe broadcast to send here.
Ok(())
}
}