mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-05-16 21:10:38 +00:00
feat: Compress manifest and checkpoint (#1497)
* feat: Compress manifest and checkpoint
* refactor: use file extension to infer compression type
* chore: apply suggestions from CR
* Update src/storage/src/manifest/storage.rs
  Co-authored-by: Yingwen <realevenyag@gmail.com>
* chore: CR advice
* chore: Fix bugs, strengthen test
* chore: Fix CR, strengthen test
---------
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Changed files: Cargo.lock (generated, 1 line changed)
Cargo.lock:

@@ -1621,6 +1621,7 @@ dependencies = [
  "derive_builder 0.12.0",
  "futures",
  "object-store",
+ "paste",
  "regex",
  "snafu",
  "tokio",
Cargo.toml (common-datasource crate):

@@ -29,6 +29,7 @@ snafu.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true
 url = "2.3"
+paste = "1.0"

 [dev-dependencies]
 common-test-util = { path = "../test-util" }
compression.rs (common-datasource crate):

@@ -17,9 +17,10 @@ use std::io;
 use std::str::FromStr;

 use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder};
+use async_compression::tokio::write;
 use bytes::Bytes;
 use futures::Stream;
-use tokio::io::{AsyncRead, BufReader};
+use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
 use tokio_util::io::{ReaderStream, StreamReader};

 use crate::error::{self, Error, Result};
@@ -73,37 +74,107 @@ impl CompressionType {
         !matches!(self, &Self::Uncompressed)
     }

-    pub fn convert_async_read<T: AsyncRead + Unpin + Send + 'static>(
-        &self,
-        s: T,
-    ) -> Box<dyn AsyncRead + Unpin + Send> {
+    pub const fn file_extension(&self) -> &'static str {
         match self {
-            CompressionType::Gzip => Box::new(GzipDecoder::new(BufReader::new(s))),
-            CompressionType::Bzip2 => Box::new(BzDecoder::new(BufReader::new(s))),
-            CompressionType::Xz => Box::new(XzDecoder::new(BufReader::new(s))),
-            CompressionType::Zstd => Box::new(ZstdDecoder::new(BufReader::new(s))),
-            CompressionType::Uncompressed => Box::new(s),
-        }
-    }
-
-    pub fn convert_stream<T: Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static>(
-        &self,
-        s: T,
-    ) -> Box<dyn Stream<Item = io::Result<Bytes>> + Send + Unpin> {
-        match self {
-            CompressionType::Gzip => {
-                Box::new(ReaderStream::new(GzipDecoder::new(StreamReader::new(s))))
-            }
-            CompressionType::Bzip2 => {
-                Box::new(ReaderStream::new(BzDecoder::new(StreamReader::new(s))))
-            }
-            CompressionType::Xz => {
-                Box::new(ReaderStream::new(XzDecoder::new(StreamReader::new(s))))
-            }
-            CompressionType::Zstd => {
-                Box::new(ReaderStream::new(ZstdDecoder::new(StreamReader::new(s))))
-            }
-            CompressionType::Uncompressed => Box::new(s),
+            Self::Gzip => "gz",
+            Self::Bzip2 => "bz2",
+            Self::Xz => "xz",
+            Self::Zstd => "zst",
+            Self::Uncompressed => "",
         }
     }
 }
+
+macro_rules! impl_compression_type {
+    ($(($enum_item:ident, $prefix:ident)),*) => {
+        paste::item! {
+            impl CompressionType {
+                pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
+                    match self {
+                        $(
+                            CompressionType::$enum_item => {
+                                let mut buffer = Vec::with_capacity(content.as_ref().len());
+                                let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
+                                encoder.write_all(content.as_ref()).await?;
+                                encoder.shutdown().await?;
+                                Ok(buffer)
+                            }
+                        )*
+                        CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
+                    }
+                }
+
+                pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
+                    match self {
+                        $(
+                            CompressionType::$enum_item => {
+                                let mut buffer = Vec::with_capacity(content.as_ref().len() * 2);
+                                let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer);
+                                encoder.write_all(content.as_ref()).await?;
+                                encoder.shutdown().await?;
+                                Ok(buffer)
+                            }
+                        )*
+                        CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
+                    }
+                }
+
+                pub fn convert_async_read<T: AsyncRead + Unpin + Send + 'static>(
+                    &self,
+                    s: T,
+                ) -> Box<dyn AsyncRead + Unpin + Send> {
+                    match self {
+                        $(CompressionType::$enum_item => Box::new([<$prefix Decoder>]::new(BufReader::new(s))),)*
+                        CompressionType::Uncompressed => Box::new(s),
+                    }
+                }
+
+                pub fn convert_stream<T: Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static>(
+                    &self,
+                    s: T,
+                ) -> Box<dyn Stream<Item = io::Result<Bytes>> + Send + Unpin> {
+                    match self {
+                        $(CompressionType::$enum_item => Box::new(ReaderStream::new([<$prefix Decoder>]::new(StreamReader::new(s)))),)*
+                        CompressionType::Uncompressed => Box::new(s),
+                    }
+                }
+            }
+
+            #[cfg(test)]
+            mod tests {
+                use super::CompressionType;
+
+                $(
+                    #[tokio::test]
+                    async fn [<test_ $enum_item:lower _compression>]() {
+                        let string = "foo_bar".as_bytes().to_vec();
+                        let compress = CompressionType::$enum_item
+                            .encode(&string)
+                            .await
+                            .unwrap();
+                        let decompress = CompressionType::$enum_item
+                            .decode(&compress)
+                            .await
+                            .unwrap();
+                        assert_eq!(decompress, string);
+                })*
+
+                #[tokio::test]
+                async fn test_uncompression() {
+                    let string = "foo_bar".as_bytes().to_vec();
+                    let compress = CompressionType::Uncompressed
+                        .encode(&string)
+                        .await
+                        .unwrap();
+                    let decompress = CompressionType::Uncompressed
+                        .decode(&compress)
+                        .await
+                        .unwrap();
+                    assert_eq!(decompress, string);
+                }
+            }
+        }
+    };
+}
+
+impl_compression_type!((Gzip, Gzip), (Bzip2, Bz), (Xz, Xz), (Zstd, Zstd));
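
With `paste::item!`, `[<$prefix Encoder>]` pastes identifiers such as `write::GzipEncoder`, so the single invocation at the end generates an async `encode`/`decode` pair (plus the two converters) for every variant, while `Uncompressed` stays a plain byte copy. A minimal round-trip sketch of the generated API, assuming the crate is consumed as `common_datasource` and a tokio runtime with the `macros` feature is available:

    use common_datasource::compression::CompressionType;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let payload = b"manifest entry".to_vec();
        // encode/decode are generated by impl_compression_type! above.
        let compressed = CompressionType::Zstd.encode(&payload).await?;
        let restored = CompressionType::Zstd.decode(&compressed).await?;
        assert_eq!(restored, payload);
        // file_extension() is what the manifest storage appends to file names.
        assert_eq!(CompressionType::Zstd.file_extension(), "zst");
        Ok(())
    }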
error.rs (storage crate):

@@ -16,6 +16,7 @@ use std::any::Any;
 use std::io::Error as IoError;
 use std::str::Utf8Error;

+use common_datasource::compression::CompressionType;
 use common_error::prelude::*;
 use common_runtime::error::Error as RuntimeError;
 use datatypes::arrow::error::ArrowError;
@@ -83,6 +84,30 @@ pub enum Error {
         source: object_store::Error,
     },

+    #[snafu(display(
+        "Fail to compress object by {}, path: {}, source: {}",
+        compress_type,
+        path,
+        source
+    ))]
+    CompressObject {
+        compress_type: CompressionType,
+        path: String,
+        source: std::io::Error,
+    },
+
+    #[snafu(display(
+        "Fail to decompress object by {}, path: {}, source: {}",
+        compress_type,
+        path,
+        source
+    ))]
+    DecompressObject {
+        compress_type: CompressionType,
+        path: String,
+        source: std::io::Error,
+    },
+
     #[snafu(display("Fail to list objects in path: {}, source: {}", path, source))]
     ListObjects {
         path: String,
@@ -517,6 +542,8 @@ impl ErrorExt for Error {
             | DecodeArrow { .. }
             | EncodeArrow { .. }
             | ManifestCheckpoint { .. }
+            | CompressObject { .. }
+            | DecompressObject { .. }
             | ParseSchema { .. } => StatusCode::Unexpected,

             WriteParquet { .. }
src/storage/src/manifest/storage.rs:

@@ -14,8 +14,10 @@

 use std::collections::HashMap;
 use std::iter::Iterator;
+use std::str::FromStr;

 use async_trait::async_trait;
+use common_datasource::compression::CompressionType;
 use common_telemetry::logging;
 use futures::TryStreamExt;
 use lazy_static::lazy_static;
@@ -26,16 +28,21 @@ use snafu::{ensure, ResultExt};
 use store_api::manifest::{LogIterator, ManifestLogStorage, ManifestVersion};

 use crate::error::{
-    DecodeJsonSnafu, DeleteObjectSnafu, EncodeJsonSnafu, Error, InvalidScanIndexSnafu,
-    ListObjectsSnafu, ReadObjectSnafu, Result, Utf8Snafu, WriteObjectSnafu,
+    CompressObjectSnafu, DecodeJsonSnafu, DecompressObjectSnafu, DeleteObjectSnafu,
+    EncodeJsonSnafu, Error, InvalidScanIndexSnafu, ListObjectsSnafu, ReadObjectSnafu, Result,
+    Utf8Snafu, WriteObjectSnafu,
 };

 lazy_static! {
-    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json$").unwrap();
+    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
     static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
 }

 const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
+const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Uncompressed;
+/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed.
+/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing.
+const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;

 #[inline]
 pub fn delta_file(version: ManifestVersion) -> String {
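
Dropping the `$` anchor from DELTA_RE is what lets compressed delta files, whose names gain an extra extension, keep matching. A quick standalone check with the regex crate:

    use regex::Regex;

    fn main() {
        let delta_re = Regex::new("^\\d+\\.json").unwrap();
        // Plain and compressed delta files both match without the `$` anchor.
        assert!(delta_re.is_match("00000000000000000001.json"));
        assert!(delta_re.is_match("00000000000000000001.json.gz"));
        // Unrelated files still do not.
        assert!(!delta_re.is_match("_last_checkpoint"));
    }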
@@ -47,6 +54,15 @@ pub fn checkpoint_file(version: ManifestVersion) -> String {
     format!("{version:020}.checkpoint")
 }

+#[inline]
+pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
+    if compress_type == CompressionType::Uncompressed {
+        format!("{}{}", path, file)
+    } else {
+        format!("{}{}.{}", path, file, compress_type.file_extension())
+    }
+}
+
 /// Return's the file manifest version from path
 ///
 /// # Panics
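
Both branches of `gen_path` in one place; a hypothetical test in the same module (the Gzip case mirrors `test_compress_file_path_generation` near the end of this diff):

    #[test]
    fn test_gen_path_both_branches() {
        // Uncompressed names stay bare; compressed names gain the extension.
        assert_eq!(
            gen_path("/foo/bar/", "00000000000000000000.json", CompressionType::Uncompressed),
            "/foo/bar/00000000000000000000.json"
        );
        assert_eq!(
            gen_path("/foo/bar/", "00000000000000000000.json", CompressionType::Gzip),
            "/foo/bar/00000000000000000000.json.gz"
        );
    }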
@@ -57,6 +73,16 @@ pub fn file_version(path: &str) -> ManifestVersion {
     s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
 }

+/// Return's the file compress algorithm by file extension.
+///
+/// for example file
+/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
+#[inline]
+pub fn file_compress_type(path: &str) -> CompressionType {
+    let s = path.rsplit('.').next().unwrap_or("");
+    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
+}
+
 #[inline]
 pub fn is_delta_file(file_name: &str) -> bool {
     DELTA_RE.is_match(file_name)
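
Only the last dot-separated segment is consulted, so any unrecognized extension (including a bare `.json`) degrades to `Uncompressed` rather than failing. A hypothetical test in the same module, assuming `CompressionType`'s `FromStr` accepts the extension strings listed in `file_extension`:

    #[test]
    fn test_file_compress_type_fallback() {
        assert_eq!(
            file_compress_type("00000000000000000000.json.gz"),
            CompressionType::Gzip
        );
        // "json" is not a compression extension, so this falls back.
        assert_eq!(
            file_compress_type("00000000000000000000.json"),
            CompressionType::Uncompressed
        );
    }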
@@ -79,12 +105,20 @@ impl LogIterator for ObjectStoreLogIterator {
     async fn next_log(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
         match self.iter.next() {
             Some((v, entry)) => {
+                let compress_type = file_compress_type(entry.name());
                 let bytes = self
                     .object_store
                     .read(entry.path())
                     .await
                     .context(ReadObjectSnafu { path: entry.path() })?;
-                Ok(Some((v, bytes)))
+                let data = compress_type
+                    .decode(bytes)
+                    .await
+                    .context(DecompressObjectSnafu {
+                        compress_type,
+                        path: entry.path(),
+                    })?;
+                Ok(Some((v, data)))
             }
             None => Ok(None),
         }
@@ -94,6 +128,7 @@ impl LogIterator for ObjectStoreLogIterator {
 #[derive(Clone, Debug)]
 pub struct ManifestObjectStore {
     object_store: ObjectStore,
+    compress_type: CompressionType,
     path: String,
 }
@@ -101,25 +136,49 @@ impl ManifestObjectStore {
     pub fn new(path: &str, object_store: ObjectStore) -> Self {
         Self {
             object_store,
+            //TODO: make it configurable
+            compress_type: DEFAULT_MANIFEST_COMPRESSION_TYPE,
             path: util::normalize_dir(path),
         }
     }

     #[inline]
+    /// Returns the delta file path under the **current** compression algorithm
     fn delta_file_path(&self, version: ManifestVersion) -> String {
-        format!("{}{}", self.path, delta_file(version))
+        gen_path(&self.path, &delta_file(version), self.compress_type)
     }

     #[inline]
+    /// Returns the checkpoint file path under the **current** compression algorithm
     fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
-        format!("{}{}", self.path, checkpoint_file(version))
+        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
     }

     #[inline]
+    /// Returns the last checkpoint path, because the last checkpoint is not compressed,
+    /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore`
     fn last_checkpoint_path(&self) -> String {
         format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
     }

+    /// Return all `R`s in the root directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
+    /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
+    async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
+    where
+        F: Fn(Entry) -> Option<R>,
+    {
+        let streamer = self
+            .object_store
+            .list(&self.path)
+            .await
+            .context(ListObjectsSnafu { path: &self.path })?;
+        streamer
+            .try_filter_map(|e| async { Ok(filter(e)) })
+            .try_collect::<Vec<_>>()
+            .await
+            .context(ListObjectsSnafu { path: &self.path })
+    }
+
     pub(crate) fn path(&self) -> &str {
         &self.path
     }
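
`get_paths` centralizes the list-then-filter boilerplate that `scan` and `delete_until` previously each spelled out; the next two hunks rewrite them on top of it. A hypothetical fragment from inside another async `ManifestObjectStore` method, collecting checkpoint file names:

    let checkpoint_names: Vec<String> = self
        .get_paths(|entry| {
            let name = entry.name().to_string();
            is_checkpoint_file(&name).then_some(name)
        })
        .await?;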
@@ -158,29 +217,18 @@ impl ManifestLogStorage for ManifestObjectStore {
     ) -> Result<ObjectStoreLogIterator> {
         ensure!(start <= end, InvalidScanIndexSnafu { start, end });

-        let streamer = self
-            .object_store
-            .list(&self.path)
-            .await
-            .context(ListObjectsSnafu { path: &self.path })?;
-
-        let mut entries: Vec<(ManifestVersion, Entry)> = streamer
-            .try_filter_map(|e| async move {
-                let file_name = e.name();
+        let mut entries: Vec<(ManifestVersion, Entry)> = self
+            .get_paths(|entry| {
+                let file_name = entry.name();
                 if is_delta_file(file_name) {
                     let version = file_version(file_name);
-                    if version >= start && version < end {
-                        Ok(Some((version, e)))
-                    } else {
-                        Ok(None)
+                    if start <= version && version < end {
+                        return Some((version, entry));
                     }
-                } else {
-                    Ok(None)
                 }
+                None
             })
-            .try_collect::<Vec<_>>()
-            .await
-            .context(ListObjectsSnafu { path: &self.path })?;
+            .await?;

         entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
@@ -195,31 +243,20 @@ impl ManifestLogStorage for ManifestObjectStore {
         end: ManifestVersion,
         keep_last_checkpoint: bool,
     ) -> Result<usize> {
-        let streamer = self
-            .object_store
-            .list(&self.path)
-            .await
-            .context(ListObjectsSnafu { path: &self.path })?;
-
         // Stores (entry, is_checkpoint, version) in a Vec.
-        let entries: Vec<_> = streamer
-            .try_filter_map(|e| async move {
-                let file_name = e.name();
+        let entries: Vec<_> = self
+            .get_paths(|entry| {
+                let file_name = entry.name();
                 let is_checkpoint = is_checkpoint_file(file_name);
                 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
                     let version = file_version(file_name);
                     if version < end {
-                        Ok(Some((e, is_checkpoint, version)))
-                    } else {
-                        Ok(None)
+                        return Some((entry, is_checkpoint, version));
                     }
-                } else {
-                    Ok(None)
                 }
+                None
             })
-            .try_collect::<Vec<_>>()
-            .await
-            .context(ListObjectsSnafu { path: &self.path })?;
+            .await?;
         let checkpoint_version = if keep_last_checkpoint {
             // Note that the order of entries is unspecific.
             entries
@@ -237,7 +274,6 @@ impl ManifestLogStorage for ManifestObjectStore {
         } else {
             None
         };
-
         let paths: Vec<_> = entries
             .iter()
             .filter(|(_e, is_checkpoint, version)| {
@@ -279,19 +315,37 @@ impl ManifestLogStorage for ManifestObjectStore {

     async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
         let path = self.delta_file_path(version);

         logging::debug!("Save log to manifest storage, version: {}", version);

+        let data = self
+            .compress_type
+            .encode(bytes)
+            .await
+            .context(CompressObjectSnafu {
+                compress_type: self.compress_type,
+                path: &path,
+            })?;
         self.object_store
-            .write(&path, bytes.to_vec())
+            .write(&path, data)
             .await
             .context(WriteObjectSnafu { path })
     }

     async fn delete(&self, start: ManifestVersion, end: ManifestVersion) -> Result<()> {
-        let raw_paths = (start..end)
-            .map(|v| self.delta_file_path(v))
-            .collect::<Vec<_>>();
+        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
+
+        // Due to backward compatibility, it is possible that the user's log between start and end has not been compressed,
+        // so we need to delete the uncompressed file corresponding to that version, even if the uncompressed file in that version do not exist.
+        let mut paths = Vec::with_capacity(((end - start) * 2) as usize);
+        for version in start..end {
+            paths.push(raw_normalize_path(&self.delta_file_path(version)));
+            if self.compress_type != FALL_BACK_COMPRESS_TYPE {
+                paths.push(raw_normalize_path(&gen_path(
+                    &self.path,
+                    &delta_file(version),
+                    FALL_BACK_COMPRESS_TYPE,
+                )));
+            }
+        }

         logging::debug!(
             "Deleting logs from manifest storage, start: {}, end: {}",
@@ -299,16 +353,11 @@ impl ManifestLogStorage for ManifestObjectStore {
             end
         );

-        let paths = raw_paths
-            .iter()
-            .map(|p| raw_normalize_path(p))
-            .collect::<Vec<_>>();
-
         self.object_store
-            .remove(paths)
+            .remove(paths.clone())
             .await
             .with_context(|_| DeleteObjectSnafu {
-                path: raw_paths.join(","),
+                path: paths.join(","),
             })?;

         Ok(())
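
Taken together, `delete` now submits up to two candidate names per version: the current algorithm's name plus the uncompressed legacy name, relying on the batch `remove` to tolerate paths that were never written. A sketch of the candidates built for `delete(3, 5)` under Gzip (hypothetical values, directory prefix omitted):

    fn main() {
        let mut paths = Vec::new();
        for version in 3u64..5 {
            paths.push(format!("{version:020}.json.gz")); // current compress_type
            paths.push(format!("{version:020}.json")); // FALL_BACK_COMPRESS_TYPE
        }
        assert_eq!(
            paths,
            [
                "00000000000000000003.json.gz",
                "00000000000000000003.json",
                "00000000000000000004.json.gz",
                "00000000000000000004.json",
            ]
        );
    }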
@@ -316,11 +365,20 @@ impl ManifestLogStorage for ManifestObjectStore {

     async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
         let path = self.checkpoint_file_path(version);
+        let data = self
+            .compress_type
+            .encode(bytes)
+            .await
+            .context(CompressObjectSnafu {
+                compress_type: self.compress_type,
+                path: &path,
+            })?;
         self.object_store
-            .write(&path, bytes.to_vec())
+            .write(&path, data)
             .await
             .context(WriteObjectSnafu { path })?;

+        // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it.
         let last_checkpoint_path = self.last_checkpoint_path();

         let checkpoint_metadata = CheckpointMetadata {
@@ -337,7 +395,6 @@ impl ManifestLogStorage for ManifestObjectStore {
         );

         let bs = checkpoint_metadata.encode()?;
-
         self.object_store
             .write(&last_checkpoint_path, bs.as_ref().to_vec())
             .await
@@ -353,27 +410,88 @@ impl ManifestLogStorage for ManifestObjectStore {
         version: ManifestVersion,
     ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
         let path = self.checkpoint_file_path(version);
-        match self.object_store.read(&path).await {
-            Ok(checkpoint) => Ok(Some((version, checkpoint))),
-            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
-            Err(e) => Err(e).context(ReadObjectSnafu { path }),
-        }
+        // Due to backward compatibility, it is possible that the user's checkpoint not compressed,
+        // so if we don't find file by compressed type. fall back to checkpoint not compressed find again.
+        let checkpoint_data =
+            match self.object_store.read(&path).await {
+                Ok(checkpoint) => {
+                    let decompress_data = self.compress_type.decode(checkpoint).await.context(
+                        DecompressObjectSnafu {
+                            compress_type: self.compress_type,
+                            path,
+                        },
+                    )?;
+                    Ok(Some(decompress_data))
+                }
+                Err(e) => {
+                    if e.kind() == ErrorKind::NotFound {
+                        if self.compress_type != FALL_BACK_COMPRESS_TYPE {
+                            let fall_back_path = gen_path(
+                                &self.path,
+                                &checkpoint_file(version),
+                                FALL_BACK_COMPRESS_TYPE,
+                            );
+                            logging::debug!(
+                                "Failed to load checkpoint from path: {}, fall back to path: {}",
+                                path,
+                                fall_back_path
+                            );
+                            match self.object_store.read(&fall_back_path).await {
+                                Ok(checkpoint) => {
+                                    let decompress_data = FALL_BACK_COMPRESS_TYPE
+                                        .decode(checkpoint)
+                                        .await
+                                        .context(DecompressObjectSnafu {
+                                            compress_type: FALL_BACK_COMPRESS_TYPE,
+                                            path,
+                                        })?;
+                                    Ok(Some(decompress_data))
+                                }
+                                Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
+                                Err(e) => Err(e).context(ReadObjectSnafu {
+                                    path: &fall_back_path,
+                                }),
+                            }
+                        } else {
+                            Ok(None)
+                        }
+                    } else {
+                        Err(e).context(ReadObjectSnafu { path: &path })
+                    }
+                }
+            }?;
+        Ok(checkpoint_data.map(|data| (version, data)))
     }

     async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> {
-        let path = self.checkpoint_file_path(version);
+        // Due to backward compatibility, it is possible that the user's checkpoint file has not been compressed,
+        // so we need to delete the uncompressed checkpoint file corresponding to that version, even if the uncompressed checkpoint file in that version do not exist.
+        let paths = if self.compress_type != FALL_BACK_COMPRESS_TYPE {
+            vec![
+                raw_normalize_path(&self.checkpoint_file_path(version)),
+                raw_normalize_path(&gen_path(
+                    &self.path,
+                    &checkpoint_file(version),
+                    FALL_BACK_COMPRESS_TYPE,
+                )),
+            ]
+        } else {
+            vec![raw_normalize_path(&self.checkpoint_file_path(version))]
+        };
+
         self.object_store
-            .delete(&path)
+            .remove(paths.clone())
             .await
-            .context(DeleteObjectSnafu { path })?;
+            .context(DeleteObjectSnafu {
+                path: paths.join(","),
+            })?;
         Ok(())
     }

     async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
         let last_checkpoint_path = self.last_checkpoint_path();

         let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
-            Ok(last_checkpoint_data) => last_checkpoint_data,
+            Ok(data) => data,
             Err(e) if e.kind() == ErrorKind::NotFound => {
                 return Ok(None);
             }
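
The nested match above boils down to a simple rule: read the path for the current compression type; on NotFound, retry the uncompressed legacy path when the two differ; only a second NotFound yields `None`. A simplified sketch of that control flow, with the decompression step and snafu context elided (`ObjectStore`, `object_store::Error`, and `ErrorKind` as used in this file):

    async fn read_with_fallback(
        store: &ObjectStore,
        primary: &str,
        fallback: Option<&str>,
    ) -> Result<Option<Vec<u8>>, object_store::Error> {
        match store.read(primary).await {
            Ok(bytes) => Ok(Some(bytes)),
            Err(e) if e.kind() == ErrorKind::NotFound => match fallback {
                // Retry the legacy (uncompressed) name before giving up.
                Some(path) => match store.read(path).await {
                    Ok(bytes) => Ok(Some(bytes)),
                    Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
                    Err(e) => Err(e),
                },
                None => Ok(None),
            },
            Err(e) => Err(e),
        }
    }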
@@ -404,16 +522,39 @@ mod tests {

    use super::*;

-    #[tokio::test]
-    async fn test_manifest_log_store() {
+    fn new_test_manifest_store() -> ManifestObjectStore {
         common_telemetry::init_default_ut_logging();
         let tmp_dir = create_temp_dir("test_manifest_log_store");
         let mut builder = Fs::default();
         builder.root(&tmp_dir.path().to_string_lossy());
         let object_store = ObjectStore::new(builder).unwrap().finish();
+        ManifestObjectStore::new("/", object_store)
+    }

-        let log_store = ManifestObjectStore::new("/", object_store);
+    #[test]
+    // Define this test mainly to prevent future unintentional changes may break the backward compatibility.
+    fn test_compress_file_path_generation() {
+        let path = "/foo/bar/";
+        let version: ManifestVersion = 0;
+        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
+        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
+    }
+
+    #[tokio::test]
+    async fn test_manifest_log_store_uncompress() {
+        let mut log_store = new_test_manifest_store();
+        log_store.compress_type = CompressionType::Uncompressed;
+        test_manifest_log_store_case(log_store).await;
+    }
+
+    #[tokio::test]
+    async fn test_manifest_log_store_compress() {
+        let mut log_store = new_test_manifest_store();
+        log_store.compress_type = CompressionType::Gzip;
+        test_manifest_log_store_case(log_store).await;
+    }
+
+    async fn test_manifest_log_store_case(log_store: ManifestObjectStore) {
         for v in 0..5 {
             log_store
                 .save(v, format!("hello, {v}").as_bytes())
@@ -477,4 +618,73 @@ mod tests {
         let mut it = log_store.scan(0, 11).await.unwrap();
         assert!(it.next_log().await.unwrap().is_none());
     }
+
+    #[tokio::test]
+    // test ManifestObjectStore can read/delete previously uncompressed data correctly
+    async fn test_compress_backward_compatible() {
+        let mut log_store = new_test_manifest_store();
+
+        // write uncompress data to stimulate previously uncompressed data
+        log_store.compress_type = CompressionType::Uncompressed;
+        for v in 0..5 {
+            log_store
+                .save(v, format!("hello, {v}").as_bytes())
+                .await
+                .unwrap();
+        }
+        log_store
+            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
+            .await
+            .unwrap();
+
+        // change compress type
+        log_store.compress_type = CompressionType::Gzip;
+
+        // test load_last_checkpoint work correctly for previously uncompressed data
+        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
+        assert_eq!(v, 5);
+        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
+
+        // write compressed data to stimulate compress alogorithom take effect
+        for v in 5..10 {
+            log_store
+                .save(v, format!("hello, {v}").as_bytes())
+                .await
+                .unwrap();
+        }
+        log_store
+            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
+            .await
+            .unwrap();
+
+        // test data reading
+        let mut it = log_store.scan(0, 10).await.unwrap();
+        for v in 0..10 {
+            let (version, bytes) = it.next_log().await.unwrap().unwrap();
+            assert_eq!(v, version);
+            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
+        }
+        let (v, checkpoint) = log_store.load_checkpoint(5).await.unwrap().unwrap();
+        assert_eq!(v, 5);
+        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
+        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
+        assert_eq!(v, 10);
+        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());
+
+        // Delete previously uncompressed checkpoint
+        log_store.delete_checkpoint(5).await.unwrap();
+        assert!(log_store.load_checkpoint(5).await.unwrap().is_none());
+
+        // Delete [3, 7), contain uncompressed/compressed data
+        log_store.delete(3, 7).await.unwrap();
+        // [3, 7) deleted
+        let mut it = log_store.scan(3, 7).await.unwrap();
+        assert!(it.next_log().await.unwrap().is_none());
+
+        // Delete util 10, contain uncompressed/compressed data
+        // log 0, 1, 2, 7, 8, 9 will be delete
+        assert_eq!(6, log_store.delete_until(10, false).await.unwrap());
+        let mut it = log_store.scan(0, 10).await.unwrap();
+        assert!(it.next_log().await.unwrap().is_none());
+    }
 }