From 486bb2ee8e1d0f810eacd361a633ed1a0ba30a42 Mon Sep 17 00:00:00 2001
From: WU Jingdi
Date: Wed, 10 May 2023 15:53:06 +0800
Subject: [PATCH] feat: Compress manifest and checkpoint (#1497)

* feat: Compress manifest and checkpoint

* refactor: use file extension to infer compression type

* chore: apply suggestions from CR

* Update src/storage/src/manifest/storage.rs

Co-authored-by: Yingwen

* chore: address CR advice

* chore: Fix bugs, strengthen test

* chore: Fix CR, strengthen test

---------

Co-authored-by: dennis zhuang
Co-authored-by: Yingwen
---
 Cargo.lock                               |   1 +
 src/common/datasource/Cargo.toml         |   1 +
 src/common/datasource/src/compression.rs | 133 +++++++--
 src/storage/src/error.rs                 |  27 ++
 src/storage/src/manifest/storage.rs      | 350 ++++++++++++++++++-----
 5 files changed, 411 insertions(+), 101 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6fcfb0d990..a8f23d71c8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1621,6 +1621,7 @@ dependencies = [
 "derive_builder 0.12.0",
 "futures",
 "object-store",
+ "paste",
 "regex",
 "snafu",
 "tokio",
diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml
index d57c83faa9..988bd6fb0d 100644
--- a/src/common/datasource/Cargo.toml
+++ b/src/common/datasource/Cargo.toml
@@ -29,6 +29,7 @@ snafu.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true
 url = "2.3"
+paste = "1.0"
 
 [dev-dependencies]
 common-test-util = { path = "../test-util" }
diff --git a/src/common/datasource/src/compression.rs b/src/common/datasource/src/compression.rs
index fcf21f5db7..bc840cd6a8 100644
--- a/src/common/datasource/src/compression.rs
+++ b/src/common/datasource/src/compression.rs
@@ -17,9 +17,10 @@ use std::io;
 use std::str::FromStr;
 
 use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder};
+use async_compression::tokio::write;
 use bytes::Bytes;
 use futures::Stream;
-use tokio::io::{AsyncRead, BufReader};
+use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
 use tokio_util::io::{ReaderStream, StreamReader};
 
 use crate::error::{self, Error, Result};
@@ -73,37 +74,107 @@ impl CompressionType {
         !matches!(self, &Self::Uncompressed)
     }
 
-    pub fn convert_async_read<T: AsyncRead + Unpin + Send + 'static>(
-        &self,
-        s: T,
-    ) -> Box<dyn AsyncRead + Unpin + Send> {
+    pub const fn file_extension(&self) -> &'static str {
         match self {
-            CompressionType::Gzip => Box::new(GzipDecoder::new(BufReader::new(s))),
-            CompressionType::Bzip2 => Box::new(BzDecoder::new(BufReader::new(s))),
-            CompressionType::Xz => Box::new(XzDecoder::new(BufReader::new(s))),
-            CompressionType::Zstd => Box::new(ZstdDecoder::new(BufReader::new(s))),
-            CompressionType::Uncompressed => Box::new(s),
-        }
-    }
-
-    pub fn convert_stream<T: Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static>(
-        &self,
-        s: T,
-    ) -> Box<dyn Stream<Item = io::Result<Bytes>> + Send + Unpin> {
-        match self {
-            CompressionType::Gzip => {
-                Box::new(ReaderStream::new(GzipDecoder::new(StreamReader::new(s))))
-            }
-            CompressionType::Bzip2 => {
-                Box::new(ReaderStream::new(BzDecoder::new(StreamReader::new(s))))
-            }
-            CompressionType::Xz => {
-                Box::new(ReaderStream::new(XzDecoder::new(StreamReader::new(s))))
-            }
-            CompressionType::Zstd => {
-                Box::new(ReaderStream::new(ZstdDecoder::new(StreamReader::new(s))))
-            }
-            CompressionType::Uncompressed => Box::new(s),
+            Self::Gzip => "gz",
+            Self::Bzip2 => "bz2",
+            Self::Xz => "xz",
+            Self::Zstd => "zst",
+            Self::Uncompressed => "",
         }
     }
 }
+
+macro_rules! impl_compression_type {
+    ($(($enum_item:ident, $prefix:ident)),*) => {
+        paste::item! {
+            impl CompressionType {
+                pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
+                    match self {
+                        $(
+                            CompressionType::$enum_item => {
+                                let mut buffer = Vec::with_capacity(content.as_ref().len());
+                                let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
+                                encoder.write_all(content.as_ref()).await?;
+                                encoder.shutdown().await?;
+                                Ok(buffer)
+                            }
+                        )*
+                        CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
+                    }
+                }
+
+                pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
+                    match self {
+                        $(
+                            CompressionType::$enum_item => {
+                                let mut buffer = Vec::with_capacity(content.as_ref().len() * 2);
+                                let mut decoder = write::[<$prefix Decoder>]::new(&mut buffer);
+                                decoder.write_all(content.as_ref()).await?;
+                                decoder.shutdown().await?;
+                                Ok(buffer)
+                            }
+                        )*
+                        CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
+                    }
+                }
+
+                pub fn convert_async_read<T: AsyncRead + Unpin + Send + 'static>(
+                    &self,
+                    s: T,
+                ) -> Box<dyn AsyncRead + Unpin + Send> {
+                    match self {
+                        $(CompressionType::$enum_item => Box::new([<$prefix Decoder>]::new(BufReader::new(s))),)*
+                        CompressionType::Uncompressed => Box::new(s),
+                    }
+                }
+
+                pub fn convert_stream<T: Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static>(
+                    &self,
+                    s: T,
+                ) -> Box<dyn Stream<Item = io::Result<Bytes>> + Send + Unpin> {
+                    match self {
+                        $(CompressionType::$enum_item => Box::new(ReaderStream::new([<$prefix Decoder>]::new(StreamReader::new(s)))),)*
+                        CompressionType::Uncompressed => Box::new(s),
+                    }
+                }
+            }
+
+            #[cfg(test)]
+            mod tests {
+                use super::CompressionType;
+
+                $(
+                    #[tokio::test]
+                    async fn [<test_ $enum_item:lower _compression>]() {
+                        let string = "foo_bar".as_bytes().to_vec();
+                        let compress = CompressionType::$enum_item
+                            .encode(&string)
+                            .await
+                            .unwrap();
+                        let decompress = CompressionType::$enum_item
+                            .decode(&compress)
+                            .await
+                            .unwrap();
+                        assert_eq!(decompress, string);
+                })*
+
+                #[tokio::test]
+                async fn test_uncompression() {
+                    let string = "foo_bar".as_bytes().to_vec();
+                    let compress = CompressionType::Uncompressed
+                        .encode(&string)
+                        .await
+                        .unwrap();
+                    let decompress = CompressionType::Uncompressed
+                        .decode(&compress)
+                        .await
+                        .unwrap();
+                    assert_eq!(decompress, string);
+                }
+            }
+        }
+    };
+}
+
+impl_compression_type!((Gzip, Gzip), (Bzip2, Bz), (Xz, Xz), (Zstd, Zstd));
diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs
index 9bef856610..692b6bf346 100644
--- a/src/storage/src/error.rs
+++ b/src/storage/src/error.rs
@@ -16,6 +16,7 @@ use std::any::Any;
 use std::io::Error as IoError;
 use std::str::Utf8Error;
 
+use common_datasource::compression::CompressionType;
 use common_error::prelude::*;
 use common_runtime::error::Error as RuntimeError;
 use datatypes::arrow::error::ArrowError;
@@ -83,6 +84,30 @@ pub enum Error {
         source: object_store::Error,
     },
 
+    #[snafu(display(
+        "Fail to compress object by {}, path: {}, source: {}",
+        compress_type,
+        path,
+        source
+    ))]
+    CompressObject {
+        compress_type: CompressionType,
+        path: String,
+        source: std::io::Error,
+    },
+
+    #[snafu(display(
+        "Fail to decompress object by {}, path: {}, source: {}",
+        compress_type,
+        path,
+        source
+    ))]
+    DecompressObject {
+        compress_type: CompressionType,
+        path: String,
+        source: std::io::Error,
+    },
+
     #[snafu(display("Fail to list objects in path: {}, source: {}", path, source))]
     ListObjects {
         path: String,
@@ -517,6 +542,8 @@ impl ErrorExt for Error {
             | DecodeArrow { .. }
             | EncodeArrow { .. }
             | ManifestCheckpoint { .. }
+            | CompressObject { .. }
+            | DecompressObject { .. }
             | ParseSchema { .. } => StatusCode::Unexpected,
 
             WriteParquet { .. }
diff --git a/src/storage/src/manifest/storage.rs b/src/storage/src/manifest/storage.rs
index bf350356bb..e5ce1487ce 100644
--- a/src/storage/src/manifest/storage.rs
+++ b/src/storage/src/manifest/storage.rs
@@ -14,8 +14,10 @@
 
 use std::collections::HashMap;
 use std::iter::Iterator;
+use std::str::FromStr;
 
 use async_trait::async_trait;
+use common_datasource::compression::CompressionType;
 use common_telemetry::logging;
 use futures::TryStreamExt;
 use lazy_static::lazy_static;
@@ -26,16 +28,21 @@ use snafu::{ensure, ResultExt};
 use store_api::manifest::{LogIterator, ManifestLogStorage, ManifestVersion};
 
 use crate::error::{
-    DecodeJsonSnafu, DeleteObjectSnafu, EncodeJsonSnafu, Error, InvalidScanIndexSnafu,
-    ListObjectsSnafu, ReadObjectSnafu, Result, Utf8Snafu, WriteObjectSnafu,
+    CompressObjectSnafu, DecodeJsonSnafu, DecompressObjectSnafu, DeleteObjectSnafu,
+    EncodeJsonSnafu, Error, InvalidScanIndexSnafu, ListObjectsSnafu, ReadObjectSnafu, Result,
+    Utf8Snafu, WriteObjectSnafu,
 };
 
 lazy_static! {
-    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json$").unwrap();
+    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
     static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
 }
 
 const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
+const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Uncompressed;
+/// For backward compatibility, existing manifest files may not be compressed. When a file
+/// cannot be read under the current compression type, fall back to `FALL_BACK_COMPRESS_TYPE`.
+const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
 
 #[inline]
 pub fn delta_file(version: ManifestVersion) -> String {
@@ -47,6 +54,15 @@ pub fn checkpoint_file(version: ManifestVersion) -> String {
     format!("{version:020}.checkpoint")
 }
 
+#[inline]
+pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
+    if compress_type == CompressionType::Uncompressed {
+        format!("{}{}", path, file)
+    } else {
+        format!("{}{}.{}", path, file, compress_type.file_extension())
+    }
+}
+
 /// Return's the file manifest version from path
 ///
 /// # Panics
@@ -57,6 +73,16 @@ pub fn file_version(path: &str) -> ManifestVersion {
     s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
 }
 
+/// Returns the file compression algorithm, inferred from the file extension.
+///
+/// For example:
+/// `00000000000000000000.json.gz` -> `CompressionType::Gzip`
+#[inline]
+pub fn file_compress_type(path: &str) -> CompressionType {
+    let s = path.rsplit('.').next().unwrap_or("");
+    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
+}
+
 #[inline]
 pub fn is_delta_file(file_name: &str) -> bool {
     DELTA_RE.is_match(file_name)
 }
@@ -79,12 +105,20 @@ impl LogIterator for ObjectStoreLogIterator {
     async fn next_log(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
         match self.iter.next() {
             Some((v, entry)) => {
+                let compress_type = file_compress_type(entry.name());
                 let bytes = self
                     .object_store
                     .read(entry.path())
                     .await
                     .context(ReadObjectSnafu { path: entry.path() })?;
-                Ok(Some((v, bytes)))
+                let data = compress_type
+                    .decode(bytes)
+                    .await
+                    .context(DecompressObjectSnafu {
+                        compress_type,
+                        path: entry.path(),
+                    })?;
+                Ok(Some((v, data)))
             }
             None => Ok(None),
         }
@@ -94,6 +128,7 @@
 #[derive(Clone, Debug)]
 pub struct ManifestObjectStore {
     object_store: ObjectStore,
+    compress_type: CompressionType,
     path: String,
 }
 
@@ -101,25 +136,49 @@ impl ManifestObjectStore {
     pub fn new(path: &str, object_store: ObjectStore) -> Self {
         Self {
             object_store,
+            // TODO: make it configurable
+            compress_type: DEFAULT_MANIFEST_COMPRESSION_TYPE,
             path: util::normalize_dir(path),
         }
     }
 
     #[inline]
+    /// Returns the delta file path under the **current** compression algorithm
     fn delta_file_path(&self, version: ManifestVersion) -> String {
-        format!("{}{}", self.path, delta_file(version))
+        gen_path(&self.path, &delta_file(version), self.compress_type)
     }
 
     #[inline]
+    /// Returns the checkpoint file path under the **current** compression algorithm
    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
-        format!("{}{}", self.path, checkpoint_file(version))
+        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
     }
 
     #[inline]
+    /// Returns the last checkpoint path. The last checkpoint is never compressed,
+    /// so its name is independent of the compression algorithm used by `ManifestObjectStore`.
     fn last_checkpoint_path(&self) -> String {
         format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
     }
 
+    /// Returns all `R`s in the root directory for which the `filter` closure returns `Some(R)`;
+    /// entries for which `filter` returns `None` are discarded.
+    async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
+    where
+        F: Fn(Entry) -> Option<R>,
+    {
+        let streamer = self
+            .object_store
+            .list(&self.path)
+            .await
+            .context(ListObjectsSnafu { path: &self.path })?;
+        streamer
+            .try_filter_map(|e| async { Ok(filter(e)) })
+            .try_collect::<Vec<_>>()
+            .await
+            .context(ListObjectsSnafu { path: &self.path })
+    }
+
     pub(crate) fn path(&self) -> &str {
         &self.path
     }
@@ -158,29 +217,18 @@ impl ManifestLogStorage for ManifestObjectStore {
     ) -> Result<ObjectStoreLogIterator> {
         ensure!(start <= end, InvalidScanIndexSnafu { start, end });
 
-        let streamer = self
-            .object_store
-            .list(&self.path)
-            .await
-            .context(ListObjectsSnafu { path: &self.path })?;
-
-        let mut entries: Vec<(ManifestVersion, Entry)> = streamer
-            .try_filter_map(|e| async move {
-                let file_name = e.name();
+        let mut entries: Vec<(ManifestVersion, Entry)> = self
+            .get_paths(|entry| {
+                let file_name = entry.name();
                 if is_delta_file(file_name) {
                     let version = file_version(file_name);
-                    if version >= start && version < end {
-                        Ok(Some((version, e)))
-                    } else {
-                        Ok(None)
+                    if start <= version && version < end {
+                        return Some((version, entry));
                     }
-                } else {
-                    Ok(None)
                 }
+                None
             })
-            .try_collect::<Vec<_>>()
-            .await
-            .context(ListObjectsSnafu { path: &self.path })?;
+            .await?;
 
         entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
 
@@ -195,31 +243,20 @@
         end: ManifestVersion,
         keep_last_checkpoint: bool,
     ) -> Result<usize> {
-        let streamer = self
-            .object_store
-            .list(&self.path)
-            .await
-            .context(ListObjectsSnafu { path: &self.path })?;
-
         // Stores (entry, is_checkpoint, version) in a Vec.
-        let entries: Vec<_> = streamer
-            .try_filter_map(|e| async move {
-                let file_name = e.name();
+        let entries: Vec<_> = self
+            .get_paths(|entry| {
+                let file_name = entry.name();
                 let is_checkpoint = is_checkpoint_file(file_name);
                 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
                     let version = file_version(file_name);
                     if version < end {
-                        Ok(Some((e, is_checkpoint, version)))
-                    } else {
-                        Ok(None)
+                        return Some((entry, is_checkpoint, version));
                     }
-                } else {
-                    Ok(None)
                 }
+                None
             })
-            .try_collect::<Vec<_>>()
-            .await
-            .context(ListObjectsSnafu { path: &self.path })?;
+            .await?;
 
         let checkpoint_version = if keep_last_checkpoint {
             // Note that the order of entries is unspecific.
             entries
@@ -237,7 +274,6 @@
         } else {
             None
         };
-
         let paths: Vec<_> = entries
             .iter()
             .filter(|(_e, is_checkpoint, version)| {
@@ -279,19 +315,37 @@
 
     async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
         let path = self.delta_file_path(version);
-        logging::debug!("Save log to manifest storage, version: {}", version);
-
+        let data = self
+            .compress_type
+            .encode(bytes)
+            .await
+            .context(CompressObjectSnafu {
+                compress_type: self.compress_type,
+                path: &path,
+            })?;
         self.object_store
-            .write(&path, bytes.to_vec())
+            .write(&path, data)
             .await
             .context(WriteObjectSnafu { path })
     }
 
     async fn delete(&self, start: ManifestVersion, end: ManifestVersion) -> Result<()> {
-        let raw_paths = (start..end)
-            .map(|v| self.delta_file_path(v))
-            .collect::<Vec<_>>();
+        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
+
+        // For backward compatibility, the logs between start and end may be uncompressed,
+        // so we also delete the uncompressed file for each version, even if it may not exist.
+        let mut paths = Vec::with_capacity(((end - start) * 2) as usize);
+        for version in start..end {
+            paths.push(raw_normalize_path(&self.delta_file_path(version)));
+            if self.compress_type != FALL_BACK_COMPRESS_TYPE {
+                paths.push(raw_normalize_path(&gen_path(
+                    &self.path,
+                    &delta_file(version),
+                    FALL_BACK_COMPRESS_TYPE,
+                )));
+            }
+        }
 
         logging::debug!(
             "Deleting logs from manifest storage, start: {}, end: {}",
             start,
             end
         );
 
-        let paths = raw_paths
-            .iter()
-            .map(|p| raw_normalize_path(p))
-            .collect::<Vec<_>>();
-
         self.object_store
-            .remove(paths)
+            .remove(paths.clone())
             .await
             .with_context(|_| DeleteObjectSnafu {
-                path: raw_paths.join(","),
+                path: paths.join(","),
             })?;
 
         Ok(())
     }
 
     async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
         let path = self.checkpoint_file_path(version);
+        let data = self
+            .compress_type
+            .encode(bytes)
+            .await
+            .context(CompressObjectSnafu {
+                compress_type: self.compress_type,
+                path: &path,
+            })?;
         self.object_store
-            .write(&path, bytes.to_vec())
+            .write(&path, data)
             .await
             .context(WriteObjectSnafu { path })?;
 
+        // The last checkpoint file only contains the size and version, which is tiny, so we don't compress it.
         let last_checkpoint_path = self.last_checkpoint_path();
 
         let checkpoint_metadata = CheckpointMetadata {
@@ -337,7 +395,6 @@
         );
 
         let bs = checkpoint_metadata.encode()?;
-
         self.object_store
             .write(&last_checkpoint_path, bs.as_ref().to_vec())
             .await
@@ -353,27 +410,88 @@
     async fn load_checkpoint(
         &self,
         version: ManifestVersion,
     ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
         let path = self.checkpoint_file_path(version);
-        match self.object_store.read(&path).await {
-            Ok(checkpoint) => Ok(Some((version, checkpoint))),
-            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
-            Err(e) => Err(e).context(ReadObjectSnafu { path }),
-        }
+        // For backward compatibility, the checkpoint may not be compressed. If the file is not
+        // found under the current compression type, fall back to the uncompressed checkpoint and try again.
+        let checkpoint_data =
+            match self.object_store.read(&path).await {
+                Ok(checkpoint) => {
+                    let decompress_data = self.compress_type.decode(checkpoint).await.context(
+                        DecompressObjectSnafu {
+                            compress_type: self.compress_type,
+                            path,
+                        },
+                    )?;
+                    Ok(Some(decompress_data))
+                }
+                Err(e) => {
+                    if e.kind() == ErrorKind::NotFound {
+                        if self.compress_type != FALL_BACK_COMPRESS_TYPE {
+                            let fall_back_path = gen_path(
+                                &self.path,
+                                &checkpoint_file(version),
+                                FALL_BACK_COMPRESS_TYPE,
+                            );
+                            logging::debug!(
+                                "Failed to load checkpoint from path: {}, fall back to path: {}",
+                                path,
+                                fall_back_path
+                            );
+                            match self.object_store.read(&fall_back_path).await {
+                                Ok(checkpoint) => {
+                                    let decompress_data = FALL_BACK_COMPRESS_TYPE
+                                        .decode(checkpoint)
+                                        .await
+                                        .context(DecompressObjectSnafu {
+                                            compress_type: FALL_BACK_COMPRESS_TYPE,
+                                            path,
+                                        })?;
+                                    Ok(Some(decompress_data))
+                                }
+                                Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
+                                Err(e) => Err(e).context(ReadObjectSnafu {
+                                    path: &fall_back_path,
+                                }),
+                            }
+                        } else {
+                            Ok(None)
+                        }
+                    } else {
+                        Err(e).context(ReadObjectSnafu { path: &path })
+                    }
+                }
+            }?;
+        Ok(checkpoint_data.map(|data| (version, data)))
     }
 
     async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> {
-        let path = self.checkpoint_file_path(version);
+        // For backward compatibility, the checkpoint file may not be compressed, so we also
+        // delete the uncompressed checkpoint file for this version, even if it may not exist.
+        let paths = if self.compress_type != FALL_BACK_COMPRESS_TYPE {
+            vec![
+                raw_normalize_path(&self.checkpoint_file_path(version)),
+                raw_normalize_path(&gen_path(
+                    &self.path,
+                    &checkpoint_file(version),
+                    FALL_BACK_COMPRESS_TYPE,
+                )),
+            ]
+        } else {
+            vec![raw_normalize_path(&self.checkpoint_file_path(version))]
+        };
+
         self.object_store
-            .delete(&path)
+            .remove(paths.clone())
             .await
-            .context(DeleteObjectSnafu { path })?;
+            .context(DeleteObjectSnafu {
+                path: paths.join(","),
+            })?;
 
         Ok(())
     }
 
     async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
         let last_checkpoint_path = self.last_checkpoint_path();
-
         let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
-            Ok(last_checkpoint_data) => last_checkpoint_data,
+            Ok(data) => data,
             Err(e) if e.kind() == ErrorKind::NotFound => {
                 return Ok(None);
             }
@@ -404,16 +522,39 @@ mod tests {
 
     use super::*;
 
-    #[tokio::test]
-    async fn test_manifest_log_store() {
+    fn new_test_manifest_store() -> ManifestObjectStore {
         common_telemetry::init_default_ut_logging();
         let tmp_dir = create_temp_dir("test_manifest_log_store");
         let mut builder = Fs::default();
         builder.root(&tmp_dir.path().to_string_lossy());
         let object_store = ObjectStore::new(builder).unwrap().finish();
+        ManifestObjectStore::new("/", object_store)
+    }
 
-        let log_store = ManifestObjectStore::new("/", object_store);
+    #[test]
+    // This test mainly guards against future unintentional changes that may break backward compatibility.
+    fn test_compress_file_path_generation() {
+        let path = "/foo/bar/";
+        let version: ManifestVersion = 0;
+        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
+        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
+    }
 
+    #[tokio::test]
+    async fn test_manifest_log_store_uncompress() {
+        let mut log_store = new_test_manifest_store();
+        log_store.compress_type = CompressionType::Uncompressed;
+        test_manifest_log_store_case(log_store).await;
+    }
+
+    #[tokio::test]
+    async fn test_manifest_log_store_compress() {
+        let mut log_store = new_test_manifest_store();
+        log_store.compress_type = CompressionType::Gzip;
+        test_manifest_log_store_case(log_store).await;
+    }
+
+    async fn test_manifest_log_store_case(log_store: ManifestObjectStore) {
         for v in 0..5 {
             log_store
                 .save(v, format!("hello, {v}").as_bytes())
                 .await
                 .unwrap();
         }
@@ -477,4 +618,73 @@
         let mut it = log_store.scan(0, 11).await.unwrap();
         assert!(it.next_log().await.unwrap().is_none());
     }
+
+    #[tokio::test]
+    // test that ManifestObjectStore can read/delete previously uncompressed data correctly
+    async fn test_compress_backward_compatible() {
+        let mut log_store = new_test_manifest_store();
+
+        // write uncompressed data to simulate pre-existing uncompressed data
+        log_store.compress_type = CompressionType::Uncompressed;
+        for v in 0..5 {
+            log_store
+                .save(v, format!("hello, {v}").as_bytes())
+                .await
+                .unwrap();
+        }
+        log_store
+            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
+            .await
+            .unwrap();
+
+        // change compress type
+        log_store.compress_type = CompressionType::Gzip;
+
+        // test that load_last_checkpoint works correctly for previously uncompressed data
+        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
+        assert_eq!(v, 5);
+        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
+
+        // write compressed data to simulate the compression algorithm taking effect
+        for v in 5..10 {
+            log_store
+                .save(v, format!("hello, {v}").as_bytes())
+                .await
+                .unwrap();
+        }
+        log_store
+            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
+            .await
+            .unwrap();
+
+        // test data reading
+        let mut it = log_store.scan(0, 10).await.unwrap();
+        for v in 0..10 {
+            let (version, bytes) = it.next_log().await.unwrap().unwrap();
+            assert_eq!(v, version);
+            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
+        }
+        let (v, checkpoint) = log_store.load_checkpoint(5).await.unwrap().unwrap();
+        assert_eq!(v, 5);
+        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
+        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
+        assert_eq!(v, 10);
+        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());
+
+        // Delete the previously uncompressed checkpoint
+        log_store.delete_checkpoint(5).await.unwrap();
+        assert!(log_store.load_checkpoint(5).await.unwrap().is_none());
+
+        // Delete [3, 7), containing both uncompressed and compressed data
+        log_store.delete(3, 7).await.unwrap();
+        // [3, 7) deleted
+        let mut it = log_store.scan(3, 7).await.unwrap();
+        assert!(it.next_log().await.unwrap().is_none());
+
+        // Delete until 10, covering both uncompressed and compressed data;
+        // logs 0, 1, 2, 7, 8, 9 will be deleted
+        assert_eq!(6, log_store.delete_until(10, false).await.unwrap());
+        let mut it = log_store.scan(0, 10).await.unwrap();
+        assert!(it.next_log().await.unwrap().is_none());
+    }
 }
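
Editor's note: a minimal usage sketch of the encode/decode API that the impl_compression_type! macro above generates; it is not part of the patch. It assumes tokio's "macros" and "rt" features for #[tokio::main]; the encode, decode, and file_extension calls are the ones added in src/common/datasource/src/compression.rs.

    use common_datasource::compression::CompressionType;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let payload = b"manifest delta".to_vec();

        // Round-trip through one generated codec. encode()/decode() buffer the
        // whole blob in memory, which fits the small manifest/checkpoint files
        // this patch targets rather than large streams.
        let compressed = CompressionType::Gzip.encode(&payload).await?;
        let restored = CompressionType::Gzip.decode(&compressed).await?;
        assert_eq!(restored, payload);

        // Uncompressed is a plain copy in both directions.
        let copied = CompressionType::Uncompressed.encode(&payload).await?;
        assert_eq!(copied, payload);
        Ok(())
    }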
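Editor's note: a sketch of how the file naming ties a manifest file to its codec, which the backward-compatibility fallback above relies on; also not part of the patch. The storage::manifest::storage import path and a PartialEq derive on CompressionType are assumptions; gen_path, delta_file, and file_compress_type are the helpers added in src/storage/src/manifest/storage.rs.

    use common_datasource::compression::CompressionType;
    // Assumed import path for the helpers added by this patch.
    use storage::manifest::storage::{delta_file, file_compress_type, gen_path};

    fn main() {
        // A compressed delta file carries the codec as an extra extension...
        let path = gen_path("/manifest/", &delta_file(42), CompressionType::Gzip);
        assert_eq!(path, "/manifest/00000000000000000042.json.gz");

        // ...so a reader can recover the codec from the name alone; names with
        // no recognized extension fall back to Uncompressed, which is how old,
        // uncompressed manifests keep working.
        assert_eq!(file_compress_type(&path), CompressionType::Gzip);
        assert_eq!(
            file_compress_type("00000000000000000042.json"),
            CompressionType::Uncompressed
        );
    }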