feat: introduce CustomizedRegionLeaseRenewer (#5762)

* feat: add manifest_version to `GrantedRegion`

* chore: upgrade proto

* chore: apply review suggestions

* chore: apply suggestions from CR

* feat: introduce `CustomizedRegionLeaseRenewerRef`

* chore: upgrade to `103948`
This commit is contained in:
Weny Xu
2025-03-31 21:25:05 +08:00
committed by GitHub
parent d3a60d8821
commit d701c18150
28 changed files with 268 additions and 60 deletions

2
Cargo.lock generated
View File

@@ -4671,7 +4671,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5bf34f1ba22763bfd4ab2ed1dd82fc790746048a#5bf34f1ba22763bfd4ab2ed1dd82fc790746048a"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=103948cbce833e1a17ee7083f5ba79564d08d6ec#103948cbce833e1a17ee7083f5ba79564d08d6ec"
dependencies = [
"prost 0.13.3",
"serde",

View File

@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "103948cbce833e1a17ee7083f5ba79564d08d6ec" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -42,6 +42,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
@@ -670,6 +671,7 @@ impl StartCommand {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager,
table_metadata_allocator,
flow_metadata_manager,
@@ -787,6 +789,7 @@ impl InformationExtension for StandaloneInformationExtension {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
manifest_version: region_stat.manifest_version,
}
})
.collect::<Vec<_>>();

View File

@@ -92,6 +92,8 @@ pub struct RegionStat {
pub sst_size: u64,
/// The size of the SST index files in bytes.
pub index_size: u64,
/// The version of manifest.
pub manifest_version: u64,
}
impl Stat {
@@ -185,6 +187,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
manifest_version: region_stat.manifest_version,
}
}
}

View File

@@ -28,6 +28,7 @@ use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
@@ -164,6 +165,8 @@ pub struct DdlContext {
pub cache_invalidator: CacheInvalidatorRef,
/// Keep tracking operating regions.
pub memory_region_keeper: MemoryRegionKeeperRef,
/// The leader region registry.
pub leader_region_registry: LeaderRegionRegistryRef,
/// Table metadata manager.
pub table_metadata_manager: TableMetadataManagerRef,
/// Allocator for table metadata.

View File

@@ -35,7 +35,9 @@ use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
/// [Control] indicated to the caller whether to go to the next step.
#[derive(Debug)]
@@ -250,6 +252,11 @@ impl DropTableExecutor {
.into_iter()
.collect::<Result<Vec<_>>>()?;
// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry
.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
Ok(())
}
}

View File

@@ -850,6 +850,7 @@ mod tests {
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal_options_allocator::WalOptionsAllocator;
@@ -893,6 +894,7 @@ mod tests {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),

View File

@@ -39,6 +39,7 @@ pub mod node_manager;
pub mod peer;
pub mod range_stream;
pub mod region_keeper;
pub mod region_registry;
pub mod rpc;
pub mod sequence;
pub mod state_store;

View File

@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use common_telemetry::warn;
use store_api::storage::RegionId;
/// Represents information about a leader region in the cluster.
/// Contains the datanode id where the leader is located,
/// and the current manifest version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaderRegion {
pub datanode_id: u64,
pub manifest_version: u64,
}
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
/// Registry that maintains a mapping of all leader regions in the cluster.
/// Tracks which datanode is hosting the leader for each region and the corresponding
/// manifest version.
#[derive(Default)]
pub struct LeaderRegionRegistry {
inner: RwLock<HashMap<RegionId, LeaderRegion>>,
}
impl LeaderRegionRegistry {
/// Creates a new empty leader region registry.
pub fn new() -> Self {
Self {
inner: RwLock::new(HashMap::new()),
}
}
/// Gets the leader region for the given region ids.
pub fn batch_get<I: Iterator<Item = RegionId>>(
&self,
region_ids: I,
) -> HashMap<RegionId, LeaderRegion> {
let inner = self.inner.read().unwrap();
region_ids
.into_iter()
.flat_map(|region_id| {
inner
.get(&region_id)
.map(|leader_region| (region_id, *leader_region))
})
.collect::<HashMap<_, _>>()
}
/// Puts the leader regions into the registry.
pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
let mut inner = self.inner.write().unwrap();
for (region_id, leader_region) in key_values {
match inner.entry(region_id) {
Entry::Vacant(entry) => {
entry.insert(leader_region);
}
Entry::Occupied(mut entry) => {
let manifest_version = entry.get().manifest_version;
if manifest_version > leader_region.manifest_version {
warn!(
"Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
region_id,
manifest_version,
leader_region.manifest_version
);
} else {
entry.insert(leader_region);
}
}
}
}
}
pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
let mut inner = self.inner.write().unwrap();
for region_id in region_ids {
inner.remove(&region_id);
}
}
/// Resets the registry to an empty state.
pub fn reset(&self) {
let mut inner = self.inner.write().unwrap();
inner.clear();
}
}

