feat(puffin): support lz4 compression for footer (#4194)

* feat(puffin): support lz4 compression for footer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-06-24 19:39:03 +08:00
committed by GitHub
parent ea7c17089f
commit 07cbabab7b
8 changed files with 237 additions and 153 deletions

1
Cargo.lock generated
View File

@@ -8382,6 +8382,7 @@ dependencies = [
"common-macro",
"derive_builder 0.12.0",
"futures",
"lz4_flex 0.11.3",
"pin-project",
"serde",
"serde_json",

View File

@@ -14,6 +14,7 @@ common-error.workspace = true
common-macro.workspace = true
derive_builder.workspace = true
futures.workspace = true
lz4_flex = "0.11"
pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -141,6 +141,24 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to compress lz4"))]
Lz4Compression {
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decompress lz4"))]
Lz4Decompression {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -160,7 +178,9 @@ impl ErrorExt for Error {
| UnexpectedFooterPayloadSize { .. }
| UnexpectedPuffinFileSize { .. }
| InvalidBlobOffset { .. }
| InvalidBlobAreaEnd { .. } => StatusCode::Unexpected,
| InvalidBlobAreaEnd { .. }
| Lz4Compression { .. }
| Lz4Decompression { .. } => StatusCode::Unexpected,
UnsupportedDecompression { .. } => StatusCode::Unsupported,
}

View File

@@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::{self, SeekFrom};
use std::io::{self, Cursor, SeekFrom};
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use snafu::{ensure, ResultExt};
use crate::error::{
BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu,
MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result, SeekSnafu,
UnexpectedFooterPayloadSizeSnafu, UnsupportedDecompressionSnafu,
Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result,
SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
};
use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE};
use crate::file_metadata::FileMetadata;
@@ -249,15 +249,13 @@ impl StageParser {
}
fn parse_payload(&self, bytes: &[u8]) -> Result<FileMetadata> {
// TODO(zhongzc): support lz4
ensure!(
!self.flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4),
UnsupportedDecompressionSnafu {
decompression: "lz4"
}
);
serde_json::from_slice(bytes).context(DeserializeJsonSnafu)
if self.flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) {
let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes));
let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?;
Ok(res)
} else {
serde_json::from_slice(bytes).context(DeserializeJsonSnafu)
}
}
fn validate_metadata(&self) -> Result<()> {

View File

@@ -41,6 +41,9 @@ pub trait PuffinSyncWriter {
/// Set the properties of the Puffin file
fn set_properties(&mut self, properties: HashMap<String, String>);
/// Sets whether the footer payload should be LZ4 compressed.
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool);
/// Add a blob to the Puffin file
fn add_blob<R: std::io::Read>(&mut self, blob: Blob<R>) -> Result<()>;
@@ -54,6 +57,9 @@ pub trait PuffinAsyncWriter {
/// Set the properties of the Puffin file
fn set_properties(&mut self, properties: HashMap<String, String>);
/// Sets whether the footer payload should be LZ4 compressed.
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool);
/// Add a blob to the Puffin file
async fn add_blob<R: futures::AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<()>;

View File

@@ -27,17 +27,20 @@ use crate::file_format::MAGIC;
/// Puffin file writer, implements both [`PuffinSyncWriter`] and [`PuffinAsyncWriter`]
pub struct PuffinFileWriter<W> {
/// The writer to write to
/// The writer to write to.
writer: W,
/// The properties of the file
/// The properties of the file.
properties: HashMap<String, String>,
/// The metadata of the blobs
/// The metadata of the blobs.
blob_metadata: Vec<BlobMetadata>,
/// The number of bytes written
/// The number of bytes written.
written_bytes: u64,
/// Whether the footer payload should be LZ4 compressed.
footer_lz4_compressed: bool,
}
impl<W> PuffinFileWriter<W> {
@@ -47,6 +50,7 @@ impl<W> PuffinFileWriter<W> {
properties: HashMap::new(),
blob_metadata: Vec::new(),
written_bytes: 0,
footer_lz4_compressed: false,
}
}
@@ -83,6 +87,10 @@ impl<W: io::Write> PuffinSyncWriter for PuffinFileWriter<W> {
Ok(())
}
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
self.footer_lz4_compressed = lz4_compressed;
}
fn finish(&mut self) -> Result<usize> {
self.write_header_if_needed_sync()?;
self.write_footer_sync()?;
@@ -112,6 +120,10 @@ impl<W: AsyncWrite + Unpin + Send> PuffinAsyncWriter for PuffinFileWriter<W> {
Ok(())
}
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
self.footer_lz4_compressed = lz4_compressed;
}
async fn finish(&mut self) -> Result<usize> {
self.write_header_if_needed_async().await?;
self.write_footer_async().await?;
@@ -135,6 +147,7 @@ impl<W: io::Write> PuffinFileWriter<W> {
let bytes = FooterWriter::new(
mem::take(&mut self.blob_metadata),
mem::take(&mut self.properties),
self.footer_lz4_compressed,
)
.into_footer_bytes()?;
@@ -157,6 +170,7 @@ impl<W: AsyncWrite + Unpin> PuffinFileWriter<W> {
let bytes = FooterWriter::new(
mem::take(&mut self.blob_metadata),
mem::take(&mut self.properties),
self.footer_lz4_compressed,
)
.into_footer_bytes()?;

View File

@@ -13,12 +13,13 @@
// limitations under the License.
use std::collections::HashMap;
use std::io::Write;
use std::mem;
use snafu::ResultExt;
use crate::blob_metadata::BlobMetadata;
use crate::error::{Result, SerializeJsonSnafu};
use crate::error::{Lz4CompressionSnafu, Result, SerializeJsonSnafu};
use crate::file_format::{Flags, MAGIC, MIN_FOOTER_SIZE};
use crate::file_metadata::FileMetadataBuilder;
@@ -31,13 +32,19 @@ use crate::file_metadata::FileMetadataBuilder;
pub struct FooterWriter {
blob_metadata: Vec<BlobMetadata>,
file_properties: HashMap<String, String>,
lz4_compressed: bool,
}
impl FooterWriter {
pub fn new(blob_metadata: Vec<BlobMetadata>, file_properties: HashMap<String, String>) -> Self {
pub fn new(
blob_metadata: Vec<BlobMetadata>,
file_properties: HashMap<String, String>,
lz4_compressed: bool,
) -> Self {
Self {
blob_metadata,
file_properties,
lz4_compressed,
}
}
@@ -70,10 +77,15 @@ impl FooterWriter {
}
/// Appends reserved flags (currently zero-initialized) to the given buffer.
///
/// TODO(zhongzc): support compression
fn write_flags(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(&Flags::DEFAULT.bits().to_le_bytes());
let mut flags = Flags::DEFAULT;
if self.lz4_compressed {
flags |= Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4;
} else {
flags &= !Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4;
}
buf.extend_from_slice(&flags.bits().to_le_bytes());
}
fn footer_payload(&mut self) -> Result<Vec<u8>> {
@@ -83,6 +95,14 @@ impl FooterWriter {
.build()
.expect("Required fields are not set");
serde_json::to_vec(&file_metadata).context(SerializeJsonSnafu)
if self.lz4_compressed {
let mut buf = vec![];
let mut encoder = lz4_flex::frame::FrameEncoder::new(&mut buf);
serde_json::to_writer(&mut encoder, &file_metadata).context(SerializeJsonSnafu)?;
encoder.flush().context(Lz4CompressionSnafu)?;
Ok(buf)
} else {
serde_json::to_vec(&file_metadata).context(SerializeJsonSnafu)
}
}
}

View File

@@ -117,178 +117,202 @@ async fn test_sample_metric_data_puffin_async() {
#[test]
fn test_writer_reader_with_empty_sync() {
let mut buf = Cursor::new(vec![]);
fn test_writer_reader_with_empty_sync(footer_compressed: bool) {
let mut buf = Cursor::new(vec![]);
let mut writer = PuffinFileWriter::new(&mut buf);
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));
let mut writer = PuffinFileWriter::new(&mut buf);
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));
let written_bytes = writer.finish().unwrap();
assert!(written_bytes > 0);
writer.set_footer_lz4_compressed(footer_compressed);
let written_bytes = writer.finish().unwrap();
assert!(written_bytes > 0);
let mut buf = Cursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().unwrap();
let mut buf = Cursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().unwrap();
assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);
assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);
assert_eq!(metadata.blobs.len(), 0);
assert_eq!(metadata.blobs.len(), 0);
}
test_writer_reader_with_empty_sync(false);
test_writer_reader_with_empty_sync(true);
}
#[tokio::test]
async fn test_writer_reader_empty_async() {
let mut buf = AsyncCursor::new(vec![]);
async fn test_writer_reader_empty_async(footer_compressed: bool) {
let mut buf = AsyncCursor::new(vec![]);
let mut writer = PuffinFileWriter::new(&mut buf);
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));
let mut writer = PuffinFileWriter::new(&mut buf);
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));
let written_bytes = writer.finish().await.unwrap();
assert!(written_bytes > 0);
writer.set_footer_lz4_compressed(footer_compressed);
let written_bytes = writer.finish().await.unwrap();
assert!(written_bytes > 0);
let mut buf = AsyncCursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().await.unwrap();
let mut buf = AsyncCursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);
assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);
assert_eq!(metadata.blobs.len(), 0);
assert_eq!(metadata.blobs.len(), 0);
}
test_writer_reader_empty_async(false).await;
test_writer_reader_empty_async(true).await;
}
#[test]
fn test_writer_reader_sync() {
let mut buf = Cursor::new(vec![]);
fn test_writer_reader_sync(footer_compressed: bool) {
let mut buf = Cursor::new(vec![]);
let mut writer = PuffinFileWriter::new(&mut buf);
let mut writer = PuffinFileWriter::new(&mut buf);
let blob1 = "abcdefghi";
writer
.add_blob(Blob {
data: Cursor::new(&blob1),
blob_type: "some-blob".to_string(),
properties: Default::default(),
})
.unwrap();
let blob1 = "abcdefghi";
writer
.add_blob(Blob {
data: Cursor::new(&blob1),
blob_type: "some-blob".to_string(),
properties: Default::default(),
})
.unwrap();
let blob2 = include_bytes!("tests/resources/sample-metric-data.blob");
writer
.add_blob(Blob {
data: Cursor::new(&blob2),
blob_type: "some-other-blob".to_string(),
properties: Default::default(),
})
.unwrap();
let blob2 = include_bytes!("tests/resources/sample-metric-data.blob");
writer
.add_blob(Blob {
data: Cursor::new(&blob2),
blob_type: "some-other-blob".to_string(),
properties: Default::default(),
})
.unwrap();
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));
let written_bytes = writer.finish().unwrap();
assert!(written_bytes > 0);
writer.set_footer_lz4_compressed(footer_compressed);
let written_bytes = writer.finish().unwrap();
assert!(written_bytes > 0);
let mut buf = Cursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().unwrap();
let mut buf = Cursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().unwrap();
assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);
assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);
assert_eq!(metadata.blobs.len(), 2);
assert_eq!(metadata.blobs[0].blob_type, "some-blob");
assert_eq!(metadata.blobs[0].offset, 4);
assert_eq!(metadata.blobs[0].length, 9);
assert_eq!(metadata.blobs.len(), 2);
assert_eq!(metadata.blobs[0].blob_type, "some-blob");
assert_eq!(metadata.blobs[0].offset, 4);
assert_eq!(metadata.blobs[0].length, 9);
assert_eq!(metadata.blobs[1].blob_type, "some-other-blob");
assert_eq!(metadata.blobs[1].offset, 13);
assert_eq!(metadata.blobs[1].length, 83);
assert_eq!(metadata.blobs[1].blob_type, "some-other-blob");
assert_eq!(metadata.blobs[1].offset, 13);
assert_eq!(metadata.blobs[1].length, 83);
let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap();
let mut buf = String::new();
some_blob.read_to_string(&mut buf).unwrap();
assert_eq!(buf, blob1);
let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap();
let mut buf = String::new();
some_blob.read_to_string(&mut buf).unwrap();
assert_eq!(buf, blob1);
let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap();
let mut buf = Vec::new();
some_other_blob.read_to_end(&mut buf).unwrap();
assert_eq!(buf, blob2);
let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap();
let mut buf = Vec::new();
some_other_blob.read_to_end(&mut buf).unwrap();
assert_eq!(buf, blob2);
}
test_writer_reader_sync(false);
test_writer_reader_sync(true);
}
#[tokio::test]
async fn test_writer_reader_async() {
let mut buf = AsyncCursor::new(vec![]);
async fn test_writer_reader_async(footer_compressed: bool) {
let mut buf = AsyncCursor::new(vec![]);
let mut writer = PuffinFileWriter::new(&mut buf);
let mut writer = PuffinFileWriter::new(&mut buf);
let blob1 = "abcdefghi".as_bytes();
writer
.add_blob(Blob {
data: AsyncCursor::new(blob1),
blob_type: "some-blob".to_string(),
properties: Default::default(),
})
.await
.unwrap();
let blob1 = "abcdefghi".as_bytes();
writer
.add_blob(Blob {
data: AsyncCursor::new(blob1),
blob_type: "some-blob".to_string(),
properties: Default::default(),
})
.await
.unwrap();
let blob2 = include_bytes!("tests/resources/sample-metric-data.blob");
writer
.add_blob(Blob {
data: AsyncCursor::new(&blob2),
blob_type: "some-other-blob".to_string(),
properties: Default::default(),
})
.await
.unwrap();
let blob2 = include_bytes!("tests/resources/sample-metric-data.blob");
writer
.add_blob(Blob {
data: AsyncCursor::new(&blob2),
blob_type: "some-other-blob".to_string(),
properties: Default::default(),
})
.await
.unwrap();
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));
writer.set_properties(HashMap::from([(
"created-by".to_string(),
"Test 1234".to_string(),
)]));
let written_bytes = writer.finish().await.unwrap();
assert!(written_bytes > 0);
writer.set_footer_lz4_compressed(footer_compressed);
let written_bytes = writer.finish().await.unwrap();
assert!(written_bytes > 0);
let mut buf = AsyncCursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().await.unwrap();
let mut buf = AsyncCursor::new(buf.into_inner());
let mut reader = PuffinFileReader::new(&mut buf);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);
assert_eq!(metadata.properties.len(), 1);
assert_eq!(
metadata.properties.get("created-by"),
Some(&"Test 1234".to_string())
);
assert_eq!(metadata.blobs.len(), 2);
assert_eq!(metadata.blobs[0].blob_type, "some-blob");
assert_eq!(metadata.blobs[0].offset, 4);
assert_eq!(metadata.blobs[0].length, 9);
assert_eq!(metadata.blobs.len(), 2);
assert_eq!(metadata.blobs[0].blob_type, "some-blob");
assert_eq!(metadata.blobs[0].offset, 4);
assert_eq!(metadata.blobs[0].length, 9);
assert_eq!(metadata.blobs[1].blob_type, "some-other-blob");
assert_eq!(metadata.blobs[1].offset, 13);
assert_eq!(metadata.blobs[1].length, 83);
assert_eq!(metadata.blobs[1].blob_type, "some-other-blob");
assert_eq!(metadata.blobs[1].offset, 13);
assert_eq!(metadata.blobs[1].length, 83);
let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap();
let mut buf = Vec::new();
some_blob.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, blob1);
let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap();
let mut buf = Vec::new();
some_blob.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, blob1);
let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap();
let mut buf = Vec::new();
some_other_blob.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, blob2);
let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap();
let mut buf = Vec::new();
some_other_blob.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, blob2);
}
test_writer_reader_async(false).await;
test_writer_reader_async(true).await;
}