refactor: Move manifest manager lock to MitoRegion (#3689)

* feat: remove manager inner wip

* feat: put manifest lock in region

* feat: don't update manifest if manager is stopped

* chore: address CR comments
This commit is contained in:
Yingwen
2024-04-15 13:48:25 +08:00
committed by GitHub
parent 75d85f9915
commit 2f4726f7b5
10 changed files with 160 additions and 188 deletions

View File

@@ -345,7 +345,7 @@ async fn test_catchup_with_manifest_update() {
// Ensures the mutable is empty.
assert!(region.version().memtables.mutable.is_empty());
let manifest = region.manifest_manager.manifest().await;
let manifest = region.manifest_manager.read().await.manifest();
assert_eq!(manifest.manifest_version, 0);
let resp = follower_engine
@@ -361,7 +361,7 @@ async fn test_catchup_with_manifest_update() {
// The inner region was replaced. We must get it again.
let region = follower_engine.get_region(region_id).unwrap();
let manifest = region.manifest_manager.manifest().await;
let manifest = region.manifest_manager.read().await.manifest();
assert_eq!(manifest.manifest_version, 2);
assert!(!region.is_writable());

View File

@@ -578,6 +578,12 @@ pub enum Error {
#[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))]
ChecksumMismatch { actual: u32, expected: u32 },
#[snafu(display("Region {} is stopped", region_id))]
RegionStopped {
region_id: RegionId,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -684,6 +690,7 @@ impl ErrorExt for Error {
BiError { .. } => StatusCode::Internal,
EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::RegionNotReady,
}
}

View File

@@ -18,12 +18,11 @@ use common_datasource::compression::CompressionType;
use common_telemetry::{debug, info};
use futures::TryStreamExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
use store_api::metadata::RegionMetadataRef;
use tokio::sync::RwLock;
use crate::error::{self, Result};
use crate::error::{self, RegionStoppedSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionList,
@@ -111,99 +110,18 @@ pub struct RegionManifestOptions {
/// ```
#[derive(Debug)]
pub struct RegionManifestManager {
inner: RwLock<RegionManifestManagerInner>,
}
impl RegionManifestManager {
/// Construct a region's manifest and persist it.
pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
let inner = RegionManifestManagerInner::new(metadata, options).await?;
Ok(Self {
inner: RwLock::new(inner),
})
}
/// Open an existing manifest.
pub async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
if let Some(inner) = RegionManifestManagerInner::open(options).await? {
Ok(Some(Self {
inner: RwLock::new(inner),
}))
} else {
Ok(None)
}
}
/// Stop background tasks gracefully.
pub async fn stop(&self) -> Result<()> {
let mut inner = self.inner.write().await;
inner.stop().await
}
/// Update the manifest. Return the current manifest version number.
pub async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["update"])
.start_timer();
let mut inner = self.inner.write().await;
inner.update(action_list).await
}
/// Retrieve the current [RegionManifest].
pub async fn manifest(&self) -> Arc<RegionManifest> {
let inner = self.inner.read().await;
inner.manifest.clone()
}
#[cfg(test)]
pub async fn store(&self) -> ManifestObjectStore {
let inner = self.inner.read().await;
inner.store.clone()
}
/// Returns total manifest size.
pub async fn manifest_usage(&self) -> u64 {
let inner = self.inner.read().await;
inner.total_manifest_size()
}
/// Returns true if a newer version manifest file is found.
pub async fn has_update(&self) -> Result<bool> {
let inner = self.inner.read().await;
inner.has_update().await
}
}
#[cfg(test)]
impl RegionManifestManager {
pub(crate) async fn validate_manifest(
&self,
expect: &RegionMetadataRef,
last_version: ManifestVersion,
) {
let manifest = self.manifest().await;
assert_eq!(manifest.metadata, *expect);
let inner = self.inner.read().await;
assert_eq!(inner.manifest.manifest_version, inner.last_version);
assert_eq!(last_version, inner.last_version);
}
}
#[derive(Debug)]
pub(crate) struct RegionManifestManagerInner {
store: ManifestObjectStore,
options: RegionManifestOptions,
last_version: ManifestVersion,
/// The last version included in checkpoint file.
last_checkpoint_version: ManifestVersion,
manifest: Arc<RegionManifest>,
stopped: bool,
}
impl RegionManifestManagerInner {
/// Creates a new manifest.
async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
impl RegionManifestManager {
/// Constructs a region's manifest and persists it.
pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
// construct storage
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
@@ -243,13 +161,14 @@ impl RegionManifestManagerInner {
last_version: version,
last_checkpoint_version: MIN_VERSION,
manifest: Arc::new(manifest),
stopped: false,
})
}
/// Opens an existing manifest.
///
/// Returns `Ok(None)` if no such manifest.
async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
pub async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["open"])
.start_timer();
@@ -333,15 +252,29 @@ impl RegionManifestManagerInner {
last_version: version,
last_checkpoint_version,
manifest: Arc::new(manifest),
stopped: false,
}))
}
async fn stop(&mut self) -> Result<()> {
/// Stops the manager.
pub async fn stop(&mut self) -> Result<()> {
self.stopped = true;
Ok(())
}
/// Updates the manifest. Return the current manifest version number.
async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
/// Updates the manifest. Returns the current manifest version number.
pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["update"])
.start_timer();
ensure!(
!self.stopped,
RegionStoppedSnafu {
region_id: self.manifest.metadata.region_id,
}
);
let version = self.increase_version();
self.store.save(version, &action_list.encode()?).await?;
@@ -373,20 +306,56 @@ impl RegionManifestManagerInner {
Ok(version)
}
/// Returns a shared handle to the current [RegionManifest].
///
/// Only the reference count is bumped; the manifest itself is not copied.
pub fn manifest(&self) -> Arc<RegionManifest> {
    Arc::clone(&self.manifest)
}
/// Returns total manifest size.
pub(crate) fn total_manifest_size(&self) -> u64 {
pub fn manifest_usage(&self) -> u64 {
self.store.total_manifest_size()
}
}
impl RegionManifestManagerInner {
/// Returns true if a newer version manifest file is found.
///
/// It is typically used in read-only regions to catch up with manifest.
/// It doesn't lock the manifest directory in the object store so the result
/// may be inaccurate if there are concurrent writes.
pub async fn has_update(&self) -> Result<bool> {
let last_version = self.last_version;
let streamer =
self.store
.manifest_lister()
.await?
.context(error::EmptyManifestDirSnafu {
manifest_dir: self.store.manifest_dir(),
})?;
let need_update = streamer
.try_any(|entry| async move {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
if version > last_version {
return true;
}
}
false
})
.await
.context(error::OpenDalSnafu)?;
Ok(need_update)
}
/// Bumps `last_version` by one and returns the new value.
fn increase_version(&mut self) -> ManifestVersion {
    let next = self.last_version + 1;
    self.last_version = next;
    next
}
pub(crate) async fn may_do_checkpoint(&mut self, version: ManifestVersion) -> Result<()> {
async fn may_do_checkpoint(&mut self, version: ManifestVersion) -> Result<()> {
if version - self.last_checkpoint_version >= self.options.checkpoint_distance
&& self.options.checkpoint_distance != 0
{
@@ -493,36 +462,19 @@ impl RegionManifestManagerInner {
Ok(None)
}
}
}
/// Returns true if a newer version manifest file is found.
///
/// It is typically used in read-only regions to catch up with manifest.
pub(crate) async fn has_update(&self) -> Result<bool> {
let last_version = self.last_version;
#[cfg(test)]
impl RegionManifestManager {
/// Test helper: asserts that the manager holds the expected metadata and
/// that its version bookkeeping equals `last_version`.
fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
    // The in-memory manifest must carry the expected region metadata.
    assert_eq!(self.manifest.metadata, *expect);
    // The manifest's own version and the manager's counter must agree.
    assert_eq!(self.manifest.manifest_version, self.last_version);
    // And both must be at the version the caller expects.
    assert_eq!(last_version, self.last_version);
}
let streamer =
self.store
.manifest_lister()
.await?
.context(error::EmptyManifestDirSnafu {
manifest_dir: self.store.manifest_dir(),
})?;
let need_update = streamer
.try_any(|entry| async move {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
if version > last_version {
return true;
}
}
false
})
.await
.context(error::OpenDalSnafu)?;
Ok(need_update)
/// Returns a clone of the underlying [ManifestObjectStore].
pub fn store(&self) -> ManifestObjectStore {
    Clone::clone(&self.store)
}
}
@@ -551,7 +503,7 @@ mod test {
.unwrap()
.unwrap();
manager.validate_manifest(&metadata, 0).await;
manager.validate_manifest(&metadata, 0);
}
#[tokio::test]
@@ -566,7 +518,7 @@ mod test {
// Creates a manifest.
let metadata = Arc::new(basic_region_metadata());
let manager = env
let mut manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
@@ -581,14 +533,14 @@ mod test {
.unwrap()
.unwrap();
manager.validate_manifest(&metadata, 0).await;
manager.validate_manifest(&metadata, 0);
}
#[tokio::test]
async fn region_change_add_column() {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new();
let manager = env
let mut manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
@@ -609,7 +561,7 @@ mod test {
let current_version = manager.update(action_list).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1).await;
manager.validate_manifest(&new_metadata, 1);
// Reopen the manager.
manager.stop().await.unwrap();
@@ -618,7 +570,7 @@ mod test {
.await
.unwrap()
.unwrap();
manager.validate_manifest(&new_metadata, 1).await;
manager.validate_manifest(&new_metadata, 1);
}
/// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs.
@@ -650,7 +602,7 @@ mod test {
let manifest_dir = format!("{}/manifest", data_home_path);
let manager = env
let mut manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
@@ -671,10 +623,10 @@ mod test {
let current_version = manager.update(action_list).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1).await;
manager.validate_manifest(&new_metadata, 1);
// get manifest size
let manifest_size = manager.manifest_usage().await;
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
// update 10 times nop_action to trigger checkpoint
@@ -694,7 +646,7 @@ mod test {
}
// check manifest size again
let manifest_size = manager.manifest_usage().await;
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
// Reopen the manager,
@@ -705,10 +657,10 @@ mod test {
.await
.unwrap()
.unwrap();
manager.validate_manifest(&new_metadata, 11).await;
manager.validate_manifest(&new_metadata, 11);
// get manifest size again
let manifest_size = manager.manifest_usage().await;
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, 1312);
}
}

View File

@@ -23,7 +23,7 @@ use crate::error::Error::ChecksumMismatch;
use crate::manifest::action::{
RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestManagerInner};
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::TestEnv;
@@ -66,7 +66,7 @@ fn nop_action() -> RegionMetaActionList {
#[tokio::test]
async fn manager_without_checkpoint() {
let (_env, manager) = build_manager(0, CompressionType::Uncompressed).await;
let (_env, mut manager) = build_manager(0, CompressionType::Uncompressed).await;
// apply 10 actions
for _ in 0..10 {
@@ -76,7 +76,6 @@ async fn manager_without_checkpoint() {
// no checkpoint
assert!(manager
.store()
.await
.load_last_checkpoint()
.await
.unwrap()
@@ -99,7 +98,6 @@ async fn manager_without_checkpoint() {
expected.sort_unstable();
let mut paths = manager
.store()
.await
.get_paths(|e| Some(e.name().to_string()))
.await
.unwrap();
@@ -110,7 +108,7 @@ async fn manager_without_checkpoint() {
#[tokio::test]
async fn manager_with_checkpoint_distance_1() {
common_telemetry::init_default_ut_logging();
let (env, manager) = build_manager(1, CompressionType::Uncompressed).await;
let (env, mut manager) = build_manager(1, CompressionType::Uncompressed).await;
// apply 10 actions
for _ in 0..10 {
@@ -120,7 +118,6 @@ async fn manager_with_checkpoint_distance_1() {
// has checkpoint
assert!(manager
.store()
.await
.load_last_checkpoint()
.await
.unwrap()
@@ -137,7 +134,6 @@ async fn manager_with_checkpoint_distance_1() {
expected.sort_unstable();
let mut paths = manager
.store()
.await
.get_paths(|e| Some(e.name().to_string()))
.await
.unwrap();
@@ -147,8 +143,7 @@ async fn manager_with_checkpoint_distance_1() {
// check content in `_last_checkpoint`
let raw_bytes = manager
.store()
.await
.read_file(&manager.store().await.last_checkpoint_path())
.read_file(&manager.store().last_checkpoint_path())
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
@@ -159,14 +154,14 @@ async fn manager_with_checkpoint_distance_1() {
// reopen the manager
manager.stop().await.unwrap();
let manager = reopen_manager(&env, 1, CompressionType::Uncompressed).await;
assert_eq!(10, manager.manifest().await.manifest_version);
assert_eq!(10, manager.manifest().manifest_version);
}
#[tokio::test]
async fn test_corrupted_data_causing_checksum_error() {
// Initialize manager
common_telemetry::init_default_ut_logging();
let (_env, manager) = build_manager(1, CompressionType::Uncompressed).await;
let (_env, mut manager) = build_manager(1, CompressionType::Uncompressed).await;
// Apply actions
for _ in 0..10 {
@@ -176,7 +171,6 @@ async fn test_corrupted_data_causing_checksum_error() {
// Check if there is a checkpoint
assert!(manager
.store()
.await
.load_last_checkpoint()
.await
.unwrap()
@@ -185,8 +179,7 @@ async fn test_corrupted_data_causing_checksum_error() {
// Corrupt the last checkpoint data
let mut corrupted_bytes = manager
.store()
.await
.read_file(&manager.store().await.last_checkpoint_path())
.read_file(&manager.store().last_checkpoint_path())
.await
.unwrap();
corrupted_bytes[0] ^= 1;
@@ -194,13 +187,12 @@ async fn test_corrupted_data_causing_checksum_error() {
// Overwrite the latest checkpoint data
manager
.store()
.await
.write_last_checkpoint(9, &corrupted_bytes)
.await
.unwrap();
// Attempt to load the corrupted checkpoint
let load_corrupted_result = manager.store().await.load_last_checkpoint().await;
let load_corrupted_result = manager.store().load_last_checkpoint().await;
// Check that the result is an error of type ChecksumMismatch
assert_matches!(load_corrupted_result, Err(ChecksumMismatch { .. }));
@@ -245,13 +237,13 @@ async fn generate_checkpoint_with_compression_types(
compress_type: CompressionType,
actions: Vec<RegionMetaActionList>,
) -> RegionCheckpoint {
let (_env, manager) = build_manager(1, compress_type).await;
let (_env, mut manager) = build_manager(1, compress_type).await;
for action in actions {
manager.update(action).await.unwrap();
}
RegionManifestManagerInner::last_checkpoint(&mut manager.store().await)
RegionManifestManager::last_checkpoint(&mut manager.store())
.await
.unwrap()
.unwrap()

View File

@@ -27,6 +27,7 @@ use common_wal::options::WalOptions;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::RwLock as TokioRwLock;
use crate::access_layer::AccessLayerRef;
use crate::error::{RegionNotFoundSnafu, RegionReadonlySnafu, Result};
@@ -74,7 +75,7 @@ pub(crate) struct MitoRegion {
/// SSTs accessor for this region.
pub(crate) access_layer: AccessLayerRef,
/// Manager to maintain manifest for this region.
pub(crate) manifest_manager: RegionManifestManager,
pub(crate) manifest_manager: TokioRwLock<RegionManifestManager>,
/// SST file purger.
pub(crate) file_purger: FilePurgerRef,
/// Wal options of this region.
@@ -94,7 +95,7 @@ pub(crate) type MitoRegionRef = Arc<MitoRegion>;
impl MitoRegion {
/// Stop background managers for this region.
pub(crate) async fn stop(&self) -> Result<()> {
self.manifest_manager.stop().await?;
self.manifest_manager.write().await.stop().await?;
info!(
"Stopped region manifest manager, region_id: {}",
@@ -154,7 +155,7 @@ impl MitoRegion {
let wal_usage = self.estimated_wal_usage(memtable_usage);
let manifest_usage = self.manifest_manager.manifest_usage().await;
let manifest_usage = self.manifest_manager.read().await.manifest_usage();
RegionUsage {
region_id,
@@ -178,6 +179,8 @@ impl MitoRegion {
info!("Applying {edit:?} to region {}", self.region_id);
self.manifest_manager
.write()
.await
.update(RegionMetaActionList::with_action(RegionMetaAction::Edit(
edit.clone(),
)))

View File

@@ -27,6 +27,7 @@ use snafu::{ensure, OptionExt};
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{ColumnId, RegionId};
use tokio::sync::RwLock;
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
@@ -202,7 +203,7 @@ impl RegionOpener {
region_id,
version_control,
access_layer: access_layer.clone(),
manifest_manager,
manifest_manager: RwLock::new(manifest_manager),
file_purger: Arc::new(LocalFilePurger::new(
self.purge_scheduler,
access_layer,
@@ -263,7 +264,7 @@ impl RegionOpener {
return Ok(None);
};
let manifest = manifest_manager.manifest().await;
let manifest = manifest_manager.manifest();
let metadata = manifest.metadata.clone();
let region_id = self.region_id;
@@ -330,7 +331,7 @@ impl RegionOpener {
region_id: self.region_id,
version_control,
access_layer,
manifest_manager,
manifest_manager: RwLock::new(manifest_manager),
file_purger,
wal_options,
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),

View File

@@ -138,7 +138,12 @@ async fn alter_region_schema(
metadata: new_meta.clone(),
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
region.manifest_manager.update(action_list).await?;
region
.manifest_manager
.write()
.await
.update(action_list)
.await?;
// Apply the metadata to region's version.
region

View File

@@ -45,30 +45,31 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let is_mutable_empty = region.version().memtables.mutable.is_empty();
// Utilizes the short circuit evaluation.
let region = if !is_mutable_empty || region.manifest_manager.has_update().await? {
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
region.region_dir(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.intermediate_manager.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?,
);
debug_assert!(!reopened_region.is_writable());
self.regions.insert_region(reopened_region.clone());
let region =
if !is_mutable_empty || region.manifest_manager.read().await.has_update().await? {
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
region.region_dir(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.intermediate_manager.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?,
);
debug_assert!(!reopened_region.is_writable());
self.regions.insert_region(reopened_region.clone());
reopened_region
} else {
region
};
reopened_region
} else {
region
};
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");

View File

@@ -79,7 +79,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
if let Err(e) = region
.manifest_manager
.write()
.await
.update(action_list)
.await
{
error!(e; "Failed to update manifest, region: {}", region_id);
manifest_timer.stop_and_discard();
request.on_failure(e);

View File

@@ -44,7 +44,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
region.manifest_manager.update(action_list).await?;
region
.manifest_manager
.write()
.await
.update(action_list)
.await?;
// Notifies flush scheduler.
self.flush_scheduler.on_region_truncated(region_id);