From 07cbabab7b956afc20e42f8dc1dbc467fc821870 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 24 Jun 2024 19:39:03 +0800 Subject: [PATCH] feat(puffin): support lz4 compression for footer (#4194) * feat(puffin): support lz4 compression for footer Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 1 + src/puffin/Cargo.toml | 1 + src/puffin/src/error.rs | 22 +- src/puffin/src/file_format/reader/footer.rs | 22 +- src/puffin/src/file_format/writer.rs | 6 + src/puffin/src/file_format/writer/file.rs | 22 +- src/puffin/src/file_format/writer/footer.rs | 32 ++- src/puffin/src/tests.rs | 284 +++++++++++--------- 8 files changed, 237 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 834d88349a..a6777c3c84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8382,6 +8382,7 @@ dependencies = [ "common-macro", "derive_builder 0.12.0", "futures", + "lz4_flex 0.11.3", "pin-project", "serde", "serde_json", diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index 7e43c29e39..fea00dc0ba 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -14,6 +14,7 @@ common-error.workspace = true common-macro.workspace = true derive_builder.workspace = true futures.workspace = true +lz4_flex = "0.11" pin-project.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 34b76ef521..cf86132232 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -141,6 +141,24 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to compress lz4"))] + Lz4Compression { + #[snafu(source)] + error: std::io::Error, + + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to decompress lz4"))] + Lz4Decompression { + #[snafu(source)] + error: serde_json::Error, + + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -160,7 +178,9 @@ impl ErrorExt for Error { | UnexpectedFooterPayloadSize { .. } | UnexpectedPuffinFileSize { .. } | InvalidBlobOffset { .. } - | InvalidBlobAreaEnd { .. } => StatusCode::Unexpected, + | InvalidBlobAreaEnd { .. } + | Lz4Compression { .. } + | Lz4Decompression { .. } => StatusCode::Unexpected, UnsupportedDecompression { .. } => StatusCode::Unsupported, } diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index 0b7c67ccb3..1d0e915ed3 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::{self, SeekFrom}; +use std::io::{self, Cursor, SeekFrom}; use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use snafu::{ensure, ResultExt}; use crate::error::{ BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu, - MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result, SeekSnafu, - UnexpectedFooterPayloadSizeSnafu, UnsupportedDecompressionSnafu, + Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result, + SeekSnafu, UnexpectedFooterPayloadSizeSnafu, }; use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE}; use crate::file_metadata::FileMetadata; @@ -249,15 +249,13 @@ impl StageParser { } fn parse_payload(&self, bytes: &[u8]) -> Result { - // TODO(zhongzc): support lz4 - ensure!( - !self.flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4), - UnsupportedDecompressionSnafu { - decompression: "lz4" - } - ); - - serde_json::from_slice(bytes).context(DeserializeJsonSnafu) + if self.flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) { + let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes)); + let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?; + Ok(res) + } else { + serde_json::from_slice(bytes).context(DeserializeJsonSnafu) + } } fn validate_metadata(&self) -> Result<()> { diff --git a/src/puffin/src/file_format/writer.rs b/src/puffin/src/file_format/writer.rs index 24d1fb0e9f..7215fa6f6b 100644 --- a/src/puffin/src/file_format/writer.rs +++ b/src/puffin/src/file_format/writer.rs @@ -41,6 +41,9 @@ pub trait PuffinSyncWriter { /// Set the properties of the Puffin file fn set_properties(&mut self, properties: HashMap); + /// Sets whether the footer payload should be LZ4 compressed. + fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool); + /// Add a blob to the Puffin file fn add_blob(&mut self, blob: Blob) -> Result<()>; @@ -54,6 +57,9 @@ pub trait PuffinAsyncWriter { /// Set the properties of the Puffin file fn set_properties(&mut self, properties: HashMap); + /// Sets whether the footer payload should be LZ4 compressed. + fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool); + /// Add a blob to the Puffin file async fn add_blob(&mut self, blob: Blob) -> Result<()>; diff --git a/src/puffin/src/file_format/writer/file.rs b/src/puffin/src/file_format/writer/file.rs index 7337ee2d26..d10c2aedde 100644 --- a/src/puffin/src/file_format/writer/file.rs +++ b/src/puffin/src/file_format/writer/file.rs @@ -27,17 +27,20 @@ use crate::file_format::MAGIC; /// Puffin file writer, implements both [`PuffinSyncWriter`] and [`PuffinAsyncWriter`] pub struct PuffinFileWriter { - /// The writer to write to + /// The writer to write to. writer: W, - /// The properties of the file + /// The properties of the file. properties: HashMap, - /// The metadata of the blobs + /// The metadata of the blobs. blob_metadata: Vec, - /// The number of bytes written + /// The number of bytes written. written_bytes: u64, + + /// Whether the footer payload should be LZ4 compressed. + footer_lz4_compressed: bool, } impl PuffinFileWriter { @@ -47,6 +50,7 @@ impl PuffinFileWriter { properties: HashMap::new(), blob_metadata: Vec::new(), written_bytes: 0, + footer_lz4_compressed: false, } } @@ -83,6 +87,10 @@ impl PuffinSyncWriter for PuffinFileWriter { Ok(()) } + fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) { + self.footer_lz4_compressed = lz4_compressed; + } + fn finish(&mut self) -> Result { self.write_header_if_needed_sync()?; self.write_footer_sync()?; @@ -112,6 +120,10 @@ impl PuffinAsyncWriter for PuffinFileWriter { Ok(()) } + fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) { + self.footer_lz4_compressed = lz4_compressed; + } + async fn finish(&mut self) -> Result { self.write_header_if_needed_async().await?; self.write_footer_async().await?; @@ -135,6 +147,7 @@ impl PuffinFileWriter { let bytes = FooterWriter::new( mem::take(&mut self.blob_metadata), mem::take(&mut self.properties), + self.footer_lz4_compressed, ) .into_footer_bytes()?; @@ -157,6 +170,7 @@ impl PuffinFileWriter { let bytes = FooterWriter::new( mem::take(&mut self.blob_metadata), mem::take(&mut self.properties), + self.footer_lz4_compressed, ) .into_footer_bytes()?; diff --git a/src/puffin/src/file_format/writer/footer.rs b/src/puffin/src/file_format/writer/footer.rs index b24a50e8ca..23ce351897 100644 --- a/src/puffin/src/file_format/writer/footer.rs +++ b/src/puffin/src/file_format/writer/footer.rs @@ -13,12 +13,13 @@ // limitations under the License. use std::collections::HashMap; +use std::io::Write; use std::mem; use snafu::ResultExt; use crate::blob_metadata::BlobMetadata; -use crate::error::{Result, SerializeJsonSnafu}; +use crate::error::{Lz4CompressionSnafu, Result, SerializeJsonSnafu}; use crate::file_format::{Flags, MAGIC, MIN_FOOTER_SIZE}; use crate::file_metadata::FileMetadataBuilder; @@ -31,13 +32,19 @@ use crate::file_metadata::FileMetadataBuilder; pub struct FooterWriter { blob_metadata: Vec, file_properties: HashMap, + lz4_compressed: bool, } impl FooterWriter { - pub fn new(blob_metadata: Vec, file_properties: HashMap) -> Self { + pub fn new( + blob_metadata: Vec, + file_properties: HashMap, + lz4_compressed: bool, + ) -> Self { Self { blob_metadata, file_properties, + lz4_compressed, } } @@ -70,10 +77,15 @@ impl FooterWriter { } /// Appends reserved flags (currently zero-initialized) to the given buffer. - /// - /// TODO(zhongzc): support compression fn write_flags(&self, buf: &mut Vec) { - buf.extend_from_slice(&Flags::DEFAULT.bits().to_le_bytes()); + let mut flags = Flags::DEFAULT; + if self.lz4_compressed { + flags |= Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4; + } else { + flags &= !Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4; + } + + buf.extend_from_slice(&flags.bits().to_le_bytes()); } fn footer_payload(&mut self) -> Result> { @@ -83,6 +95,14 @@ impl FooterWriter { .build() .expect("Required fields are not set"); - serde_json::to_vec(&file_metadata).context(SerializeJsonSnafu) + if self.lz4_compressed { + let mut buf = vec![]; + let mut encoder = lz4_flex::frame::FrameEncoder::new(&mut buf); + serde_json::to_writer(&mut encoder, &file_metadata).context(SerializeJsonSnafu)?; + encoder.flush().context(Lz4CompressionSnafu)?; + Ok(buf) + } else { + serde_json::to_vec(&file_metadata).context(SerializeJsonSnafu) + } } } diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs index 09792085c4..4b10c17816 100644 --- a/src/puffin/src/tests.rs +++ b/src/puffin/src/tests.rs @@ -117,178 +117,202 @@ async fn test_sample_metric_data_puffin_async() { #[test] fn test_writer_reader_with_empty_sync() { - let mut buf = Cursor::new(vec![]); + fn test_writer_reader_with_empty_sync(footer_compressed: bool) { + let mut buf = Cursor::new(vec![]); - let mut writer = PuffinFileWriter::new(&mut buf); - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); + let mut writer = PuffinFileWriter::new(&mut buf); + writer.set_properties(HashMap::from([( + "created-by".to_string(), + "Test 1234".to_string(), + )])); - let written_bytes = writer.finish().unwrap(); - assert!(written_bytes > 0); + writer.set_footer_lz4_compressed(footer_compressed); + let written_bytes = writer.finish().unwrap(); + assert!(written_bytes > 0); - let mut buf = Cursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().unwrap(); + let mut buf = Cursor::new(buf.into_inner()); + let mut reader = PuffinFileReader::new(&mut buf); + let metadata = reader.metadata().unwrap(); - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); + assert_eq!(metadata.properties.len(), 1); + assert_eq!( + metadata.properties.get("created-by"), + Some(&"Test 1234".to_string()) + ); - assert_eq!(metadata.blobs.len(), 0); + assert_eq!(metadata.blobs.len(), 0); + } + + test_writer_reader_with_empty_sync(false); + test_writer_reader_with_empty_sync(true); } #[tokio::test] async fn test_writer_reader_empty_async() { - let mut buf = AsyncCursor::new(vec![]); + async fn test_writer_reader_empty_async(footer_compressed: bool) { + let mut buf = AsyncCursor::new(vec![]); - let mut writer = PuffinFileWriter::new(&mut buf); - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); + let mut writer = PuffinFileWriter::new(&mut buf); + writer.set_properties(HashMap::from([( + "created-by".to_string(), + "Test 1234".to_string(), + )])); - let written_bytes = writer.finish().await.unwrap(); - assert!(written_bytes > 0); + writer.set_footer_lz4_compressed(footer_compressed); + let written_bytes = writer.finish().await.unwrap(); + assert!(written_bytes > 0); - let mut buf = AsyncCursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().await.unwrap(); + let mut buf = AsyncCursor::new(buf.into_inner()); + let mut reader = PuffinFileReader::new(&mut buf); + let metadata = reader.metadata().await.unwrap(); - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); + assert_eq!(metadata.properties.len(), 1); + assert_eq!( + metadata.properties.get("created-by"), + Some(&"Test 1234".to_string()) + ); - assert_eq!(metadata.blobs.len(), 0); + assert_eq!(metadata.blobs.len(), 0); + } + + test_writer_reader_empty_async(false).await; + test_writer_reader_empty_async(true).await; } #[test] fn test_writer_reader_sync() { - let mut buf = Cursor::new(vec![]); + fn test_writer_reader_sync(footer_compressed: bool) { + let mut buf = Cursor::new(vec![]); - let mut writer = PuffinFileWriter::new(&mut buf); + let mut writer = PuffinFileWriter::new(&mut buf); - let blob1 = "abcdefghi"; - writer - .add_blob(Blob { - data: Cursor::new(&blob1), - blob_type: "some-blob".to_string(), - properties: Default::default(), - }) - .unwrap(); + let blob1 = "abcdefghi"; + writer + .add_blob(Blob { + data: Cursor::new(&blob1), + blob_type: "some-blob".to_string(), + properties: Default::default(), + }) + .unwrap(); - let blob2 = include_bytes!("tests/resources/sample-metric-data.blob"); - writer - .add_blob(Blob { - data: Cursor::new(&blob2), - blob_type: "some-other-blob".to_string(), - properties: Default::default(), - }) - .unwrap(); + let blob2 = include_bytes!("tests/resources/sample-metric-data.blob"); + writer + .add_blob(Blob { + data: Cursor::new(&blob2), + blob_type: "some-other-blob".to_string(), + properties: Default::default(), + }) + .unwrap(); - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); + writer.set_properties(HashMap::from([( + "created-by".to_string(), + "Test 1234".to_string(), + )])); - let written_bytes = writer.finish().unwrap(); - assert!(written_bytes > 0); + writer.set_footer_lz4_compressed(footer_compressed); + let written_bytes = writer.finish().unwrap(); + assert!(written_bytes > 0); - let mut buf = Cursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().unwrap(); + let mut buf = Cursor::new(buf.into_inner()); + let mut reader = PuffinFileReader::new(&mut buf); + let metadata = reader.metadata().unwrap(); - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); + assert_eq!(metadata.properties.len(), 1); + assert_eq!( + metadata.properties.get("created-by"), + Some(&"Test 1234".to_string()) + ); - assert_eq!(metadata.blobs.len(), 2); - assert_eq!(metadata.blobs[0].blob_type, "some-blob"); - assert_eq!(metadata.blobs[0].offset, 4); - assert_eq!(metadata.blobs[0].length, 9); + assert_eq!(metadata.blobs.len(), 2); + assert_eq!(metadata.blobs[0].blob_type, "some-blob"); + assert_eq!(metadata.blobs[0].offset, 4); + assert_eq!(metadata.blobs[0].length, 9); - assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); - assert_eq!(metadata.blobs[1].offset, 13); - assert_eq!(metadata.blobs[1].length, 83); + assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); + assert_eq!(metadata.blobs[1].offset, 13); + assert_eq!(metadata.blobs[1].length, 83); - let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = String::new(); - some_blob.read_to_string(&mut buf).unwrap(); - assert_eq!(buf, blob1); + let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); + let mut buf = String::new(); + some_blob.read_to_string(&mut buf).unwrap(); + assert_eq!(buf, blob1); - let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).unwrap(); - assert_eq!(buf, blob2); + let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); + let mut buf = Vec::new(); + some_other_blob.read_to_end(&mut buf).unwrap(); + assert_eq!(buf, blob2); + } + + test_writer_reader_sync(false); + test_writer_reader_sync(true); } #[tokio::test] async fn test_writer_reader_async() { - let mut buf = AsyncCursor::new(vec![]); + async fn test_writer_reader_async(footer_compressed: bool) { + let mut buf = AsyncCursor::new(vec![]); - let mut writer = PuffinFileWriter::new(&mut buf); + let mut writer = PuffinFileWriter::new(&mut buf); - let blob1 = "abcdefghi".as_bytes(); - writer - .add_blob(Blob { - data: AsyncCursor::new(blob1), - blob_type: "some-blob".to_string(), - properties: Default::default(), - }) - .await - .unwrap(); + let blob1 = "abcdefghi".as_bytes(); + writer + .add_blob(Blob { + data: AsyncCursor::new(blob1), + blob_type: "some-blob".to_string(), + properties: Default::default(), + }) + .await + .unwrap(); - let blob2 = include_bytes!("tests/resources/sample-metric-data.blob"); - writer - .add_blob(Blob { - data: AsyncCursor::new(&blob2), - blob_type: "some-other-blob".to_string(), - properties: Default::default(), - }) - .await - .unwrap(); + let blob2 = include_bytes!("tests/resources/sample-metric-data.blob"); + writer + .add_blob(Blob { + data: AsyncCursor::new(&blob2), + blob_type: "some-other-blob".to_string(), + properties: Default::default(), + }) + .await + .unwrap(); - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); + writer.set_properties(HashMap::from([( + "created-by".to_string(), + "Test 1234".to_string(), + )])); - let written_bytes = writer.finish().await.unwrap(); - assert!(written_bytes > 0); + writer.set_footer_lz4_compressed(footer_compressed); + let written_bytes = writer.finish().await.unwrap(); + assert!(written_bytes > 0); - let mut buf = AsyncCursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().await.unwrap(); + let mut buf = AsyncCursor::new(buf.into_inner()); + let mut reader = PuffinFileReader::new(&mut buf); + let metadata = reader.metadata().await.unwrap(); - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); + assert_eq!(metadata.properties.len(), 1); + assert_eq!( + metadata.properties.get("created-by"), + Some(&"Test 1234".to_string()) + ); - assert_eq!(metadata.blobs.len(), 2); - assert_eq!(metadata.blobs[0].blob_type, "some-blob"); - assert_eq!(metadata.blobs[0].offset, 4); - assert_eq!(metadata.blobs[0].length, 9); + assert_eq!(metadata.blobs.len(), 2); + assert_eq!(metadata.blobs[0].blob_type, "some-blob"); + assert_eq!(metadata.blobs[0].offset, 4); + assert_eq!(metadata.blobs[0].length, 9); - assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); - assert_eq!(metadata.blobs[1].offset, 13); - assert_eq!(metadata.blobs[1].length, 83); + assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); + assert_eq!(metadata.blobs[1].offset, 13); + assert_eq!(metadata.blobs[1].length, 83); - let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = Vec::new(); - some_blob.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, blob1); + let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); + let mut buf = Vec::new(); + some_blob.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, blob1); - let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, blob2); + let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); + let mut buf = Vec::new(); + some_other_blob.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, blob2); + } + + test_writer_reader_async(false).await; + test_writer_reader_async(true).await; }