feat: lookup manifest file size (#2590)

* feat: get manifest file size

* feat: manifest size statistics

* refactor: manifest map key

* chore: comment and unit test

* chore: remove no-use function

* chore: change style

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: cr comment

* chore: cr comment

* chore: cr comment

* chore: cr comment

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Wei
2023-10-23 16:59:00 +08:00
committed by GitHub
parent 44280f7c9d
commit fbc8f56eaa
4 changed files with 281 additions and 18 deletions

View File

@@ -154,6 +154,12 @@ impl RegionManifestManager {
let inner = self.inner.read().await;
inner.store.clone()
}
/// Reports the total size, in bytes, of the manifest files tracked by this manager.
///
/// Acquires the read lock on the inner state and asks it for the accumulated size.
pub async fn manifest_size(&self) -> u64 {
    self.inner.read().await.total_manifest_size()
}
}
#[cfg(test)]
@@ -186,7 +192,7 @@ impl RegionManifestManagerInner {
/// Creates a new manifest.
async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
// construct storage
let store = ManifestObjectStore::new(
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
@@ -232,7 +238,7 @@ impl RegionManifestManagerInner {
/// Returns `Ok(None)` if no such manifest.
async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
// construct storage
let store = ManifestObjectStore::new(
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
@@ -240,8 +246,9 @@ impl RegionManifestManagerInner {
// recover from storage
// construct manifest builder
// calculate the manifest size from the latest checkpoint
let mut version = MIN_VERSION;
let checkpoint = Self::last_checkpoint(&store).await?;
let checkpoint = Self::last_checkpoint(&mut store).await?;
let last_checkpoint_version = checkpoint
.as_ref()
.map(|checkpoint| checkpoint.last_version)
@@ -265,6 +272,8 @@ impl RegionManifestManagerInner {
let mut action_iter = store.scan(version, MAX_VERSION).await?;
while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
// set manifest size after last checkpoint
store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
@@ -312,6 +321,7 @@ impl RegionManifestManagerInner {
Ok(())
}
/// Update the manifest. Return the current manifest version number.
async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let version = self.increase_version();
self.store.save(version, &action_list.encode()?).await?;
@@ -343,6 +353,11 @@ impl RegionManifestManagerInner {
Ok(version)
}
/// Returns the total manifest size.
///
/// This is a thin delegator: the value is whatever the underlying store's
/// `total_manifest_size` reports.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.store.total_manifest_size()
}
}
impl RegionManifestManagerInner {
@@ -369,8 +384,8 @@ impl RegionManifestManagerInner {
}
/// Make a new checkpoint. Return the fresh one if there are some actions to compact.
async fn do_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = Self::last_checkpoint(&self.store).await?;
async fn do_checkpoint(&mut self) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = Self::last_checkpoint(&mut self.store).await?;
let current_version = self.last_version;
let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint {
@@ -441,7 +456,7 @@ impl RegionManifestManagerInner {
/// Fetch the last [RegionCheckpoint] from storage.
pub(crate) async fn last_checkpoint(
store: &ManifestObjectStore,
store: &mut ManifestObjectStore,
) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = store.load_last_checkpoint().await?;
@@ -456,14 +471,16 @@ impl RegionManifestManagerInner {
#[cfg(test)]
mod test {
use api::v1::SemanticType;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::create_temp_dir;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use super::*;
use crate::manifest::action::RegionChange;
use crate::manifest::action::{RegionChange, RegionEdit};
use crate::manifest::tests::utils::basic_region_metadata;
use crate::test_util::TestEnv;
@@ -546,4 +563,95 @@ mod test {
.unwrap();
manager.validate_manifest(&new_metadata, 1).await;
}
/// Test-only helper that sums the on-disk size of the manifest files under `path`.
///
/// Modeled after `wal_dir_usage` in src/store-api/src/logstore.rs. Only regular files
/// whose name contains `.checkpoint` or `.json` are counted; sub-directories are not
/// descended into.
///
/// # Panics
///
/// Panics if the directory cannot be opened or iterated, if an entry's file type or
/// metadata cannot be read, or if a file name is not valid UTF-8. Panicking on an
/// iteration error is deliberate: silently stopping would under-report the size and
/// could make a size assertion pass or fail for the wrong reason.
async fn manifest_dir_usage(path: &str) -> u64 {
    let mut size = 0u64;
    let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
    // `next_entry` yields `Ok(None)` once the directory is exhausted; any `Err` is a
    // real I/O failure and must fail the test instead of ending the scan early.
    while let Some(entry) = read_dir.next_entry().await.unwrap() {
        if !entry.file_type().await.unwrap().is_file() {
            continue;
        }
        let file_name = entry.file_name().into_string().unwrap();
        if file_name.contains(".checkpoint") || file_name.contains(".json") {
            // `Metadata::len` is already a `u64`, so accumulate it without casting.
            let file_size = entry.metadata().await.unwrap().len();
            debug!("File: {file_name:?}, size: {file_size}");
            size += file_size;
        }
    }
    size
}
/// Checks that `manifest_size()` agrees with the on-disk usage of the manifest directory
/// after a single update and after ten more updates, then pins the size reported once the
/// manager has been stopped and reopened.
#[tokio::test]
async fn test_manifest_size() {
let metadata = Arc::new(basic_region_metadata());
// Remember the temp dir path before handing the dir to `TestEnv`, so the manifest
// directory can be measured on disk later.
let data_home = create_temp_dir("");
let data_home_path = data_home.path().to_str().unwrap().to_string();
let env = TestEnv::with_data_home(data_home);
let manifest_dir = format!("{}/manifest", data_home_path);
// NOTE(review): the `10` is presumably the checkpoint distance; confirm against
// `create_manifest_manager`.
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
.unwrap();
// Build a changed metadata (one extra field column) to produce a real manifest update.
let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
new_metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 252,
});
let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
}));
let current_version = manager.update(action_list).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1).await;
// The tracked manifest size must match what is actually on disk.
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
// Apply 10 no-op edits to trigger a checkpoint.
for _ in 0..10 {
manager
.update(RegionMetaActionList::new(vec![RegionMetaAction::Edit(
RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
)]))
.await
.unwrap();
}
// The tracked size must still match the directory usage after these updates.
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
// Reopen the manager; on open, the size is calculated starting from the latest
// checkpoint file.
manager.stop().await.unwrap();
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await
.unwrap()
.unwrap();
// One metadata change plus ten edits leaves the manifest at version 11.
manager.validate_manifest(&new_metadata, 11).await;
// NOTE(review): 1312 is a hard-coded byte count tied to the current serialized form of
// the checkpoint and actions; it will need updating if that encoding changes. Confirm
// whether it can be derived from the files on disk instead.
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, 1312);
}
}

View File

@@ -129,11 +129,22 @@ impl ObjectStoreLogIterator {
}
}
/// Key identifying a single manifest file in the size map.
///
/// The file kind is part of the key, so a delta file and a checkpoint file that share the
/// same version are tracked as two separate entries.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
/// A delta file (`.json`), identified by its manifest version.
Delta(ManifestVersion),
/// A checkpoint file (`.checkpoint`), identified by its manifest version.
Checkpoint(ManifestVersion),
}
/// Stores manifest delta and checkpoint files in an object store and tracks their sizes.
///
/// Note that the type is `Clone`, and cloning copies the size map: sizes recorded through
/// a clone are not visible in the value it was cloned from.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
/// Backend used to read and write the manifest files.
object_store: ObjectStore,
/// Compression applied to manifest file contents.
compress_type: CompressionType,
/// Manifest directory, normalized with `util::normalize_dir` on construction.
path: String,
/// Stores the size of each manifest file, keyed by file kind and version.
manifest_size_map: HashMap<FileKey, u64>,
}
impl ManifestObjectStore {
@@ -142,6 +153,7 @@ impl ManifestObjectStore {
object_store,
compress_type,
path: util::normalize_dir(path),
manifest_size_map: HashMap::new(),
}
}
@@ -184,6 +196,7 @@ impl ManifestObjectStore {
.context(OpenDalSnafu)
}
/// Scan the manifest files in the range of [start, end) and return the iterator.
pub async fn scan(
&self,
start: ManifestVersion,
@@ -212,8 +225,12 @@ impl ManifestObjectStore {
})
}
/// Deletes manifest files whose version is less than `end`.
/// If `keep_last_checkpoint` is true, the last checkpoint file is kept.
/// ### Return
/// The number of deleted files.
pub async fn delete_until(
&self,
&mut self,
end: ManifestVersion,
keep_last_checkpoint: bool,
) -> Result<usize> {
@@ -248,7 +265,7 @@ impl ManifestObjectStore {
} else {
None
};
let paths: Vec<_> = entries
let del_entries: Vec<_> = entries
.iter()
.filter(|(_e, is_checkpoint, version)| {
if let Some(max_version) = checkpoint_version {
@@ -264,12 +281,15 @@ impl ManifestObjectStore {
true
}
})
.map(|e| e.0.path().to_string())
.collect();
let paths = del_entries
.iter()
.map(|(e, _, _)| e.path().to_string())
.collect::<Vec<_>>();
let ret = paths.len();
debug!(
"Deleting {} logs from manifest storage path {} until {}, checkpoint: {:?}, paths: {:?}",
"Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
ret,
self.path,
end,
@@ -282,10 +302,21 @@ impl ManifestObjectStore {
.await
.context(OpenDalSnafu)?;
// delete manifest sizes
for (_, is_checkpoint, version) in &del_entries {
if *is_checkpoint {
self.manifest_size_map
.remove(&FileKey::Checkpoint(*version));
} else {
self.manifest_size_map.remove(&FileKey::Delta(*version));
}
}
Ok(ret)
}
pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
/// Save the delta manifest file.
pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.delta_file_path(version);
debug!("Save log to manifest storage, version: {}", version);
let data = self
@@ -296,13 +327,17 @@ impl ManifestObjectStore {
compress_type: self.compress_type,
path: &path,
})?;
let delta_size = data.len();
self.object_store
.write(&path, data)
.await
.context(OpenDalSnafu)
.context(OpenDalSnafu)?;
self.set_delta_file_size(version, delta_size as u64);
Ok(())
}
pub async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
/// Save the checkpoint manifest file.
pub async fn save_checkpoint(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
.compress_type
@@ -312,10 +347,12 @@ impl ManifestObjectStore {
compress_type: self.compress_type,
path: &path,
})?;
let checkpoint_size = data.len();
self.object_store
.write(&path, data)
.await
.context(OpenDalSnafu)?;
self.set_checkpoint_file_size(version, checkpoint_size as u64);
// The last checkpoint file only contains the size and version, so it is tiny and we don't compress it.
let last_checkpoint_path = self.last_checkpoint_path();
@@ -342,7 +379,7 @@ impl ManifestObjectStore {
}
pub async fn load_checkpoint(
&self,
&mut self,
version: ManifestVersion,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let path = self.checkpoint_file_path(version);
@@ -351,12 +388,15 @@ impl ManifestObjectStore {
let checkpoint_data =
match self.object_store.read(&path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data = self.compress_type.decode(checkpoint).await.context(
DecompressObjectSnafu {
compress_type: self.compress_type,
path,
},
)?;
// set the checkpoint size
self.set_checkpoint_file_size(version, checkpoint_size as u64);
Ok(Some(decompress_data))
}
Err(e) => {
@@ -373,6 +413,7 @@ impl ManifestObjectStore {
);
match self.object_store.read(&fall_back_path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data = FALL_BACK_COMPRESS_TYPE
.decode(checkpoint)
.await
@@ -380,6 +421,7 @@ impl ManifestObjectStore {
compress_type: FALL_BACK_COMPRESS_TYPE,
path,
})?;
self.set_checkpoint_file_size(version, checkpoint_size as u64);
Ok(Some(decompress_data))
}
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
@@ -398,7 +440,7 @@ impl ManifestObjectStore {
/// Load the latest checkpoint.
/// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
pub async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let last_checkpoint_path = self.last_checkpoint_path();
let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
Ok(data) => data,
@@ -424,6 +466,22 @@ impl ManifestObjectStore {
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
self.object_store.read(path).await.context(OpenDalSnafu)
}
/// Returns the combined size, in bytes, of every file recorded in the manifest size map.
pub(crate) fn total_manifest_size(&self) -> u64 {
    self.manifest_size_map
        .iter()
        .map(|(_key, file_size)| *file_size)
        .sum::<u64>()
}
/// Records `size` as the size of the delta file at `version`, replacing any previously
/// recorded size for that delta file.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
    let key = FileKey::Delta(version);
    self.manifest_size_map.insert(key, size);
}
/// Records `size` as the size of the checkpoint file at `version`, replacing any
/// previously recorded size for that checkpoint file.
pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) {
    let key = FileKey::Checkpoint(version);
    self.manifest_size_map.insert(key, size);
}
}
#[derive(Serialize, Deserialize, Debug)]
@@ -489,7 +547,7 @@ mod tests {
test_manifest_log_store_case(log_store).await;
}
async fn test_manifest_log_store_case(log_store: ManifestObjectStore) {
async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
@@ -600,4 +658,92 @@ mod tests {
let mut it = log_store.scan(0, 10).await.unwrap();
assert!(it.next_log().await.unwrap().is_none());
}
/// Round-trips a version through the file-name helpers: parse it from a checkpoint file
/// name, then rebuild both the delta and the checkpoint file names from it.
#[tokio::test]
async fn test_file_version() {
    let parsed = file_version("00000000000000000007.checkpoint");
    assert_eq!(parsed, 7);

    let delta_name = delta_file(parsed);
    let checkpoint_name = checkpoint_file(parsed);
    assert_eq!(delta_name, "00000000000000000007.json");
    assert_eq!(checkpoint_name, "00000000000000000007.checkpoint");
}
/// Verifies size tracking for uncompressed manifest files across saves and deletions.
#[tokio::test]
async fn test_uncompressed_manifest_files_size() {
let mut log_store = new_test_manifest_store();
// Write 5 delta files; each payload ("hello, N") is 8 bytes when uncompressed.
log_store.compress_type = CompressionType::Uncompressed;
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.await
.unwrap();
}
// Write 1 checkpoint file; its payload is 23 bytes when uncompressed.
log_store
.save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
.await
.unwrap();
// Total size: 5 * 8 + 23 = 63 bytes.
assert_eq!(log_store.total_manifest_size(), 63);
// Delete the 3 delta files with version < 3.
assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
// Size after the delete: 63 - 3 * 8 = 39 bytes.
assert_eq!(log_store.total_manifest_size(), 39);
// Delete everything that is left: the 2 remaining delta files and the checkpoint.
assert_eq!(
log_store
.delete_until(ManifestVersion::MAX, false)
.await
.unwrap(),
3
);
assert_eq!(log_store.total_manifest_size(), 0);
}
/// Verifies size tracking for gzip-compressed manifest files across saves and deletions.
#[tokio::test]
async fn test_compressed_manifest_files_size() {
let mut log_store = new_test_manifest_store();
// Test with compressed manifest files
log_store.compress_type = CompressionType::Gzip;
// Write 5 delta files.
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.await
.unwrap();
}
// Write 1 checkpoint file.
log_store
.save_checkpoint(5, "checkpoint_compressed".as_bytes())
.await
.unwrap();
// Total size of the compressed files.
// NOTE(review): 181 (and 97 below) are hard-coded gzip output sizes; they presumably
// depend on the compression library and its settings - confirm they are stable.
assert_eq!(log_store.total_manifest_size(), 181);
// Delete the 3 delta files with version < 3.
assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
// Size of the remaining compressed files after the delete.
assert_eq!(log_store.total_manifest_size(), 97);
// Delete everything that is left: the 2 remaining delta files and the checkpoint.
assert_eq!(
log_store
.delete_until(ManifestVersion::MAX, false)
.await
.unwrap(),
3
);
assert_eq!(log_store.total_manifest_size(), 0);
}
}

View File

@@ -202,7 +202,7 @@ async fn generate_checkpoint_with_compression_types(
manager.update(action).await.unwrap();
}
RegionManifestManagerInner::last_checkpoint(&manager.store().await)
RegionManifestManagerInner::last_checkpoint(&mut manager.store().await)
.await
.unwrap()
.unwrap()

View File

@@ -99,6 +99,15 @@ impl TestEnv {
}
}
/// Returns a new env with specific `data_home` for test.
pub fn with_data_home(data_home: TempDir) -> TestEnv {
TestEnv {
data_home,
logstore: None,
object_store: None,
}
}
/// Returns a clone of the env's log store handle, or `None` if no log store is set.
pub fn get_logstore(&self) -> Option<Arc<RaftEngineLogStore>> {
self.logstore.clone()
}