From 29fc2ea9d83dadb0aee82f64b86023ec4689b15b Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 19 Dec 2023 15:12:30 +0900 Subject: [PATCH] feat: fetch manifests in concurrent (#2951) * feat: fetch manifests in concurrent * chore: set fetching manifest concurrency limit to 16 --- src/mito2/src/manifest/manager.rs | 13 +++- src/mito2/src/manifest/storage.rs | 117 +++++++++++++++++------------- 2 files changed, 76 insertions(+), 54 deletions(-) diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index f3688d7dd3..613253dffa 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -269,8 +269,10 @@ impl RegionManifestManagerInner { }; // apply actions from storage - let mut action_iter = store.scan(version, MAX_VERSION).await?; - while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? { + let manifests = store.scan(version, MAX_VERSION).await?; + let manifests = store.fetch_manifests(&manifests).await?; + + for (manifest_version, raw_action_list) in manifests { let action_list = RegionMetaActionList::decode(&raw_action_list)?; // set manifest size after last checkpoint store.set_delta_file_size(manifest_version, raw_action_list.len() as u64); @@ -402,10 +404,13 @@ impl RegionManifestManagerInner { return Ok(None); } - let mut iter = self.store.scan(start_version, end_version).await?; + let manifests = self.store.scan(start_version, end_version).await?; let mut last_version = start_version; let mut compacted_actions = 0; - while let Some((version, raw_action_list)) = iter.next_log().await? { + + let manifests = self.store.fetch_manifests(&manifests).await?; + + for (version, raw_action_list) in manifests { let action_list = RegionMetaActionList::decode(&raw_action_list)?; for action in action_list.actions { match action { diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 3e8860155b..b470a1255a 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -18,6 +18,7 @@ use std::str::FromStr; use common_datasource::compression::CompressionType; use common_telemetry::debug; +use futures::future::try_join_all; use futures::TryStreamExt; use lazy_static::lazy_static; use object_store::{util, Entry, ErrorKind, ObjectStore}; @@ -25,6 +26,7 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::manifest::ManifestVersion; +use tokio::sync::Semaphore; use crate::error::{ CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result, @@ -41,6 +43,7 @@ const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip /// Due to backward compatibility, it is possible that the user's manifest file has not been compressed. /// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing. const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed; +const FETCH_MANIFEST_PARALLELISM: usize = 16; /// Returns the [CompressionType] according to whether to compress manifest files. #[inline] @@ -101,35 +104,6 @@ pub fn is_checkpoint_file(file_name: &str) -> bool { CHECKPOINT_RE.is_match(file_name) } -pub struct ObjectStoreLogIterator { - object_store: ObjectStore, - iter: Box + Send + Sync>, -} - -impl ObjectStoreLogIterator { - pub async fn next_log(&mut self) -> Result)>> { - match self.iter.next() { - Some((v, entry)) => { - let compress_type = file_compress_type(entry.name()); - let bytes = self - .object_store - .read(entry.path()) - .await - .context(OpenDalSnafu)?; - let data = compress_type - .decode(bytes) - .await - .context(DecompressObjectSnafu { - compress_type, - path: entry.path(), - })?; - Ok(Some((v, data))) - } - None => Ok(None), - } - } -} - /// Key to identify a manifest file. #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] enum FileKey { @@ -197,12 +171,12 @@ impl ManifestObjectStore { .context(OpenDalSnafu) } - /// Scan the manifest files in the range of [start, end) and return the iterator. + /// Scan the manifest files in the range of [start, end) and return all manifest entries. pub async fn scan( &self, start: ManifestVersion, end: ManifestVersion, - ) -> Result { + ) -> Result> { ensure!(start <= end, InvalidScanIndexSnafu { start, end }); let mut entries: Vec<(ManifestVersion, Entry)> = self @@ -220,10 +194,38 @@ impl ManifestObjectStore { entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2)); - Ok(ObjectStoreLogIterator { - object_store: self.object_store.clone(), - iter: Box::new(entries.into_iter()), - }) + Ok(entries) + } + + /// Fetch all manifests in concurrent. + pub async fn fetch_manifests( + &self, + manifests: &[(u64, Entry)], + ) -> Result)>> { + // TODO(weny): Make it configurable. + let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM); + + let tasks = manifests.iter().map(|(v, entry)| async { + // Safety: semaphore must exist. + let _permit = semaphore.acquire().await.unwrap(); + + let compress_type = file_compress_type(entry.name()); + let bytes = self + .object_store + .read(entry.path()) + .await + .context(OpenDalSnafu)?; + let data = compress_type + .decode(bytes) + .await + .context(DecompressObjectSnafu { + compress_type, + path: entry.path(), + })?; + Ok((*v, data)) + }); + + try_join_all(tasks).await } /// Delete manifest files that version < end. @@ -556,21 +558,25 @@ mod tests { .unwrap(); } - let mut it = log_store.scan(1, 4).await.unwrap(); + let manifests = log_store.scan(1, 4).await.unwrap(); + let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let mut it = manifests.into_iter(); for v in 1..4 { - let (version, bytes) = it.next_log().await.unwrap().unwrap(); + let (version, bytes) = it.next().unwrap(); assert_eq!(v, version); assert_eq!(format!("hello, {v}").as_bytes(), bytes); } - assert!(it.next_log().await.unwrap().is_none()); + assert!(it.next().is_none()); - let mut it = log_store.scan(0, 11).await.unwrap(); + let manifests = log_store.scan(0, 11).await.unwrap(); + let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let mut it = manifests.into_iter(); for v in 0..5 { - let (version, bytes) = it.next_log().await.unwrap().unwrap(); + let (version, bytes) = it.next().unwrap(); assert_eq!(v, version); assert_eq!(format!("hello, {v}").as_bytes(), bytes); } - assert!(it.next_log().await.unwrap().is_none()); + assert!(it.next().is_none()); // test checkpoint assert!(log_store.load_last_checkpoint().await.unwrap().is_none()); @@ -587,18 +593,24 @@ mod tests { let _ = log_store.delete_until(4, true).await.unwrap(); let _ = log_store.load_checkpoint(3).await.unwrap().unwrap(); let _ = log_store.load_last_checkpoint().await.unwrap().unwrap(); - let mut it = log_store.scan(0, 11).await.unwrap(); - let (version, bytes) = it.next_log().await.unwrap().unwrap(); + let manifests = log_store.scan(0, 11).await.unwrap(); + let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let mut it = manifests.into_iter(); + + let (version, bytes) = it.next().unwrap(); assert_eq!(4, version); assert_eq!("hello, 4".as_bytes(), bytes); - assert!(it.next_log().await.unwrap().is_none()); + assert!(it.next().is_none()); // delete all logs and checkpoints let _ = log_store.delete_until(11, false).await.unwrap(); assert!(log_store.load_checkpoint(3).await.unwrap().is_none()); assert!(log_store.load_last_checkpoint().await.unwrap().is_none()); - let mut it = log_store.scan(0, 11).await.unwrap(); - assert!(it.next_log().await.unwrap().is_none()); + let manifests = log_store.scan(0, 11).await.unwrap(); + let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let mut it = manifests.into_iter(); + + assert!(it.next().is_none()); } #[tokio::test] @@ -640,9 +652,12 @@ mod tests { .unwrap(); // test data reading - let mut it = log_store.scan(0, 10).await.unwrap(); + let manifests = log_store.scan(0, 10).await.unwrap(); + let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let mut it = manifests.into_iter(); + for v in 0..10 { - let (version, bytes) = it.next_log().await.unwrap().unwrap(); + let (version, bytes) = it.next().unwrap(); assert_eq!(v, version); assert_eq!(format!("hello, {v}").as_bytes(), bytes); } @@ -656,8 +671,10 @@ mod tests { // Delete util 10, contain uncompressed/compressed data // log 0, 1, 2, 7, 8, 9 will be delete assert_eq!(11, log_store.delete_until(10, false).await.unwrap()); - let mut it = log_store.scan(0, 10).await.unwrap(); - assert!(it.next_log().await.unwrap().is_none()); + let manifests = log_store.scan(0, 10).await.unwrap(); + let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let mut it = manifests.into_iter(); + assert!(it.next().is_none()); } #[tokio::test]