feat(puffin): implement CachedPuffinWriter (#4203)

* feat(puffin): support lz4 compression for footer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(puffin): introduce puffin manager trait

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(puffin): implement CachedPuffinWriter

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi
2024-06-25 16:00:48 +08:00
committed by GitHub
parent b2f61aa1cf
commit 1e815dddf1
17 changed files with 498 additions and 35 deletions

Cargo.lock (generated)

@@ -573,6 +573,18 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-channel"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
dependencies = [
"concurrent-queue",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-compression"
version = "0.3.15"
@@ -610,6 +622,17 @@ dependencies = [
"zstd-safe 7.1.0",
]
[[package]]
name = "async-fs"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebcd09b382f40fcd159c2d695175b2ae620ffa5f3bd6f664131efff4e8b9e04a"
dependencies = [
"async-lock",
"blocking",
"futures-lite",
]
[[package]]
name = "async-lock"
version = "3.4.0"
@@ -654,6 +677,12 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "async-task"
version = "4.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"
[[package]]
name = "async-trait"
version = "0.1.80"
@@ -665,6 +694,17 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "async-walkdir"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20235b6899dd1cb74a9afac0abf5b4a20c0e500dd6537280f4096e1b9f14da20"
dependencies = [
"async-fs",
"futures-lite",
"thiserror",
]
[[package]]
name = "asynchronous-codec"
version = "0.7.0"
@@ -702,6 +742,12 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atty"
version = "0.2.14"
@@ -1020,6 +1066,19 @@ dependencies = [
"generic-array",
]
[[package]]
name = "blocking"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea"
dependencies = [
"async-channel 2.3.1",
"async-task",
"futures-io",
"futures-lite",
"piper",
]
[[package]]
name = "borsh"
version = "1.5.1"
@@ -4181,6 +4240,19 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-lite"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"parking",
"pin-project-lite",
]
[[package]]
name = "futures-macro"
version = "0.3.30"
@@ -6283,7 +6355,7 @@ version = "0.8.2"
dependencies = [
"api",
"aquamarine",
"async-channel",
"async-channel 1.9.0",
"async-stream",
"async-trait",
"bytes",
@@ -7794,6 +7866,17 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "piper"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1d5c74c9876f070d3e8fd503d748c7d974c3e48da8f41350fa5222ef9b4391"
dependencies = [
"atomic-waker",
"fastrand",
"futures-io",
]
[[package]]
name = "pkcs1"
version = "0.3.3"
@@ -8377,7 +8460,9 @@ dependencies = [
name = "puffin"
version = "0.8.2"
dependencies = [
"async-compression 0.4.11",
"async-trait",
"async-walkdir",
"bitflags 2.5.0",
"common-error",
"common-macro",
@@ -8390,6 +8475,7 @@ dependencies = [
"snafu 0.8.3",
"tokio",
"tokio-util",
"uuid",
]
[[package]]


@@ -77,7 +77,7 @@ impl Indexer {
/// Finish the index creation.
/// Returns the number of bytes written if success or None if failed.
pub async fn finish(&mut self) -> Option<usize> {
pub async fn finish(&mut self) -> Option<u64> {
if let Some(mut creator) = self.inner.take() {
match creator.finish().await {
Ok((row_count, byte_count)) => {


@@ -208,8 +208,9 @@ mod tests {
puffin_writer
.add_blob(Blob {
blob_type: INDEX_BLOB_TYPE.to_string(),
data: Cursor::new(vec![]),
compressed_data: Cursor::new(vec![]),
properties: Default::default(),
compression_codec: None,
})
.await
.unwrap();
@@ -260,8 +261,9 @@ mod tests {
puffin_writer
.add_blob(Blob {
blob_type: "invalid_blob_type".to_string(),
data: Cursor::new(vec![]),
compressed_data: Cursor::new(vec![]),
properties: Default::default(),
compression_codec: None,
})
.await
.unwrap();


@@ -54,7 +54,7 @@ const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB
/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
type ByteCount = usize;
type ByteCount = u64;
type RowCount = usize;
/// Creates SST index.
@@ -271,8 +271,9 @@ impl SstIndexCreator {
let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
let blob = Blob {
blob_type: INDEX_BLOB_TYPE.to_string(),
data: rx.compat(),
compressed_data: rx.compat(),
properties: HashMap::default(),
compression_codec: None,
};
let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
@@ -292,7 +293,7 @@ impl SstIndexCreator {
.fail()?,
(Ok(_), e @ Err(_)) => e?,
(e @ Err(_), Ok(_)) => e?,
(e @ Err(_), Ok(_)) => e.map(|_| ())?,
_ => {}
}
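A note on the added `.map(|_| ())`: presumably the first result's `Ok` type is no longer `()` after `finish` started returning a byte count, so the arms of the match must be unified to `()` by hand. A minimal sketch of the pattern, with hypothetical types:

```rust
// Two results with different Ok types; every arm must evaluate to `()`.
fn unify<E>(a: Result<u64, E>, b: Result<(), E>) -> Result<(), E> {
    match (a, b) {
        // `e` is Result<(), E>, so `?` already yields ().
        (Ok(_), e @ Err(_)) => e?,
        // `e` is Result<u64, E>; mapping Ok(u64) to Ok(()) lets `?` yield ()
        // like the other arms.
        (e @ Err(_), Ok(_)) => e.map(|_| ())?,
        _ => {}
    }
    Ok(())
}
```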


@@ -35,7 +35,7 @@ pub(crate) struct Statistics {
/// Number of rows in the index.
row_count: usize,
/// Number of bytes in the index.
byte_count: usize,
byte_count: u64,
}
impl Statistics {
@@ -63,7 +63,7 @@ impl Statistics {
}
/// Returns byte count.
pub fn byte_count(&self) -> usize {
pub fn byte_count(&self) -> u64 {
self.byte_count
}
}
@@ -112,7 +112,7 @@ impl<'a> TimerGuard<'a> {
}
/// Increases the byte count of the index creation statistics.
pub fn inc_byte_count(&mut self, n: usize) {
pub fn inc_byte_count(&mut self, n: u64) {
self.stats.byte_count += n;
}
}


@@ -136,7 +136,7 @@ where
let index_size = self.indexer.finish().await;
let inverted_index_available = index_size.is_some();
let index_file_size = index_size.unwrap_or(0) as u64;
let index_file_size = index_size.unwrap_or(0);
if stats.num_rows == 0 {
return Ok(None);


@@ -8,7 +8,9 @@ license.workspace = true
workspace = true
[dependencies]
async-compression = "0.4.11"
async-trait.workspace = true
async-walkdir = "2.0.0"
bitflags.workspace = true
common-error.workspace = true
common-macro.workspace = true
@@ -19,7 +21,6 @@ pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
[dev-dependencies]
tokio.workspace = true
tokio-util.workspace = true
uuid.workspace = true


@@ -69,7 +69,7 @@ pub struct BlobMetadata {
}
/// Compression codec used to compress the blob
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionCodec {
/// Single [LZ4 compression frame](https://github.com/lz4/lz4/blob/77d1b93f72628af7bbde0243b4bba9205c3138d9/doc/lz4_Frame_format.md),


@@ -64,6 +64,30 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to open"))]
Open {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to read metadata"))]
Metadata {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error while walking directory"))]
WalkDirError {
#[snafu(source)]
error: async_walkdir::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Magic not matched"))]
MagicNotMatched {
#[snafu(implicit)]
@@ -159,6 +183,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported compression: {codec}"))]
UnsupportedCompression {
codec: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Write to the same blob twice: {blob}"))]
DuplicateBlob {
blob: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -172,6 +210,8 @@ impl ErrorExt for Error {
| Write { .. }
| Flush { .. }
| Close { .. }
| Open { .. }
| Metadata { .. }
| SerializeJson { .. }
| BytesToInteger { .. }
| ParseStageNotMatch { .. }
@@ -180,9 +220,14 @@ impl ErrorExt for Error {
| InvalidBlobOffset { .. }
| InvalidBlobAreaEnd { .. }
| Lz4Compression { .. }
| Lz4Decompression { .. } => StatusCode::Unexpected,
| Lz4Decompression { .. }
| WalkDirError { .. } => StatusCode::Unexpected,
UnsupportedDecompression { .. } => StatusCode::Unsupported,
UnsupportedCompression { .. } | UnsupportedDecompression { .. } => {
StatusCode::Unsupported
}
DuplicateBlob { .. } => StatusCode::InvalidArguments,
}
}


@@ -19,6 +19,7 @@ use std::collections::HashMap;
use async_trait::async_trait;
use crate::blob_metadata::CompressionCodec;
use crate::error::Result;
pub use crate::file_format::writer::file::PuffinFileWriter;
@@ -30,7 +31,10 @@ pub struct Blob<R> {
pub blob_type: String,
/// The data of the blob
pub data: R,
pub compressed_data: R,
/// The codec used to compress the blob.
pub compression_codec: Option<CompressionCodec>,
/// The properties of the blob
pub properties: HashMap<String, String>,
@@ -45,10 +49,10 @@ pub trait PuffinSyncWriter {
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool);
/// Add a blob to the Puffin file
fn add_blob<R: std::io::Read>(&mut self, blob: Blob<R>) -> Result<()>;
fn add_blob<R: std::io::Read>(&mut self, blob: Blob<R>) -> Result<u64>;
/// Finish writing the Puffin file, returns the number of bytes written
fn finish(&mut self) -> Result<usize>;
fn finish(&mut self) -> Result<u64>;
}
/// The trait for writing Puffin files asynchronously
@@ -61,8 +65,8 @@ pub trait PuffinAsyncWriter {
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool);
/// Add a blob to the Puffin file
async fn add_blob<R: futures::AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<()>;
async fn add_blob<R: futures::AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<u64>;
/// Finish writing the Puffin file, returns the number of bytes written
async fn finish(&mut self) -> Result<usize>;
async fn finish(&mut self) -> Result<u64>;
}
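With `data` renamed to `compressed_data`, callers now pass bytes that are already in their final (possibly compressed) form, and `compression_codec` is pure metadata telling readers how to decompress them. A minimal sketch of constructing a `Blob`, assuming it is reachable at this module path:

```rust
use std::collections::HashMap;
use std::io::Cursor;

use puffin::file_format::writer::Blob;

// The writer copies `compressed_data` verbatim; `None` means the bytes are
// stored as-is with no codec recorded in the blob metadata.
fn example_blob() -> Blob<Cursor<Vec<u8>>> {
    Blob {
        blob_type: "my-blob".to_string(),
        compressed_data: Cursor::new(vec![1, 2, 3]),
        compression_codec: None,
        properties: HashMap::new(),
    }
}
```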


@@ -75,28 +75,28 @@ impl<W: io::Write> PuffinSyncWriter for PuffinFileWriter<W> {
self.properties = properties;
}
fn add_blob<R: io::Read>(&mut self, mut blob: Blob<R>) -> Result<()> {
fn add_blob<R: io::Read>(&mut self, mut blob: Blob<R>) -> Result<u64> {
self.write_header_if_needed_sync()?;
let size = io::copy(&mut blob.data, &mut self.writer).context(WriteSnafu)?;
let size = io::copy(&mut blob.compressed_data, &mut self.writer).context(WriteSnafu)?;
let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
self.blob_metadata.push(blob_metadata);
self.written_bytes += size;
Ok(())
Ok(size)
}
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
self.footer_lz4_compressed = lz4_compressed;
}
fn finish(&mut self) -> Result<usize> {
fn finish(&mut self) -> Result<u64> {
self.write_header_if_needed_sync()?;
self.write_footer_sync()?;
self.writer.flush().context(FlushSnafu)?;
Ok(self.written_bytes as usize)
Ok(self.written_bytes)
}
}
@@ -106,10 +106,10 @@ impl<W: AsyncWrite + Unpin + Send> PuffinAsyncWriter for PuffinFileWriter<W> {
self.properties = properties;
}
async fn add_blob<R: AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<()> {
async fn add_blob<R: AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<u64> {
self.write_header_if_needed_async().await?;
let size = futures::io::copy(blob.data, &mut self.writer)
let size = futures::io::copy(blob.compressed_data, &mut self.writer)
.await
.context(WriteSnafu)?;
@@ -117,20 +117,20 @@ impl<W: AsyncWrite + Unpin + Send> PuffinAsyncWriter for PuffinFileWriter<W> {
self.blob_metadata.push(blob_metadata);
self.written_bytes += size;
Ok(())
Ok(size)
}
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
self.footer_lz4_compressed = lz4_compressed;
}
async fn finish(&mut self) -> Result<usize> {
async fn finish(&mut self) -> Result<u64> {
self.write_header_if_needed_async().await?;
self.write_footer_async().await?;
self.writer.flush().await.context(FlushSnafu)?;
self.writer.close().await.context(CloseSnafu)?;
Ok(self.written_bytes as usize)
Ok(self.written_bytes)
}
}
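End to end, the sync path now reports sizes as `u64` at both the per-blob and whole-file level. A condensed sketch based on the tests at the bottom of this diff; the `PuffinFileWriter::new` constructor shape is an assumption, as it is not shown here:

```rust
use std::io::Cursor;

use puffin::file_format::writer::{Blob, PuffinFileWriter, PuffinSyncWriter};

fn write_one_blob() -> puffin::error::Result<u64> {
    // Assumption: `new` wraps any `io::Write` sink; an in-memory cursor here.
    let mut writer = PuffinFileWriter::new(Cursor::new(Vec::new()));
    // `add_blob` now returns the number of bytes copied for this blob.
    let blob_size = writer.add_blob(Blob {
        blob_type: "some-blob".to_string(),
        compressed_data: Cursor::new("abcdefghi".as_bytes()),
        compression_codec: None,
        properties: Default::default(),
    })?;
    // `finish` reports the total bytes written, including header and footer.
    let total = writer.finish()?;
    debug_assert!(total >= blob_size);
    Ok(total)
}
```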


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(trait_alias)]
pub mod blob_metadata;
pub mod error;
pub mod file_format;


@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod cache_manager;
pub mod cached_puffin_manager;
use std::path::PathBuf;
use async_trait::async_trait;
@@ -37,11 +40,14 @@ pub trait PuffinManager {
#[async_trait]
pub trait PuffinWriter {
/// Writes a blob associated with the specified `key` to the Puffin file.
/// Returns the number of bytes written.
async fn put_blob<R>(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result<u64>
where
R: AsyncRead + Send;
/// Writes a directory associated with the specified `key` to the Puffin file.
/// Returns the number of bytes written.
///
/// The specified `dir` should be accessible from the filesystem.
async fn put_dir(&mut self, key: &str, dir: PathBuf, options: PutOptions) -> Result<u64>;
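A hedged sketch of driving this trait from caller code; only the `compression` field of `PutOptions` appears in this diff, so the struct literal below is a guess:

```rust
use std::path::PathBuf;

use puffin::puffin_manager::{PuffinWriter, PutOptions};

async fn write_parts(mut writer: impl PuffinWriter) -> puffin::error::Result<u64> {
    let raw = futures::io::Cursor::new(b"raw blob bytes".to_vec());
    let mut written = writer
        .put_blob("my-key", raw, PutOptions { compression: None })
        .await?;
    // The directory must be reachable on the local filesystem; each contained
    // file becomes a separate blob (see CachedPuffinWriter below).
    written += writer
        .put_dir("my-dir-key", PathBuf::from("/tmp/staged"), PutOptions { compression: None })
        .await?;
    Ok(written)
}
```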


@@ -0,0 +1,81 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use crate::error::Result;
pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
/// Result containing the number of bytes written (u64).
pub type WriteResult = BoxFuture<'static, Result<u64>>;
/// `DirWriterProvider` provides a way to write files into a directory.
#[async_trait]
pub trait DirWriterProvider {
/// Creates a writer for the given relative path.
async fn writer(&self, relative_path: &str) -> Result<BoxWriter>;
}
pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
/// Function that initializes a blob.
///
/// `CacheManager` will provide a `BoxWriter` that the caller of `get_blob`
/// can use to write the blob into the cache.
pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
/// Function that initializes a directory.
///
/// `CacheManager` will provide a `DirWriterProvider` that the caller of `get_dir`
/// can use to write files inside the directory into the cache.
pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
/// `CacheManager` manages the cache for the puffin files.
#[async_trait]
pub trait CacheManager {
type Reader: AsyncRead + AsyncSeek;
/// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_factory: Box<dyn InitBlobFn + Send + 'a>,
) -> Result<Self::Reader>;
/// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_fn: Box<dyn InitDirFn + Send + 'a>,
) -> Result<PathBuf>;
/// Stores a directory in the cache.
async fn put_dir(
&self,
puffin_file_name: &str,
key: &str,
dir_path: PathBuf,
dir_size: u64,
) -> Result<()>;
}
pub type CacheManagerRef<R> = Arc<dyn CacheManager<Reader = R> + Send + Sync>;
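The `InitBlobFn`/`InitDirFn` trait aliases are easiest to see in use. A minimal sketch of fetching a blob through a `CacheManager`; in real use the closure would stream bytes out of the puffin file rather than a literal:

```rust
use futures::AsyncWriteExt;

use puffin::puffin_manager::cache_manager::{
    BoxWriter, CacheManager, CacheManagerRef, WriteResult,
};

// On a cache miss, `get_blob` hands our closure a writer into the cache; we
// fill it and report the byte count. On a hit, the closure is never invoked.
async fn cached_blob_reader<R>(
    cache: CacheManagerRef<R>,
    puffin_file: &str,
    key: &str,
) -> puffin::error::Result<R>
where
    R: futures::AsyncRead + futures::AsyncSeek,
{
    cache
        .get_blob(
            puffin_file,
            key,
            Box::new(|mut writer: BoxWriter| -> WriteResult {
                Box::pin(async move {
                    // Assumption: a stand-in payload; real callers copy the
                    // blob from the underlying puffin file.
                    let data = b"blob bytes";
                    writer.write_all(data).await.expect("write to cache");
                    Ok(data.len() as u64)
                })
            }),
        )
        .await
}
```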


@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod writer;
use serde::{Deserialize, Serialize};
pub use writer::CachedPuffinWriter;
/// Metadata for directory in puffin file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirMetadata {
pub files: Vec<DirFileMetadata>,
}
/// Metadata for file in directory in puffin file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirFileMetadata {
/// The relative path of the file in the directory.
pub relative_path: String,
/// The file is stored as a blob in the puffin file.
/// `blob_index` is the index of the blob in the puffin file.
pub blob_index: usize,
/// The key of the blob in the puffin file.
pub key: String,
}
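Serialized with serde_json (as the writer below does), the directory-metadata blob is a small JSON document. A sketch with illustrative values; in practice `key` is a random UUID assigned by the writer:

```rust
use puffin::puffin_manager::cached_puffin_manager::{DirFileMetadata, DirMetadata};

fn main() {
    let meta = DirMetadata {
        files: vec![DirFileMetadata {
            relative_path: "segment0/data".to_string(),
            blob_index: 0,
            key: "550e8400-e29b-41d4-a716-446655440000".to_string(),
        }],
    };
    // Prints: {"files":[{"relative_path":"segment0/data","blob_index":0,"key":"..."}]}
    println!("{}", serde_json::to_string(&meta).unwrap());
}
```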


@@ -0,0 +1,193 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::path::PathBuf;
use async_compression::futures::bufread::ZstdEncoder;
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncRead, AsyncSeek, AsyncWrite, StreamExt};
use snafu::{ensure, ResultExt};
use tokio_util::compat::TokioAsyncReadCompatExt;
use uuid::Uuid;
use crate::blob_metadata::CompressionCodec;
use crate::error::{
DuplicateBlobSnafu, MetadataSnafu, OpenSnafu, Result, SerializeJsonSnafu,
UnsupportedCompressionSnafu, WalkDirSnafu,
};
use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use crate::puffin_manager::cache_manager::CacheManagerRef;
use crate::puffin_manager::cached_puffin_manager::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::{PuffinWriter, PutOptions};
/// `CachedPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct CachedPuffinWriter<CR, W> {
/// The name of the puffin file.
puffin_file_name: String,
/// The cache manager.
cache_manager: CacheManagerRef<CR>,
/// The underlying `PuffinFileWriter`.
puffin_file_writer: PuffinFileWriter<W>,
/// Written blob keys.
blob_keys: HashSet<String>,
}
#[async_trait]
impl<CR, W> PuffinWriter for CachedPuffinWriter<CR, W>
where
CR: AsyncRead + AsyncSeek,
W: AsyncWrite + Unpin + Send,
{
async fn put_blob<R>(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result<u64>
where
R: AsyncRead + Send,
{
ensure!(
!self.blob_keys.contains(key),
DuplicateBlobSnafu { blob: key }
);
ensure!(
!matches!(options.compression, Some(CompressionCodec::Lz4)),
UnsupportedCompressionSnafu { codec: "lz4" }
);
let written_bytes = match options.compression {
Some(CompressionCodec::Lz4) => unreachable!("checked above"),
Some(CompressionCodec::Zstd) => {
let blob = Blob {
blob_type: key.to_string(),
compressed_data: ZstdEncoder::new(BufReader::new(raw_data)),
compression_codec: options.compression,
properties: Default::default(),
};
self.puffin_file_writer.add_blob(blob).await?
}
None => {
let blob = Blob {
blob_type: key.to_string(),
compressed_data: raw_data,
compression_codec: options.compression,
properties: Default::default(),
};
self.puffin_file_writer.add_blob(blob).await?
}
};
self.blob_keys.insert(key.to_string());
Ok(written_bytes)
}
async fn put_dir(&mut self, key: &str, dir_path: PathBuf, options: PutOptions) -> Result<u64> {
ensure!(
!self.blob_keys.contains(key),
DuplicateBlobSnafu { blob: key }
);
ensure!(
!matches!(options.compression, Some(CompressionCodec::Lz4)),
UnsupportedCompressionSnafu { codec: "lz4" }
);
// Walk the directory and add all files to the puffin file.
let mut wd = async_walkdir::WalkDir::new(&dir_path).filter(|entry| async move {
match entry.file_type().await {
// Ignore directories.
Ok(ft) if ft.is_dir() => async_walkdir::Filtering::Ignore,
_ => async_walkdir::Filtering::Continue,
}
});
let mut dir_size = 0;
let mut written_bytes = 0;
let mut files = vec![];
while let Some(entry) = wd.next().await {
let entry = entry.context(WalkDirSnafu)?;
dir_size += entry.metadata().await.context(MetadataSnafu)?.len();
let reader = tokio::fs::File::open(entry.path())
.await
.context(OpenSnafu)?
.compat();
let file_key = Uuid::new_v4().to_string();
match options.compression {
Some(CompressionCodec::Lz4) => unreachable!("checked above"),
Some(CompressionCodec::Zstd) => {
let blob = Blob {
blob_type: file_key.clone(),
compressed_data: ZstdEncoder::new(BufReader::new(reader)),
compression_codec: options.compression,
properties: Default::default(),
};
written_bytes += self.puffin_file_writer.add_blob(blob).await?;
}
None => {
let blob = Blob {
blob_type: file_key.clone(),
compressed_data: reader,
compression_codec: options.compression,
properties: Default::default(),
};
written_bytes += self.puffin_file_writer.add_blob(blob).await?;
}
}
let relative_path = entry
.path()
.strip_prefix(&dir_path)
.expect("entry path is under dir path")
.to_string_lossy()
.into_owned();
files.push(DirFileMetadata {
relative_path,
key: file_key.clone(),
blob_index: self.blob_keys.len(),
});
self.blob_keys.insert(file_key);
}
let dir_metadata = DirMetadata { files };
let encoded = serde_json::to_vec(&dir_metadata).context(SerializeJsonSnafu)?;
let dir_meta_blob = Blob {
blob_type: key.to_string(),
compressed_data: encoded.as_slice(),
compression_codec: None,
properties: Default::default(),
};
written_bytes += self.puffin_file_writer.add_blob(dir_meta_blob).await?;
self.blob_keys.insert(key.to_string());
// Move the directory into the cache.
self.cache_manager
.put_dir(&self.puffin_file_name, key, dir_path, dir_size)
.await?;
Ok(written_bytes)
}
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
self.puffin_file_writer
.set_footer_lz4_compressed(lz4_compressed);
}
async fn finish(mut self) -> Result<u64> {
let size = self.puffin_file_writer.finish().await?;
Ok(size)
}
}
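The two `ensure!` guards give `CachedPuffinWriter` its distinctive error behavior: a key may be written only once, and lz4 is accepted for the footer but not for blob payloads. A sketch of both paths, again assuming `PutOptions { compression }` is the whole options struct:

```rust
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{PuffinWriter, PutOptions};

async fn guards(mut writer: impl PuffinWriter) {
    let data = futures::io::Cursor::new(vec![0u8; 4]);
    // First write to "k" succeeds...
    writer.put_blob("k", data, PutOptions { compression: None }).await.unwrap();
    // ...a second write to the same key is rejected with DuplicateBlob.
    let data = futures::io::Cursor::new(vec![0u8; 4]);
    assert!(writer.put_blob("k", data, PutOptions { compression: None }).await.is_err());
    // Lz4 blob compression is rejected up front with UnsupportedCompression;
    // only the footer supports lz4, via set_footer_lz4_compressed.
    let data = futures::io::Cursor::new(vec![0u8; 4]);
    let lz4 = PutOptions { compression: Some(CompressionCodec::Lz4) };
    assert!(writer.put_blob("k2", data, lz4).await.is_err());
}
```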


@@ -189,18 +189,20 @@ fn test_writer_reader_sync() {
let blob1 = "abcdefghi";
writer
.add_blob(Blob {
data: Cursor::new(&blob1),
compressed_data: Cursor::new(&blob1),
blob_type: "some-blob".to_string(),
properties: Default::default(),
compression_codec: None,
})
.unwrap();
let blob2 = include_bytes!("tests/resources/sample-metric-data.blob");
writer
.add_blob(Blob {
data: Cursor::new(&blob2),
compressed_data: Cursor::new(&blob2),
blob_type: "some-other-blob".to_string(),
properties: Default::default(),
compression_codec: None,
})
.unwrap();
@@ -257,9 +259,10 @@ async fn test_writer_reader_async() {
let blob1 = "abcdefghi".as_bytes();
writer
.add_blob(Blob {
data: AsyncCursor::new(blob1),
compressed_data: AsyncCursor::new(blob1),
blob_type: "some-blob".to_string(),
properties: Default::default(),
compression_codec: None,
})
.await
.unwrap();
@@ -267,9 +270,10 @@ async fn test_writer_reader_async() {
let blob2 = include_bytes!("tests/resources/sample-metric-data.blob");
writer
.add_blob(Blob {
data: AsyncCursor::new(&blob2),
compressed_data: AsyncCursor::new(&blob2),
blob_type: "some-other-blob".to_string(),
properties: Default::default(),
compression_codec: None,
})
.await
.unwrap();