View File

@@ -35,6 +35,7 @@ use crate::node_manager::{
};
use crate::peer::{Peer, PeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::{DatanodeId, FlownodeId};
@@ -177,6 +178,7 @@ pub fn new_ddl_context_with_kv_backend(
node_manager,
cache_invalidator: Arc::new(DummyCacheInvalidator),
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_allocator,
table_metadata_manager,
flow_metadata_allocator,

View File

@@ -472,7 +472,6 @@ mod test {
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::Leader.into(),
// TODO(weny): use real manifest version
manifest_version: 0,
}],
Instant::now() + Duration::from_millis(200),

View File

@@ -96,7 +96,7 @@ pub trait HeartbeatHandler: Send + Sync {
/// HandleControl
///
/// Controls process of handling heartbeat request.
#[derive(PartialEq)]
#[derive(PartialEq, Debug)]
pub enum HandleControl {
Continue,
Done,

View File

@@ -223,6 +223,7 @@ mod tests {
use common_meta::datanode::DatanodeStatKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use super::*;
@@ -257,6 +258,7 @@ mod tests {
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
};
let handler = CollectStatsHandler::default();

View File

@@ -98,6 +98,7 @@ mod tests {
manifest_size: 0,
sst_size: 0,
index_size: 0,
manifest_version: 0,
}
}
acc.stat = Some(Stat {

View File

@@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use store_api::region_engine::GrantedRegion;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
@@ -29,6 +31,17 @@ use crate::region::RegionLeaseKeeper;
pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
}
pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
pub trait CustomizedRegionLeaseRenewer: Send + Sync {
fn renew(
&self,
ctx: &mut Context,
regions: HashMap<RegionId, RegionRole>,
) -> Vec<GrantedRegion>;
}
impl RegionLeaseHandler {
@@ -36,6 +49,7 @@ impl RegionLeaseHandler {
region_lease_seconds: u64,
table_metadata_manager: TableMetadataManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
) -> Self {
let region_lease_keeper =
RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
@@ -43,6 +57,7 @@ impl RegionLeaseHandler {
Self {
region_lease_seconds,
region_lease_keeper: Arc::new(region_lease_keeper),
customized_region_lease_renewer,
}
}
}
@@ -56,7 +71,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &mut Context,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some(stat) = acc.stat.as_ref() else {
@@ -74,18 +89,18 @@ impl HeartbeatHandler for RegionLeaseHandler {
.renew_region_leases(datanode_id, &regions)
.await?;
let renewed = renewed
.into_iter()
.map(|(region_id, region_role)| {
GrantedRegion {
region_id,
region_role,
// TODO(weny): use real manifest version
manifest_version: 0,
}
.into()
})
.collect::<Vec<_>>();
let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
renewer
.renew(ctx, renewed)
.into_iter()
.map(|region| region.into())
.collect()
} else {
renewed
.into_iter()
.map(|(region_id, region_role)| GrantedRegion::new(region_id, region_role).into())
.collect::<Vec<_>>()
};
acc.region_lease = Some(RegionLease {
regions: renewed,
@@ -141,6 +156,7 @@ mod test {
manifest_size: 0,
sst_size: 0,
index_size: 0,
manifest_version: 0,
}
}
@@ -200,6 +216,7 @@ mod test {
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
None,
);
handler.handle(&req, ctx, acc).await.unwrap();
@@ -342,6 +359,7 @@ mod test {
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
Default::default(),
None,
);
handler.handle(&req, ctx, acc).await.unwrap();

View File

@@ -49,6 +49,7 @@ mod tests {
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_telemetry::tracing_context::W3cTrace;
@@ -84,6 +85,7 @@ mod tests {
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
};
let req = HeartbeatRequest {

View File

@@ -36,6 +36,7 @@ use common_meta::leadership_notifier::{
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_procedure::options::ProcedureConfig;
@@ -258,11 +259,13 @@ pub struct Context {
pub is_infancy: bool,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub leader_region_registry: LeaderRegionRegistryRef,
}
impl Context {
pub fn reset_in_memory(&self) {
self.in_memory.reset();
self.leader_region_registry.reset();
}
}
@@ -403,6 +406,7 @@ pub struct Metasrv {
region_migration_manager: RegionMigrationManagerRef,
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
cache_invalidator: CacheInvalidatorRef,
leader_region_registry: LeaderRegionRegistryRef,
plugins: Plugins,
}
@@ -668,6 +672,7 @@ impl Metasrv {
let election = self.election.clone();
let table_metadata_manager = self.table_metadata_manager.clone();
let cache_invalidator = self.cache_invalidator.clone();
let leader_region_registry = self.leader_region_registry.clone();
Context {
server_addr,
@@ -680,6 +685,7 @@ impl Metasrv {
is_infancy: false,
table_metadata_manager,
cache_invalidator,
leader_region_registry,
}
}
}

View File

@@ -34,6 +34,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::build_wal_options_allocator;
@@ -49,7 +50,7 @@ use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::flow_state_handler::FlowStateHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
use crate::metasrv::{
@@ -325,12 +326,14 @@ impl MetasrvBuilder {
None
};
let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator: cache_invalidator.clone(),
memory_region_keeper: memory_region_keeper.clone(),
leader_region_registry: leader_region_registry.clone(),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
@@ -343,6 +346,10 @@ impl MetasrvBuilder {
.context(error::InitDdlManagerSnafu)?,
);
let customized_region_lease_renewer = plugins
.as_ref()
.and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
let handler_group_builder = match handler_group_builder {
Some(handler_group_builder) => handler_group_builder,
None => {
@@ -350,6 +357,7 @@ impl MetasrvBuilder {
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
memory_region_keeper.clone(),
customized_region_lease_renewer,
);
HeartbeatHandlerGroupBuilder::new(pushers)
@@ -397,6 +405,7 @@ impl MetasrvBuilder {
region_migration_manager,
region_supervisor_ticker,
cache_invalidator,
leader_region_registry,
})
}
}

View File

@@ -123,6 +123,7 @@ pub mod test_data {
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::WalOptionsAllocator;
@@ -227,6 +228,7 @@ pub mod test_data {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
}
}

View File

@@ -189,6 +189,7 @@ mod tests {
manifest_size: 0,
sst_size: 0,
index_size: 0,
manifest_version: 0,
}],
..Default::default()
}
@@ -210,6 +211,7 @@ mod tests {
manifest_size: 0,
sst_size: 0,
index_size: 0,
manifest_version: 0,
}],
..Default::default()
}
@@ -231,6 +233,7 @@ mod tests {
manifest_size: 0,
sst_size: 0,
index_size: 0,
manifest_version: 0,
}],
..Default::default()
}

View File

@@ -157,12 +157,16 @@ pub async fn open_compaction_region(
checkpoint_distance: mito_config.manifest_checkpoint_distance,
};
RegionManifestManager::open(region_manifest_options, Default::default())
.await?
.context(EmptyRegionDirSnafu {
region_id: req.region_id,
region_dir: req.region_dir.as_str(),
})?
RegionManifestManager::open(
region_manifest_options,
Default::default(),
Default::default(),
)
.await?
.context(EmptyRegionDirSnafu {
region_id: req.region_id,
region_dir: req.region_dir.as_str(),
})?
};
let manifest = manifest_manager.manifest();

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use common_datasource::compression::CompressionType;
@@ -23,6 +23,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
use store_api::metadata::RegionMetadataRef;
use super::storage::is_checkpoint_file;
use crate::error::{
self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
};
@@ -115,7 +116,7 @@ pub struct RegionManifestOptions {
#[derive(Debug)]
pub struct RegionManifestManager {
store: ManifestObjectStore,
last_version: ManifestVersion,
last_version: Arc<AtomicU64>,
checkpointer: Checkpointer,
manifest: Arc<RegionManifest>,
stopped: bool,
@@ -127,6 +128,7 @@ impl RegionManifestManager {
metadata: RegionMetadataRef,
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
manifest_version: Arc<AtomicU64>,
) -> Result<Self> {
// construct storage
let mut store = ManifestObjectStore::new(
@@ -164,9 +166,10 @@ impl RegionManifestManager {
store.save(version, &action_list.encode()?).await?;
let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
manifest_version.store(version, Ordering::Relaxed);
Ok(Self {
store,
last_version: version,
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
stopped: false,
@@ -179,6 +182,7 @@ impl RegionManifestManager {
pub async fn open(
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
manifest_version: Arc<AtomicU64>,
) -> Result<Option<Self>> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["open"])
@@ -263,9 +267,10 @@ impl RegionManifestManager {
store.clone(),
last_checkpoint_version,
);
manifest_version.store(version, Ordering::Relaxed);
Ok(Some(Self {
store,
last_version: version,
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
stopped: false,
@@ -290,13 +295,14 @@ impl RegionManifestManager {
.with_label_values(&["install_manifest_to"])
.start_timer();
let last_version = self.last_version();
// Case 1: If the target version is less than the current version, return the current version.
if self.last_version >= target_version {
if last_version >= target_version {
debug!(
"Target version {} is less than or equal to the current version {}, region: {}, skip install",
target_version, self.last_version, self.manifest.metadata.region_id
target_version, last_version, self.manifest.metadata.region_id
);
return Ok(self.last_version);
return Ok(last_version);
}
ensure!(
@@ -310,7 +316,7 @@ impl RegionManifestManager {
let mut manifests = self
.store
// Invariant: last_version < target_version.
.fetch_manifests_strict_from(self.last_version + 1, target_version + 1)
.fetch_manifests_strict_from(last_version + 1, target_version + 1)
.await?;
// Case 2: No manifests in range: [current_version+1, target_version+1)
@@ -322,7 +328,7 @@ impl RegionManifestManager {
if manifests.is_empty() {
debug!(
"Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
self.last_version, self.manifest.metadata.region_id
last_version, self.manifest.metadata.region_id
);
let last_version = self.install_last_checkpoint().await?;
// Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
@@ -341,14 +347,14 @@ impl RegionManifestManager {
if manifests.is_empty() {
return NoManifestsSnafu {
region_id: self.manifest.metadata.region_id,
start_version: self.last_version + 1,
start_version: last_version + 1,
end_version: target_version + 1,
last_version: self.last_version,
last_version,
}
.fail();
}
debug_assert_eq!(manifests.first().unwrap().0, self.last_version + 1);
debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
let mut manifest_builder =
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
@@ -384,28 +390,29 @@ impl RegionManifestManager {
region_id: self.manifest.metadata.region_id,
target_version,
available_version: new_manifest.manifest_version,
last_version: self.last_version,
last_version,
}
);
let version = self.last_version;
let version = self.last_version();
self.manifest = Arc::new(new_manifest);
self.last_version = self.manifest.manifest_version;
let last_version = self.set_version(self.manifest.manifest_version);
info!(
"Install manifest changes from {} to {}, region: {}",
version, self.last_version, self.manifest.metadata.region_id
version, last_version, self.manifest.metadata.region_id
);
Ok(self.last_version)
Ok(last_version)
}
/// Installs the last checkpoint.
pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
let last_version = self.last_version();
let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
else {
return NoCheckpointSnafu {
region_id: self.manifest.metadata.region_id,
last_version: self.last_version,
last_version,
}
.fail();
};
@@ -414,14 +421,14 @@ impl RegionManifestManager {
.set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
let manifest = builder.try_build()?;
self.last_version = manifest.manifest_version;
let last_version = self.set_version(manifest.manifest_version);
self.manifest = Arc::new(manifest);
info!(
"Installed region manifest from checkpoint: {}, region: {}",
checkpoint.last_version, self.manifest.metadata.region_id
);
Ok(self.last_version)
Ok(last_version)
}
/// Updates the manifest. Returns the current manifest version number.
@@ -486,7 +493,7 @@ impl RegionManifestManager {
/// It doesn't lock the manifest directory in the object store so the result
/// may be inaccurate if there are concurrent writes.
pub async fn has_update(&self) -> Result<bool> {
let last_version = self.last_version;
let last_version = self.last_version();
let streamer =
self.store
@@ -499,7 +506,7 @@ impl RegionManifestManager {
let need_update = streamer
.try_any(|entry| async move {
let file_name = entry.name();
if is_delta_file(file_name) {
if is_delta_file(file_name) || is_checkpoint_file(file_name) {
let version = file_version(file_name);
if version > last_version {
return true;
@@ -515,8 +522,18 @@ impl RegionManifestManager {
/// Increases last version and returns the increased version.
fn increase_version(&mut self) -> ManifestVersion {
self.last_version += 1;
self.last_version
let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
previous + 1
}
/// Sets the last version.
fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
self.last_version.store(version, Ordering::Relaxed);
version
}
fn last_version(&self) -> ManifestVersion {
self.last_version.load(Ordering::Relaxed)
}
/// Fetches the last [RegionCheckpoint] from storage.
@@ -547,8 +564,8 @@ impl RegionManifestManager {
fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
let manifest = self.manifest();
assert_eq!(manifest.metadata, *expect);
assert_eq!(self.manifest.manifest_version, self.last_version);
assert_eq!(last_version, self.last_version);
assert_eq!(self.manifest.manifest_version, self.last_version());
assert_eq!(last_version, self.last_version());
}
pub fn store(&self) -> ManifestObjectStore {

View File

@@ -289,6 +289,7 @@ impl MitoRegion {
let wal_usage = self.estimated_wal_usage(memtable_usage);
let manifest_usage = self.stats.total_manifest_size();
let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
let manifest_version = self.stats.manifest_version();
RegionStatistic {
num_rows,
@@ -297,6 +298,7 @@ impl MitoRegion {
manifest_size: manifest_usage,
sst_size: sst_usage,
index_size: index_usage,
manifest_version,
}
}
@@ -781,12 +783,17 @@ pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
total_manifest_size: Arc<AtomicU64>,
manifest_version: Arc<AtomicU64>,
}
impl ManifestStats {
fn total_manifest_size(&self) -> u64 {
self.total_manifest_size.load(Ordering::Relaxed)
}
fn manifest_version(&self) -> u64 {
self.manifest_version.load(Ordering::Relaxed)
}
}
#[cfg(test)]

View File

@@ -224,6 +224,7 @@ impl RegionOpener {
metadata.clone(),
region_manifest_options,
self.stats.total_manifest_size.clone(),
self.stats.manifest_version.clone(),
)
.await?;
@@ -352,6 +353,7 @@ impl RegionOpener {
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
self.stats.total_manifest_size.clone(),
self.stats.manifest_version.clone(),
)
.await?
else {
@@ -529,9 +531,12 @@ impl RegionMetadataLoader {
region_dir,
&self.object_store_manager,
)?;
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, Arc::new(AtomicU64::new(0)))
.await?
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
)
.await?
else {
return Ok(None);
};

View File

@@ -625,11 +625,16 @@ impl TestEnv {
};
if let Some(metadata) = initial_metadata {
RegionManifestManager::new(metadata, manifest_opts, Default::default())
.await
.map(Some)
RegionManifestManager::new(
metadata,
manifest_opts,
Default::default(),
Default::default(),
)
.await
.map(Some)
} else {
RegionManifestManager::open(manifest_opts, Default::default()).await
RegionManifestManager::open(manifest_opts, Default::default(), Default::default()).await
}
}

View File

@@ -121,6 +121,7 @@ impl SchedulerEnv {
checkpoint_distance: 10,
},
Default::default(),
Default::default(),
)
.await
.unwrap(),

View File

@@ -93,7 +93,6 @@ impl GrantedRegion {
Self {
region_id,
region_role,
// TODO(weny): use real manifest version
manifest_version: 0,
}
}
@@ -383,6 +382,9 @@ pub struct RegionStatistic {
/// The size of SST index files in bytes.
#[serde(default)]
pub index_size: u64,
/// The version of manifest.
#[serde(default)]
pub manifest_version: u64,
}
impl RegionStatistic {

View File

@@ -34,6 +34,7 @@ use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
@@ -217,6 +218,7 @@ impl GreptimeDbStandaloneBuilder {
node_manager: node_manager.clone(),
cache_invalidator: cache_registry.clone(),
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager,
table_metadata_allocator,
flow_metadata_manager,