diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index e9c85eea39..653cd7511f 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -154,6 +154,12 @@ impl RegionManifestManager { let inner = self.inner.read().await; inner.store.clone() } + + /// Returns total manifest size. + pub async fn manifest_size(&self) -> u64 { + let inner = self.inner.read().await; + inner.total_manifest_size() + } } #[cfg(test)] @@ -186,7 +192,7 @@ impl RegionManifestManagerInner { /// Creates a new manifest. async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { // construct storage - let store = ManifestObjectStore::new( + let mut store = ManifestObjectStore::new( &options.manifest_dir, options.object_store.clone(), options.compress_type, @@ -232,7 +238,7 @@ impl RegionManifestManagerInner { /// Returns `Ok(None)` if no such manifest. async fn open(options: RegionManifestOptions) -> Result> { // construct storage - let store = ManifestObjectStore::new( + let mut store = ManifestObjectStore::new( &options.manifest_dir, options.object_store.clone(), options.compress_type, @@ -240,8 +246,9 @@ impl RegionManifestManagerInner { // recover from storage // construct manifest builder + // calculate the manifest size from the latest checkpoint let mut version = MIN_VERSION; - let checkpoint = Self::last_checkpoint(&store).await?; + let checkpoint = Self::last_checkpoint(&mut store).await?; let last_checkpoint_version = checkpoint .as_ref() .map(|checkpoint| checkpoint.last_version) @@ -265,6 +272,8 @@ impl RegionManifestManagerInner { let mut action_iter = store.scan(version, MAX_VERSION).await?; while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? { let action_list = RegionMetaActionList::decode(&raw_action_list)?; + // set manifest size after last checkpoint + store.set_delta_file_size(manifest_version, raw_action_list.len() as u64); for action in action_list.actions { match action { RegionMetaAction::Change(action) => { @@ -312,6 +321,7 @@ impl RegionManifestManagerInner { Ok(()) } + /// Update the manifest. Return the current manifest version number. async fn update(&mut self, action_list: RegionMetaActionList) -> Result { let version = self.increase_version(); self.store.save(version, &action_list.encode()?).await?; @@ -343,6 +353,11 @@ impl RegionManifestManagerInner { Ok(version) } + + /// Returns total manifest size. + pub(crate) fn total_manifest_size(&self) -> u64 { + self.store.total_manifest_size() + } } impl RegionManifestManagerInner { @@ -369,8 +384,8 @@ impl RegionManifestManagerInner { } /// Make a new checkpoint. Return the fresh one if there are some actions to compact. - async fn do_checkpoint(&self) -> Result> { - let last_checkpoint = Self::last_checkpoint(&self.store).await?; + async fn do_checkpoint(&mut self) -> Result> { + let last_checkpoint = Self::last_checkpoint(&mut self.store).await?; let current_version = self.last_version; let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint { @@ -441,7 +456,7 @@ impl RegionManifestManagerInner { /// Fetch the last [RegionCheckpoint] from storage. pub(crate) async fn last_checkpoint( - store: &ManifestObjectStore, + store: &mut ManifestObjectStore, ) -> Result> { let last_checkpoint = store.load_last_checkpoint().await?; @@ -456,14 +471,16 @@ impl RegionManifestManagerInner { #[cfg(test)] mod test { + use api::v1::SemanticType; use common_datasource::compression::CompressionType; + use common_test_util::temp_dir::create_temp_dir; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use super::*; - use crate::manifest::action::RegionChange; + use crate::manifest::action::{RegionChange, RegionEdit}; use crate::manifest::tests::utils::basic_region_metadata; use crate::test_util::TestEnv; @@ -546,4 +563,95 @@ mod test { .unwrap(); manager.validate_manifest(&new_metadata, 1).await; } + + /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs. + async fn manifest_dir_usage(path: &str) -> u64 { + let mut size = 0; + let mut read_dir = tokio::fs::read_dir(path).await.unwrap(); + while let Ok(dir_entry) = read_dir.next_entry().await { + let Some(entry) = dir_entry else { + break; + }; + if entry.file_type().await.unwrap().is_file() { + let file_name = entry.file_name().into_string().unwrap(); + if file_name.contains(".checkpoint") || file_name.contains(".json") { + let file_size = entry.metadata().await.unwrap().len() as usize; + debug!("File: {file_name:?}, size: {file_size}"); + size += file_size; + } + } + } + size as u64 + } + + #[tokio::test] + async fn test_manifest_size() { + let metadata = Arc::new(basic_region_metadata()); + let data_home = create_temp_dir(""); + let data_home_path = data_home.path().to_str().unwrap().to_string(); + let env = TestEnv::with_data_home(data_home); + + let manifest_dir = format!("{}/manifest", data_home_path); + + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) + .await + .unwrap() + .unwrap(); + + let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone()); + new_metadata_builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 252, + }); + let new_metadata = Arc::new(new_metadata_builder.build().unwrap()); + + let action_list = + RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { + metadata: new_metadata.clone(), + })); + + let current_version = manager.update(action_list).await.unwrap(); + assert_eq!(current_version, 1); + manager.validate_manifest(&new_metadata, 1).await; + + // get manifest size + let manifest_size = manager.manifest_size().await; + assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); + + // update 10 times nop_action to trigger checkpoint + for _ in 0..10 { + manager + .update(RegionMetaActionList::new(vec![RegionMetaAction::Edit( + RegionEdit { + files_to_add: vec![], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }, + )])) + .await + .unwrap(); + } + + // check manifest size again + let manifest_size = manager.manifest_size().await; + assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); + + // Reopen the manager, + // we just calculate the size from the latest checkpoint file + manager.stop().await.unwrap(); + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, 10, None) + .await + .unwrap() + .unwrap(); + manager.validate_manifest(&new_metadata, 11).await; + + // get manifest size again + let manifest_size = manager.manifest_size().await; + assert_eq!(manifest_size, 1312); + } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index a0f7dbf971..edd63ac521 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -129,11 +129,22 @@ impl ObjectStoreLogIterator { } } +/// Key to identify a manifest file. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +enum FileKey { + /// A delta file (`.json`). + Delta(ManifestVersion), + /// A checkpoint file (`.checkpoint`). + Checkpoint(ManifestVersion), +} + #[derive(Clone, Debug)] pub struct ManifestObjectStore { object_store: ObjectStore, compress_type: CompressionType, path: String, + /// Stores the size of each manifest file. + manifest_size_map: HashMap, } impl ManifestObjectStore { @@ -142,6 +153,7 @@ impl ManifestObjectStore { object_store, compress_type, path: util::normalize_dir(path), + manifest_size_map: HashMap::new(), } } @@ -184,6 +196,7 @@ impl ManifestObjectStore { .context(OpenDalSnafu) } + /// Scan the manifest files in the range of [start, end) and return the iterator. pub async fn scan( &self, start: ManifestVersion, @@ -212,8 +225,12 @@ impl ManifestObjectStore { }) } + /// Delete manifest files that version < end. + /// If keep_last_checkpoint is true, the last checkpoint file will be kept. + /// ### Return + /// The number of deleted files. pub async fn delete_until( - &self, + &mut self, end: ManifestVersion, keep_last_checkpoint: bool, ) -> Result { @@ -248,7 +265,7 @@ impl ManifestObjectStore { } else { None }; - let paths: Vec<_> = entries + let del_entries: Vec<_> = entries .iter() .filter(|(_e, is_checkpoint, version)| { if let Some(max_version) = checkpoint_version { @@ -264,12 +281,15 @@ impl ManifestObjectStore { true } }) - .map(|e| e.0.path().to_string()) .collect(); + let paths = del_entries + .iter() + .map(|(e, _, _)| e.path().to_string()) + .collect::>(); let ret = paths.len(); debug!( - "Deleting {} logs from manifest storage path {} until {}, checkpoint: {:?}, paths: {:?}", + "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}", ret, self.path, end, @@ -282,10 +302,21 @@ impl ManifestObjectStore { .await .context(OpenDalSnafu)?; + // delete manifest sizes + for (_, is_checkpoint, version) in &del_entries { + if *is_checkpoint { + self.manifest_size_map + .remove(&FileKey::Checkpoint(*version)); + } else { + self.manifest_size_map.remove(&FileKey::Delta(*version)); + } + } + Ok(ret) } - pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + /// Save the delta manifest file. + pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.delta_file_path(version); debug!("Save log to manifest storage, version: {}", version); let data = self @@ -296,13 +327,17 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: &path, })?; + let delta_size = data.len(); self.object_store .write(&path, data) .await - .context(OpenDalSnafu) + .context(OpenDalSnafu)?; + self.set_delta_file_size(version, delta_size as u64); + Ok(()) } - pub async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + /// Save the checkpoint manifest file. + pub async fn save_checkpoint(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.checkpoint_file_path(version); let data = self .compress_type @@ -312,10 +347,12 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: &path, })?; + let checkpoint_size = data.len(); self.object_store .write(&path, data) .await .context(OpenDalSnafu)?; + self.set_checkpoint_file_size(version, checkpoint_size as u64); // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it. let last_checkpoint_path = self.last_checkpoint_path(); @@ -342,7 +379,7 @@ impl ManifestObjectStore { } pub async fn load_checkpoint( - &self, + &mut self, version: ManifestVersion, ) -> Result)>> { let path = self.checkpoint_file_path(version); @@ -351,12 +388,15 @@ impl ManifestObjectStore { let checkpoint_data = match self.object_store.read(&path).await { Ok(checkpoint) => { + let checkpoint_size = checkpoint.len(); let decompress_data = self.compress_type.decode(checkpoint).await.context( DecompressObjectSnafu { compress_type: self.compress_type, path, }, )?; + // set the checkpoint size + self.set_checkpoint_file_size(version, checkpoint_size as u64); Ok(Some(decompress_data)) } Err(e) => { @@ -373,6 +413,7 @@ impl ManifestObjectStore { ); match self.object_store.read(&fall_back_path).await { Ok(checkpoint) => { + let checkpoint_size = checkpoint.len(); let decompress_data = FALL_BACK_COMPRESS_TYPE .decode(checkpoint) .await @@ -380,6 +421,7 @@ impl ManifestObjectStore { compress_type: FALL_BACK_COMPRESS_TYPE, path, })?; + self.set_checkpoint_file_size(version, checkpoint_size as u64); Ok(Some(decompress_data)) } Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), @@ -398,7 +440,7 @@ impl ManifestObjectStore { /// Load the latest checkpoint. /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any - pub async fn load_last_checkpoint(&self) -> Result)>> { + pub async fn load_last_checkpoint(&mut self) -> Result)>> { let last_checkpoint_path = self.last_checkpoint_path(); let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await { Ok(data) => data, @@ -424,6 +466,22 @@ impl ManifestObjectStore { pub async fn read_file(&self, path: &str) -> Result> { self.object_store.read(path).await.context(OpenDalSnafu) } + + /// Compute the size(Byte) in manifest size map. + pub(crate) fn total_manifest_size(&self) -> u64 { + self.manifest_size_map.values().sum() + } + + /// Set the size of the delta file by delta version. + pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { + self.manifest_size_map.insert(FileKey::Delta(version), size); + } + + /// Set the size of the checkpoint file by checkpoint version. + pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { + self.manifest_size_map + .insert(FileKey::Checkpoint(version), size); + } } #[derive(Serialize, Deserialize, Debug)] @@ -489,7 +547,7 @@ mod tests { test_manifest_log_store_case(log_store).await; } - async fn test_manifest_log_store_case(log_store: ManifestObjectStore) { + async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) { for v in 0..5 { log_store .save(v, format!("hello, {v}").as_bytes()) @@ -600,4 +658,92 @@ mod tests { let mut it = log_store.scan(0, 10).await.unwrap(); assert!(it.next_log().await.unwrap().is_none()); } + + #[tokio::test] + async fn test_file_version() { + let version = file_version("00000000000000000007.checkpoint"); + assert_eq!(version, 7); + + let name = delta_file(version); + assert_eq!(name, "00000000000000000007.json"); + + let name = checkpoint_file(version); + assert_eq!(name, "00000000000000000007.checkpoint"); + } + + #[tokio::test] + async fn test_uncompressed_manifest_files_size() { + let mut log_store = new_test_manifest_store(); + // write 5 manifest files with uncompressed(8B per file) + log_store.compress_type = CompressionType::Uncompressed; + for v in 0..5 { + log_store + .save(v, format!("hello, {v}").as_bytes()) + .await + .unwrap(); + } + // write 1 checkpoint file with uncompressed(23B) + log_store + .save_checkpoint(5, "checkpoint_uncompressed".as_bytes()) + .await + .unwrap(); + + // manifest files size + assert_eq!(log_store.total_manifest_size(), 63); + + // delete 3 manifest files + assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); + + // manifest files size after delete + assert_eq!(log_store.total_manifest_size(), 39); + + // delete all manifest files + assert_eq!( + log_store + .delete_until(ManifestVersion::MAX, false) + .await + .unwrap(), + 3 + ); + + assert_eq!(log_store.total_manifest_size(), 0); + } + + #[tokio::test] + async fn test_compressed_manifest_files_size() { + let mut log_store = new_test_manifest_store(); + // Test with compressed manifest files + log_store.compress_type = CompressionType::Gzip; + // write 5 manifest files + for v in 0..5 { + log_store + .save(v, format!("hello, {v}").as_bytes()) + .await + .unwrap(); + } + log_store + .save_checkpoint(5, "checkpoint_compressed".as_bytes()) + .await + .unwrap(); + + // manifest files size + assert_eq!(log_store.total_manifest_size(), 181); + + // delete 3 manifest files + assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); + + // manifest files size after delete + assert_eq!(log_store.total_manifest_size(), 97); + + // delete all manifest files + assert_eq!( + log_store + .delete_until(ManifestVersion::MAX, false) + .await + .unwrap(), + 3 + ); + + assert_eq!(log_store.total_manifest_size(), 0); + } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 68c7063e1e..c28f6cd6d5 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -202,7 +202,7 @@ async fn generate_checkpoint_with_compression_types( manager.update(action).await.unwrap(); } - RegionManifestManagerInner::last_checkpoint(&manager.store().await) + RegionManifestManagerInner::last_checkpoint(&mut manager.store().await) .await .unwrap() .unwrap() diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c962124921..d7cb13e512 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -99,6 +99,15 @@ impl TestEnv { } } + /// Returns a new env with specific `data_home` for test. + pub fn with_data_home(data_home: TempDir) -> TestEnv { + TestEnv { + data_home, + logstore: None, + object_store: None, + } + } + pub fn get_logstore(&self) -> Option> { self.logstore.clone() }