mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 23:32:55 +00:00
perf(puffin): not to stage uncompressed blob (#4333)
* feat(puffin): not to stage blob Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * feat: back with compressed blob Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -371,9 +371,6 @@ pub struct InvertedIndexConfig {
|
||||
/// Memory threshold for performing an external sort during index creation.
|
||||
pub mem_threshold_on_create: MemoryThreshold,
|
||||
|
||||
/// Whether to compress the index data.
|
||||
pub compress: bool,
|
||||
|
||||
#[deprecated = "use [IndexConfig::aux_path] instead"]
|
||||
#[serde(skip_serializing)]
|
||||
pub intermediate_path: String,
|
||||
@@ -396,7 +393,6 @@ impl Default for InvertedIndexConfig {
|
||||
create_on_compaction: Mode::Auto,
|
||||
apply_on_query: Mode::Auto,
|
||||
mem_threshold_on_create: MemoryThreshold::Auto,
|
||||
compress: true,
|
||||
write_buffer_size: ReadableSize::mb(8),
|
||||
intermediate_path: String::new(),
|
||||
metadata_cache_size: ReadableSize::mb(32),
|
||||
|
||||
@@ -580,7 +580,7 @@ async fn test_region_usage() {
|
||||
flush_region(&engine, region_id, None).await;
|
||||
|
||||
let region_stat = region.region_usage();
|
||||
assert_eq!(region_stat.sst_usage, 3026);
|
||||
assert_eq!(region_stat.sst_usage, 3010);
|
||||
|
||||
// region total usage
|
||||
// Some memtables may share items.
|
||||
|
||||
@@ -220,7 +220,6 @@ impl<'a> IndexerBuilder<'a> {
|
||||
self.intermediate_manager.clone(),
|
||||
self.inverted_index_config.mem_threshold_on_create(),
|
||||
segment_row_count,
|
||||
self.inverted_index_config.compress,
|
||||
&self.index_options.inverted_index.ignore_column_ids,
|
||||
);
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
|
||||
use index::inverted_index::create::sort_create::SortIndexCreator;
|
||||
use index::inverted_index::create::InvertedIndexCreator;
|
||||
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
|
||||
use puffin::blob_metadata::CompressionCodec;
|
||||
use puffin::puffin_manager::{PuffinWriter, PutOptions};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
@@ -71,9 +70,6 @@ pub struct SstIndexCreator {
|
||||
/// The memory usage of the index creator.
|
||||
memory_usage: Arc<AtomicUsize>,
|
||||
|
||||
/// Whether to compress the index data.
|
||||
compress: bool,
|
||||
|
||||
/// Ids of indexed columns.
|
||||
column_ids: HashSet<ColumnId>,
|
||||
}
|
||||
@@ -87,7 +83,6 @@ impl SstIndexCreator {
|
||||
intermediate_manager: IntermediateManager,
|
||||
memory_usage_threshold: Option<usize>,
|
||||
segment_row_count: NonZeroUsize,
|
||||
compress: bool,
|
||||
ignore_column_ids: &[ColumnId],
|
||||
) -> Self {
|
||||
let temp_file_provider = Arc::new(TempFileProvider::new(
|
||||
@@ -122,7 +117,6 @@ impl SstIndexCreator {
|
||||
stats: Statistics::default(),
|
||||
aborted: false,
|
||||
memory_usage,
|
||||
compress,
|
||||
column_ids,
|
||||
}
|
||||
}
|
||||
@@ -242,12 +236,9 @@ impl SstIndexCreator {
|
||||
let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
|
||||
let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
|
||||
|
||||
let put_options = PutOptions {
|
||||
compression: self.compress.then_some(CompressionCodec::Zstd),
|
||||
};
|
||||
let (index_finish, puffin_add_blob) = futures::join!(
|
||||
self.index_creator.finish(&mut index_writer),
|
||||
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), put_options)
|
||||
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default())
|
||||
);
|
||||
|
||||
match (
|
||||
@@ -398,7 +389,6 @@ mod tests {
|
||||
intm_mgr,
|
||||
memory_threshold,
|
||||
NonZeroUsize::new(segment_row_count).unwrap(),
|
||||
false,
|
||||
&[],
|
||||
);
|
||||
|
||||
|
||||
@@ -21,8 +21,8 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
|
||||
use puffin::error::{self as puffin_error, Result as PuffinResult};
|
||||
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
|
||||
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
|
||||
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
|
||||
use puffin::puffin_manager::{BlobGuard, PuffinManager};
|
||||
use puffin::puffin_manager::stager::BoundedStager;
|
||||
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{PuffinInitStagerSnafu, Result};
|
||||
@@ -35,10 +35,11 @@ use crate::sst::index::store::{self, InstrumentedStore};
|
||||
type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncReader>;
|
||||
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
|
||||
|
||||
pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
|
||||
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
|
||||
pub(crate) type SstPuffinManager =
|
||||
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
|
||||
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
|
||||
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
|
||||
pub(crate) type BlobReader = <<SstPuffinReader as PuffinReader>::Blob as BlobGuard>::Reader;
|
||||
|
||||
const STAGING_DIR: &str = "staging";
|
||||
|
||||
|
||||
@@ -61,6 +61,15 @@ impl<R> PuffinFileReader<R> {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Converts the reader into an owned blob reader.
|
||||
pub fn into_blob_reader(self, blob_metadata: &BlobMetadata) -> PartialReader<R> {
|
||||
PartialReader::new(
|
||||
self.source,
|
||||
blob_metadata.offset as _,
|
||||
blob_metadata.length as _,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
|
||||
|
||||
@@ -22,7 +22,6 @@ mod tests;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{AsyncRead, AsyncSeek};
|
||||
|
||||
use crate::blob_metadata::CompressionCodec;
|
||||
@@ -92,10 +91,11 @@ pub trait PuffinReader {
|
||||
|
||||
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
|
||||
/// Users should hold the `BlobGuard` until they are done with the blob data.
|
||||
#[async_trait]
|
||||
#[auto_impl::auto_impl(Arc)]
|
||||
pub trait BlobGuard {
|
||||
type Reader: AsyncRead + AsyncSeek + Unpin;
|
||||
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
|
||||
async fn reader(&self) -> Result<Self::Reader>;
|
||||
}
|
||||
|
||||
/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::error::Result;
|
||||
#[async_trait]
|
||||
#[auto_impl::auto_impl(Arc)]
|
||||
pub trait PuffinFileAccessor: Send + Sync + 'static {
|
||||
type Reader: AsyncRead + AsyncSeek + Unpin + Send;
|
||||
type Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync;
|
||||
type Writer: AsyncWrite + Unpin + Send;
|
||||
|
||||
/// Opens a reader for the given puffin file.
|
||||
|
||||
@@ -46,7 +46,7 @@ impl<S, F> FsPuffinManager<S, F> {
|
||||
#[async_trait]
|
||||
impl<S, F> PuffinManager for FsPuffinManager<S, F>
|
||||
where
|
||||
S: Stager + Clone,
|
||||
S: Stager + Clone + 'static,
|
||||
F: PuffinFileAccessor + Clone,
|
||||
{
|
||||
type Reader = FsPuffinReader<S, F>;
|
||||
|
||||
@@ -12,23 +12,26 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use async_compression::futures::bufread::ZstdDecoder;
|
||||
use async_trait::async_trait;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::io::BufReader;
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncWrite};
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::blob_metadata::CompressionCodec;
|
||||
use crate::blob_metadata::{BlobMetadata, CompressionCodec};
|
||||
use crate::error::{
|
||||
BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
|
||||
ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
|
||||
};
|
||||
use crate::file_format::reader::{AsyncReader, PuffinFileReader};
|
||||
use crate::partial_reader::PartialReader;
|
||||
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
|
||||
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
|
||||
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
|
||||
use crate::puffin_manager::PuffinReader;
|
||||
use crate::puffin_manager::{BlobGuard, PuffinReader};
|
||||
|
||||
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
|
||||
pub struct FsPuffinReader<S, F> {
|
||||
@@ -55,25 +58,51 @@ impl<S, F> FsPuffinReader<S, F> {
|
||||
#[async_trait]
|
||||
impl<S, F> PuffinReader for FsPuffinReader<S, F>
|
||||
where
|
||||
S: Stager,
|
||||
S: Stager + 'static,
|
||||
F: PuffinFileAccessor + Clone,
|
||||
{
|
||||
type Blob = S::Blob;
|
||||
type Blob = Either<RandomReadBlob<F>, S::Blob>;
|
||||
type Dir = S::Dir;
|
||||
|
||||
async fn blob(&self, key: &str) -> Result<Self::Blob> {
|
||||
self.stager
|
||||
.get_blob(
|
||||
self.puffin_file_name.as_str(),
|
||||
key,
|
||||
Box::new(move |writer| {
|
||||
let accessor = self.puffin_file_accessor.clone();
|
||||
let puffin_file_name = self.puffin_file_name.clone();
|
||||
let key = key.to_string();
|
||||
Self::init_blob_to_cache(puffin_file_name, key, writer, accessor)
|
||||
}),
|
||||
)
|
||||
.await
|
||||
let reader = self
|
||||
.puffin_file_accessor
|
||||
.reader(&self.puffin_file_name)
|
||||
.await?;
|
||||
let mut file = PuffinFileReader::new(reader);
|
||||
|
||||
// TODO(zhongzc): cache the metadata.
|
||||
let metadata = file.metadata().await?;
|
||||
let blob_metadata = metadata
|
||||
.blobs
|
||||
.into_iter()
|
||||
.find(|m| m.blob_type == key)
|
||||
.context(BlobNotFoundSnafu { blob: key })?;
|
||||
|
||||
let blob = if blob_metadata.compression_codec.is_none() {
|
||||
// If the blob is not compressed, we can directly read it from the puffin file.
|
||||
Either::L(RandomReadBlob {
|
||||
file_name: self.puffin_file_name.clone(),
|
||||
accessor: self.puffin_file_accessor.clone(),
|
||||
blob_metadata,
|
||||
})
|
||||
} else {
|
||||
// If the blob is compressed, we need to decompress it into staging space before reading.
|
||||
let staged_blob = self
|
||||
.stager
|
||||
.get_blob(
|
||||
self.puffin_file_name.as_str(),
|
||||
key,
|
||||
Box::new(|writer| {
|
||||
Box::pin(Self::init_blob_to_stager(file, blob_metadata, writer))
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Either::R(staged_blob)
|
||||
};
|
||||
|
||||
Ok(blob)
|
||||
}
|
||||
|
||||
async fn dir(&self, key: &str) -> Result<Self::Dir> {
|
||||
@@ -85,7 +114,12 @@ where
|
||||
let accessor = self.puffin_file_accessor.clone();
|
||||
let puffin_file_name = self.puffin_file_name.clone();
|
||||
let key = key.to_string();
|
||||
Self::init_dir_to_cache(puffin_file_name, key, writer_provider, accessor)
|
||||
Box::pin(Self::init_dir_to_stager(
|
||||
puffin_file_name,
|
||||
key,
|
||||
writer_provider,
|
||||
accessor,
|
||||
))
|
||||
}),
|
||||
)
|
||||
.await
|
||||
@@ -97,79 +131,63 @@ where
|
||||
S: Stager,
|
||||
F: PuffinFileAccessor,
|
||||
{
|
||||
fn init_blob_to_cache(
|
||||
puffin_file_name: String,
|
||||
key: String,
|
||||
async fn init_blob_to_stager(
|
||||
mut reader: PuffinFileReader<F::Reader>,
|
||||
blob_metadata: BlobMetadata,
|
||||
mut writer: BoxWriter,
|
||||
accessor: F,
|
||||
) -> BoxFuture<'static, Result<u64>> {
|
||||
Box::pin(async move {
|
||||
let reader = accessor.reader(&puffin_file_name).await?;
|
||||
let mut file = PuffinFileReader::new(reader);
|
||||
|
||||
let metadata = file.metadata().await?;
|
||||
let blob_metadata = metadata
|
||||
.blobs
|
||||
.iter()
|
||||
.find(|m| m.blob_type == key.as_str())
|
||||
.context(BlobNotFoundSnafu { blob: key })?;
|
||||
let reader = file.blob_reader(blob_metadata)?;
|
||||
|
||||
let compression = blob_metadata.compression_codec;
|
||||
let size = Self::handle_decompress(reader, &mut writer, compression).await?;
|
||||
|
||||
Ok(size)
|
||||
})
|
||||
) -> Result<u64> {
|
||||
let reader = reader.blob_reader(&blob_metadata)?;
|
||||
let compression = blob_metadata.compression_codec;
|
||||
let size = Self::handle_decompress(reader, &mut writer, compression).await?;
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
fn init_dir_to_cache(
|
||||
async fn init_dir_to_stager(
|
||||
puffin_file_name: String,
|
||||
key: String,
|
||||
writer_provider: DirWriterProviderRef,
|
||||
accessor: F,
|
||||
) -> BoxFuture<'static, Result<u64>> {
|
||||
Box::pin(async move {
|
||||
let reader = accessor.reader(&puffin_file_name).await?;
|
||||
let mut file = PuffinFileReader::new(reader);
|
||||
) -> Result<u64> {
|
||||
let reader = accessor.reader(&puffin_file_name).await?;
|
||||
let mut file = PuffinFileReader::new(reader);
|
||||
|
||||
let puffin_metadata = file.metadata().await?;
|
||||
let blob_metadata = puffin_metadata
|
||||
.blobs
|
||||
.iter()
|
||||
.find(|m| m.blob_type == key.as_str())
|
||||
.context(BlobNotFoundSnafu { blob: key })?;
|
||||
let puffin_metadata = file.metadata().await?;
|
||||
let blob_metadata = puffin_metadata
|
||||
.blobs
|
||||
.iter()
|
||||
.find(|m| m.blob_type == key.as_str())
|
||||
.context(BlobNotFoundSnafu { blob: key })?;
|
||||
|
||||
let mut reader = file.blob_reader(blob_metadata)?;
|
||||
let mut buf = vec![];
|
||||
reader.read_to_end(&mut buf).await.context(ReadSnafu)?;
|
||||
let dir_meta: DirMetadata =
|
||||
serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?;
|
||||
let mut reader = file.blob_reader(blob_metadata)?;
|
||||
let mut buf = vec![];
|
||||
reader.read_to_end(&mut buf).await.context(ReadSnafu)?;
|
||||
let dir_meta: DirMetadata =
|
||||
serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?;
|
||||
|
||||
let mut size = 0;
|
||||
for file_meta in dir_meta.files {
|
||||
let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context(
|
||||
BlobIndexOutOfBoundSnafu {
|
||||
index: file_meta.blob_index,
|
||||
max_index: puffin_metadata.blobs.len(),
|
||||
},
|
||||
)?;
|
||||
ensure!(
|
||||
blob_meta.blob_type == file_meta.key,
|
||||
FileKeyNotMatchSnafu {
|
||||
expected: file_meta.key,
|
||||
actual: &blob_meta.blob_type,
|
||||
}
|
||||
);
|
||||
let mut size = 0;
|
||||
for file_meta in dir_meta.files {
|
||||
let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context(
|
||||
BlobIndexOutOfBoundSnafu {
|
||||
index: file_meta.blob_index,
|
||||
max_index: puffin_metadata.blobs.len(),
|
||||
},
|
||||
)?;
|
||||
ensure!(
|
||||
blob_meta.blob_type == file_meta.key,
|
||||
FileKeyNotMatchSnafu {
|
||||
expected: file_meta.key,
|
||||
actual: &blob_meta.blob_type,
|
||||
}
|
||||
);
|
||||
|
||||
let reader = file.blob_reader(blob_meta)?;
|
||||
let writer = writer_provider.writer(&file_meta.relative_path).await?;
|
||||
let reader = file.blob_reader(blob_meta)?;
|
||||
let writer = writer_provider.writer(&file_meta.relative_path).await?;
|
||||
|
||||
let compression = blob_meta.compression_codec;
|
||||
size += Self::handle_decompress(reader, writer, compression).await?;
|
||||
}
|
||||
let compression = blob_meta.compression_codec;
|
||||
size += Self::handle_decompress(reader, writer, compression).await?;
|
||||
}
|
||||
|
||||
Ok(size)
|
||||
})
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
/// Handles the decompression of the reader and writes the decompressed data to the writer.
|
||||
@@ -196,3 +214,87 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `RandomReadBlob` is a `BlobGuard` that directly reads the blob from the puffin file.
|
||||
pub struct RandomReadBlob<F> {
|
||||
file_name: String,
|
||||
accessor: F,
|
||||
blob_metadata: BlobMetadata,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<F: PuffinFileAccessor + Clone> BlobGuard for RandomReadBlob<F> {
|
||||
type Reader = PartialReader<F::Reader>;
|
||||
|
||||
async fn reader(&self) -> Result<Self::Reader> {
|
||||
ensure!(
|
||||
self.blob_metadata.compression_codec.is_none(),
|
||||
UnsupportedDecompressionSnafu {
|
||||
decompression: self.blob_metadata.compression_codec.unwrap().to_string()
|
||||
}
|
||||
);
|
||||
|
||||
let reader = self.accessor.reader(&self.file_name).await?;
|
||||
let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&self.blob_metadata);
|
||||
Ok(blob_reader)
|
||||
}
|
||||
}
|
||||
|
||||
/// `Either` is a type that represents either `A` (the `L` variant) or `B` (the `R` variant).
///
/// Used to:
/// impl `AsyncRead + AsyncSeek` for `Either<A: AsyncRead + AsyncSeek, B: AsyncRead + AsyncSeek>`,
/// impl `BlobGuard` for `Either<A: BlobGuard, B: BlobGuard>`.
///
/// `Debug` is derived so the value can be logged or inspected whenever both
/// type parameters are themselves `Debug`.
#[derive(Debug)]
pub enum Either<A, B> {
    /// Holds a value of the first type.
    L(A),
    /// Holds a value of the second type.
    R(B),
}
|
||||
|
||||
impl<A, B> AsyncRead for Either<A, B>
|
||||
where
|
||||
A: AsyncRead + Unpin,
|
||||
B: AsyncRead + Unpin,
|
||||
{
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
match self.get_mut() {
|
||||
Either::L(a) => Pin::new(a).poll_read(cx, buf),
|
||||
Either::R(b) => Pin::new(b).poll_read(cx, buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> AsyncSeek for Either<A, B>
|
||||
where
|
||||
A: AsyncSeek + Unpin,
|
||||
B: AsyncSeek + Unpin,
|
||||
{
|
||||
fn poll_seek(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
pos: std::io::SeekFrom,
|
||||
) -> Poll<std::io::Result<u64>> {
|
||||
match self.get_mut() {
|
||||
Either::L(a) => Pin::new(a).poll_seek(cx, pos),
|
||||
Either::R(b) => Pin::new(b).poll_seek(cx, pos),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<A, B> BlobGuard for Either<A, B>
|
||||
where
|
||||
A: BlobGuard + Sync,
|
||||
B: BlobGuard + Sync,
|
||||
{
|
||||
type Reader = Either<A::Reader, B::Reader>;
|
||||
async fn reader(&self) -> Result<Self::Reader> {
|
||||
match self {
|
||||
Either::L(a) => Ok(Either::L(a.reader().await?)),
|
||||
Either::R(b) => Ok(Either::R(b.reader().await?)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,19 +42,19 @@ pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
|
||||
///
|
||||
/// `Stager` will provide a `BoxWriter` that the caller of `get_blob`
|
||||
/// can use to write the blob into the staging area.
|
||||
pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult;
|
||||
pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
|
||||
|
||||
/// Function that initializes a directory.
|
||||
///
|
||||
/// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir`
|
||||
/// can use to write files inside the directory into the staging area.
|
||||
pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;
|
||||
pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
|
||||
|
||||
/// `Stager` manages the staging area for the puffin files.
|
||||
#[async_trait]
|
||||
#[auto_impl::auto_impl(Arc)]
|
||||
pub trait Stager: Send + Sync {
|
||||
type Blob: BlobGuard;
|
||||
type Blob: BlobGuard + Sync;
|
||||
type Dir: DirGuard;
|
||||
|
||||
/// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
|
||||
|
||||
@@ -22,7 +22,6 @@ use async_walkdir::{Filtering, WalkDir};
|
||||
use base64::prelude::BASE64_URL_SAFE;
|
||||
use base64::Engine;
|
||||
use common_telemetry::{info, warn};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use moka::future::Cache;
|
||||
use sha2::{Digest, Sha256};
|
||||
@@ -128,7 +127,7 @@ impl Stager for BoundedStager {
|
||||
let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
|
||||
let path = self.base_dir.join(&file_name);
|
||||
|
||||
let size = Self::write_blob(&path, &init_fn).await?;
|
||||
let size = Self::write_blob(&path, init_fn).await?;
|
||||
|
||||
let guard = Arc::new(FsBlobGuard {
|
||||
path,
|
||||
@@ -163,7 +162,7 @@ impl Stager for BoundedStager {
|
||||
let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
|
||||
let path = self.base_dir.join(&dir_name);
|
||||
|
||||
let size = Self::write_dir(&path, &init_fn).await?;
|
||||
let size = Self::write_dir(&path, init_fn).await?;
|
||||
|
||||
let guard = Arc::new(FsDirGuard {
|
||||
path,
|
||||
@@ -225,7 +224,7 @@ impl BoundedStager {
|
||||
|
||||
async fn write_blob(
|
||||
target_path: &PathBuf,
|
||||
init_fn: &(dyn InitBlobFn + Send + Sync + '_),
|
||||
init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
|
||||
) -> Result<u64> {
|
||||
// To guarantee the atomicity of writing the file, we need to write
|
||||
// the file to a temporary file first...
|
||||
@@ -247,7 +246,7 @@ impl BoundedStager {
|
||||
|
||||
async fn write_dir(
|
||||
target_path: &PathBuf,
|
||||
init_fn: &(dyn InitDirFn + Send + Sync + '_),
|
||||
init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
|
||||
) -> Result<u64> {
|
||||
// To guarantee the atomicity of writing the directory, we need to write
|
||||
// the directory to a temporary directory first...
|
||||
@@ -425,16 +424,13 @@ pub struct FsBlobGuard {
|
||||
delete_queue: Sender<DeleteTask>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BlobGuard for FsBlobGuard {
|
||||
type Reader = Compat<fs::File>;
|
||||
|
||||
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>> {
|
||||
let path = self.path.clone();
|
||||
async move {
|
||||
let file = fs::File::open(&path).await.context(OpenSnafu)?;
|
||||
Ok(file.compat())
|
||||
}
|
||||
.boxed()
|
||||
async fn reader(&self) -> Result<Self::Reader> {
|
||||
let file = fs::File::open(&self.path).await.context(OpenSnafu)?;
|
||||
Ok(file.compat())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -62,14 +62,30 @@ async fn test_put_get_file() {
|
||||
writer.finish().await.unwrap();
|
||||
|
||||
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
|
||||
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
|
||||
check_blob(
|
||||
puffin_file_name,
|
||||
key,
|
||||
raw_data,
|
||||
&stager,
|
||||
&reader,
|
||||
compression_codec.is_some(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// renew cache manager
|
||||
let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await;
|
||||
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
|
||||
|
||||
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
|
||||
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
|
||||
check_blob(
|
||||
puffin_file_name,
|
||||
key,
|
||||
raw_data,
|
||||
&stager,
|
||||
&reader,
|
||||
compression_codec.is_some(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,7 +122,15 @@ async fn test_put_get_files() {
|
||||
|
||||
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
|
||||
for (key, raw_data) in &blobs {
|
||||
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
|
||||
check_blob(
|
||||
puffin_file_name,
|
||||
key,
|
||||
raw_data,
|
||||
&stager,
|
||||
&reader,
|
||||
compression_codec.is_some(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// renew cache manager
|
||||
@@ -114,7 +138,15 @@ async fn test_put_get_files() {
|
||||
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
|
||||
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
|
||||
for (key, raw_data) in &blobs {
|
||||
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
|
||||
check_blob(
|
||||
puffin_file_name,
|
||||
key,
|
||||
raw_data,
|
||||
&stager,
|
||||
&reader,
|
||||
compression_codec.is_some(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -205,7 +237,15 @@ async fn test_put_get_mix_file_dir() {
|
||||
|
||||
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
|
||||
for (key, raw_data) in &blobs {
|
||||
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
|
||||
check_blob(
|
||||
puffin_file_name,
|
||||
key,
|
||||
raw_data,
|
||||
&stager,
|
||||
&reader,
|
||||
compression_codec.is_some(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
|
||||
|
||||
@@ -216,7 +256,15 @@ async fn test_put_get_mix_file_dir() {
|
||||
|
||||
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
|
||||
for (key, raw_data) in &blobs {
|
||||
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
|
||||
check_blob(
|
||||
puffin_file_name,
|
||||
key,
|
||||
raw_data,
|
||||
&stager,
|
||||
&reader,
|
||||
compression_codec.is_some(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
|
||||
}
|
||||
@@ -241,24 +289,28 @@ async fn put_blob(
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn check_blob<R>(
|
||||
async fn check_blob(
|
||||
puffin_file_name: &str,
|
||||
key: &str,
|
||||
raw_data: &[u8],
|
||||
stager: &BoundedStager,
|
||||
puffin_reader: &R,
|
||||
) where
|
||||
R: PuffinReader,
|
||||
{
|
||||
puffin_reader: &impl PuffinReader,
|
||||
compressed: bool,
|
||||
) {
|
||||
let blob = puffin_reader.blob(key).await.unwrap();
|
||||
let mut reader = blob.reader().await.unwrap();
|
||||
let mut buf = Vec::new();
|
||||
reader.read_to_end(&mut buf).await.unwrap();
|
||||
assert_eq!(buf, raw_data);
|
||||
|
||||
let mut cached_file = stager.must_get_file(puffin_file_name, key).await;
|
||||
if !compressed {
|
||||
// If the blob is not compressed, it won't exist in the stager.
|
||||
return;
|
||||
}
|
||||
|
||||
let mut staged_file = stager.must_get_file(puffin_file_name, key).await;
|
||||
let mut buf = Vec::new();
|
||||
cached_file.read_to_end(&mut buf).await.unwrap();
|
||||
staged_file.read_to_end(&mut buf).await.unwrap();
|
||||
assert_eq!(buf, raw_data);
|
||||
}
|
||||
|
||||
@@ -291,15 +343,13 @@ async fn put_dir(
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn check_dir<R>(
|
||||
async fn check_dir(
|
||||
puffin_file_name: &str,
|
||||
key: &str,
|
||||
files_in_dir: &[(&str, &[u8])],
|
||||
stager: &BoundedStager,
|
||||
puffin_reader: &R,
|
||||
) where
|
||||
R: PuffinReader,
|
||||
{
|
||||
puffin_reader: &impl PuffinReader,
|
||||
) {
|
||||
let res_dir = puffin_reader.dir(key).await.unwrap();
|
||||
for (file_name, raw_data) in files_in_dir {
|
||||
let file_path = if cfg!(windows) {
|
||||
@@ -311,12 +361,12 @@ async fn check_dir<R>(
|
||||
assert_eq!(buf, *raw_data);
|
||||
}
|
||||
|
||||
let cached_dir = stager.must_get_dir(puffin_file_name, key).await;
|
||||
let staged_dir = stager.must_get_dir(puffin_file_name, key).await;
|
||||
for (file_name, raw_data) in files_in_dir {
|
||||
let file_path = if cfg!(windows) {
|
||||
cached_dir.as_path().join(file_name.replace('/', "\\"))
|
||||
staged_dir.as_path().join(file_name.replace('/', "\\"))
|
||||
} else {
|
||||
cached_dir.as_path().join(file_name)
|
||||
staged_dir.as_path().join(file_name)
|
||||
};
|
||||
let buf = std::fs::read(file_path).unwrap();
|
||||
assert_eq!(buf, *raw_data);
|
||||
|
||||
@@ -838,7 +838,6 @@ create_on_flush = "auto"
|
||||
create_on_compaction = "auto"
|
||||
apply_on_query = "auto"
|
||||
mem_threshold_on_create = "auto"
|
||||
compress = true
|
||||
metadata_cache_size = "32MiB"
|
||||
content_cache_size = "32MiB"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user