From d9eeeee06e27cbac7c9a1761e8fc7937789d48c0 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 20 Nov 2023 12:29:41 +0800 Subject: [PATCH] feat(puffin): add file reader (#2751) * feat(puffin): add file reader Signed-off-by: Zhenchi * fix: toml format Signed-off-by: Zhenchi * chore: rename PuffinParser to PuffinFileReader Signed-off-by: Zhenchi * chore: polish comments Signed-off-by: Zhenchi * Update src/puffin/src/file_format/reader/footer.rs Co-authored-by: Yingwen * Update src/puffin/src/file_format/reader/file.rs Co-authored-by: Yingwen * Update src/puffin/src/file_format/reader/footer.rs Co-authored-by: Yingwen * Update src/puffin/src/file_format/reader/footer.rs Co-authored-by: Yingwen * fix: check file size Signed-off-by: Zhenchi * fix: redundant type cast Signed-off-by: Zhenchi * fix: reuse read buffer Signed-off-by: Zhenchi * fix: check payload size Signed-off-by: Zhenchi * fix: check payload size Signed-off-by: Zhenchi * fix: validate blob offset Signed-off-by: Zhenchi * fix: validate blob offset Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi Co-authored-by: Yingwen --- Cargo.lock | 6 + Cargo.toml | 1 + src/puffin/Cargo.toml | 6 + src/puffin/src/error.rs | 132 ++++++++ src/puffin/src/file_format.rs | 55 +++ src/puffin/src/file_format/reader.rs | 46 +++ src/puffin/src/file_format/reader/file.rs | 173 ++++++++++ src/puffin/src/file_format/reader/footer.rs | 318 ++++++++++++++++++ src/puffin/src/lib.rs | 5 + src/puffin/src/tests.rs | Bin 0 -> 4142 bytes .../empty-puffin-uncompressed.puffin | Bin 0 -> 32 bytes .../sample-metric-data-uncompressed.puffin | Bin 0 -> 355 bytes 12 files changed, 742 insertions(+) create mode 100644 src/puffin/src/error.rs create mode 100644 src/puffin/src/file_format.rs create mode 100644 src/puffin/src/file_format/reader.rs create mode 100644 src/puffin/src/file_format/reader/file.rs create mode 100644 src/puffin/src/file_format/reader/footer.rs create mode 100644 src/puffin/src/tests.rs create mode 100644 src/puffin/src/tests/resources/empty-puffin-uncompressed.puffin create mode 100644 src/puffin/src/tests/resources/sample-metric-data-uncompressed.puffin diff --git a/Cargo.lock b/Cargo.lock index afb5123024..e08c498772 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6585,12 +6585,18 @@ dependencies = [ name = "puffin" version = "0.4.3" dependencies = [ + "async-trait", + "bitflags 2.4.1", + "common-error", + "common-macro", "derive_builder 0.12.0", "futures", "pin-project", "serde", "serde_json", + "snafu", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9c9db11176..dbd5d57869 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ async-stream = "0.3" async-trait = "0.1" base64 = "0.21" bigdecimal = "0.4.2" +bitflags = "2.4.1" chrono = { version = "0.4", features = ["serde"] } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index a0c56c0f81..55001b6b07 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -5,11 +5,17 @@ edition.workspace = true license.workspace = true [dependencies] +async-trait.workspace = true +bitflags.workspace = true +common-error.workspace = true +common-macro.workspace = true derive_builder.workspace = true futures.workspace = true pin-project.workspace = true serde.workspace = true serde_json.workspace = true +snafu.workspace = true [dev-dependencies] +tokio-util.workspace = true tokio.workspace = true diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs new file mode 100644 index 0000000000..9de5f9a17c --- /dev/null +++ b/src/puffin/src/error.rs @@ -0,0 +1,132 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::io::Error as IoError; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to seek"))] + Seek { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Failed to read"))] + Read { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Failed to write"))] + Write { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Magic not matched"))] + MagicNotMatched { location: Location }, + + #[snafu(display("Failed to convert bytes to integer"))] + BytesToInteger { + #[snafu(source)] + error: std::array::TryFromSliceError, + location: Location, + }, + + #[snafu(display("Unsupported decompression: {}", decompression))] + UnsupportedDecompression { + decompression: String, + location: Location, + }, + + #[snafu(display("Failed to serialize json"))] + SerializeJson { + #[snafu(source)] + error: serde_json::Error, + location: Location, + }, + + #[snafu(display("Failed to deserialize json"))] + DeserializeJson { + #[snafu(source)] + error: serde_json::Error, + location: Location, + }, + + #[snafu(display("Parse stage not match, expected: {}, actual: {}", expected, actual))] + ParseStageNotMatch { + expected: String, + actual: String, + location: Location, + }, + + #[snafu(display("Unexpected footer payload size: {}", size))] + UnexpectedFooterPayloadSize { size: i32, location: Location }, + + #[snafu(display( + "Unexpected puffin file size, min: {}, actual: {}", + min_file_size, + actual_file_size + ))] + UnexpectedPuffinFileSize { + min_file_size: u64, + actual_file_size: u64, + location: Location, + }, + + #[snafu(display("Invalid blob offset: {}, location: {:?}", offset, location))] + InvalidBlobOffset { offset: i64, location: Location }, + + #[snafu(display("Invalid blob area end: {}, location: {:?}", offset, location))] + InvalidBlobAreaEnd { offset: u64, location: Location }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + match self { + Seek { .. } + | Read { .. } + | MagicNotMatched { .. } + | DeserializeJson { .. } + | Write { .. } + | SerializeJson { .. } + | BytesToInteger { .. } + | ParseStageNotMatch { .. } + | UnexpectedFooterPayloadSize { .. } + | UnexpectedPuffinFileSize { .. } + | InvalidBlobOffset { .. } + | InvalidBlobAreaEnd { .. } => StatusCode::Unexpected, + + UnsupportedDecompression { .. } => StatusCode::Unsupported, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result = std::result::Result; diff --git a/src/puffin/src/file_format.rs b/src/puffin/src/file_format.rs new file mode 100644 index 0000000000..0802c977e8 --- /dev/null +++ b/src/puffin/src/file_format.rs @@ -0,0 +1,55 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # Format specification for Puffin files +//! +//! ## File structure +//! +//! Magic Blob₁ Blob₂ ... Blobₙ Footer +//! +//! - `Magic` is four bytes 0x50, 0x46, 0x41, 0x31 (short for: Puffin Fratercula arctica, version 1), +//! - `Blobᵢ` is i-th blob contained in the file, to be interpreted by application according to the footer, +//! - `Footer` is defined below. +//! +//! ## Footer structure +//! +//! Magic FooterPayload FooterPayloadSize Flags Magic +//! +//! - `Magic`: four bytes, same as at the beginning of the file +//! - `FooterPayload`: optionally compressed, UTF-8 encoded JSON payload describing the blobs in the file, with the structure described below +//! - `FooterPayloadSize`: a length in bytes of the `FooterPayload` (after compression, if compressed), stored as 4 byte integer +//! - `Flags`: 4 bytes for boolean flags +//! * byte 0 (first) +//! - bit 0 (lowest bit): whether `FooterPayload` is compressed +//! - all other bits are reserved for future use and should be set to 0 on write +//! * all other bytes are reserved for future use and should be set to 0 on write +//! A 4 byte integer is always signed, in a two’s complement representation, stored little-endian. +//! +//! ## Footer Payload +//! +//! Footer payload bytes is either uncompressed or LZ4-compressed (as a single LZ4 compression frame with content size present), +//! UTF-8 encoded JSON payload representing a single [`FileMetadata`] object. + +pub mod reader; + +use bitflags::bitflags; + +pub const MAGIC: [u8; 4] = [0x50, 0x46, 0x41, 0x31]; + +bitflags! { + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] + pub struct Flags: u32 { + const FOOTER_PAYLOAD_COMPRESSED_LZ4 = 0b00000001; + } +} diff --git a/src/puffin/src/file_format/reader.rs b/src/puffin/src/file_format/reader.rs new file mode 100644 index 0000000000..8d51c8cb69 --- /dev/null +++ b/src/puffin/src/file_format/reader.rs @@ -0,0 +1,46 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod file; +mod footer; + +use async_trait::async_trait; + +use crate::blob_metadata::BlobMetadata; +use crate::error::Result; +pub use crate::file_format::reader::file::PuffinFileReader; +use crate::file_metadata::FileMetadata; + +/// `PuffinSyncReader` defines a synchronous reader for puffin data. +pub trait PuffinSyncReader<'a> { + type Reader: std::io::Read + std::io::Seek; + + /// fetch the FileMetadata + fn metadata(&'a mut self) -> Result; + + /// read particular blob data based on given metadata + fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result; +} + +/// `PuffinAsyncReader` defines an asynchronous reader for puffin data. +#[async_trait] +pub trait PuffinAsyncReader<'a> { + type Reader: futures::AsyncRead + futures::AsyncSeek; + + /// fetch the FileMetadata + async fn metadata(&'a mut self) -> Result; + + /// read particular blob data based on given metadata + fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result; +} diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs new file mode 100644 index 0000000000..a7ca115b6c --- /dev/null +++ b/src/puffin/src/file_format/reader/file.rs @@ -0,0 +1,173 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::{self, SeekFrom}; + +use async_trait::async_trait; +use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use snafu::{ensure, ResultExt}; + +use crate::blob_metadata::BlobMetadata; +use crate::error::{ + MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu, + UnsupportedDecompressionSnafu, +}; +use crate::file_format::reader::footer::{FooterParser, MIN_FOOTER_SIZE}; +use crate::file_format::reader::{PuffinAsyncReader, PuffinSyncReader}; +use crate::file_format::MAGIC; +use crate::file_metadata::FileMetadata; +use crate::partial_reader::PartialReader; + +/// Puffin file reader, implemented [`PuffinSyncReader`] and [`PuffinAsyncReader`] +/// +/// ```text +/// File layout: Magic Blob₁ Blob₂ ... Blobₙ Footer +/// [4] [?] [?] [?] [?] +/// ``` +pub struct PuffinFileReader { + /// The source of the puffin file + source: R, + + /// The metadata of the puffin file, which is parsed from the footer + metadata: Option, +} + +pub const MAGIC_SIZE: u64 = MAGIC.len() as u64; +pub const MIN_FILE_SIZE: u64 = MAGIC_SIZE + MIN_FOOTER_SIZE; + +impl PuffinFileReader { + pub fn new(source: R) -> Self { + Self { + source, + metadata: None, + } + } + + fn validate_file_size(file_size: u64) -> Result<()> { + ensure!( + file_size >= MIN_FILE_SIZE, + UnexpectedPuffinFileSizeSnafu { + min_file_size: MIN_FILE_SIZE, + actual_file_size: file_size + } + ); + Ok(()) + } +} + +impl<'a, R: io::Read + io::Seek + 'a> PuffinSyncReader<'a> for PuffinFileReader { + type Reader = PartialReader<&'a mut R>; + + fn metadata(&mut self) -> Result { + if let Some(metadata) = &self.metadata { + return Ok(metadata.clone()); + } + + // check the magic + let mut magic = [0; MAGIC_SIZE as usize]; + self.source.read_exact(&mut magic).context(ReadSnafu)?; + ensure!(magic == MAGIC, MagicNotMatchedSnafu); + + let file_size = self.get_file_size_sync()?; + + // parse the footer + let metadata = FooterParser::new(&mut self.source, file_size).parse_sync()?; + self.metadata = Some(metadata.clone()); + Ok(metadata) + } + + fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result { + // TODO(zhongzc): support decompression + let compression = blob_metadata.compression_codec.as_ref(); + ensure!( + compression.is_none(), + UnsupportedDecompressionSnafu { + decompression: compression.unwrap().to_string() + } + ); + + Ok(PartialReader::new( + &mut self.source, + blob_metadata.offset as _, + blob_metadata.length as _, + )) + } +} + +#[async_trait] +impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> PuffinAsyncReader<'a> + for PuffinFileReader +{ + type Reader = PartialReader<&'a mut R>; + + async fn metadata(&'a mut self) -> Result { + if let Some(metadata) = &self.metadata { + return Ok(metadata.clone()); + } + + // check the magic + let mut magic = [0; MAGIC_SIZE as usize]; + self.source + .read_exact(&mut magic) + .await + .context(ReadSnafu)?; + ensure!(magic == MAGIC, MagicNotMatchedSnafu); + + let file_size = self.get_file_size_async().await?; + + // parse the footer + let metadata = FooterParser::new(&mut self.source, file_size) + .parse_async() + .await?; + self.metadata = Some(metadata.clone()); + Ok(metadata) + } + + fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result { + // TODO(zhongzc): support decompression + let compression = blob_metadata.compression_codec.as_ref(); + ensure!( + compression.is_none(), + UnsupportedDecompressionSnafu { + decompression: compression.unwrap().to_string() + } + ); + + Ok(PartialReader::new( + &mut self.source, + blob_metadata.offset as _, + blob_metadata.length as _, + )) + } +} + +impl PuffinFileReader { + fn get_file_size_sync(&mut self) -> Result { + let file_size = self.source.seek(SeekFrom::End(0)).context(SeekSnafu)?; + Self::validate_file_size(file_size)?; + Ok(file_size) + } +} + +impl PuffinFileReader { + async fn get_file_size_async(&mut self) -> Result { + let file_size = self + .source + .seek(SeekFrom::End(0)) + .await + .context(SeekSnafu)?; + Self::validate_file_size(file_size)?; + Ok(file_size) + } +} diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs new file mode 100644 index 0000000000..987c70a7d7 --- /dev/null +++ b/src/puffin/src/file_format/reader/footer.rs @@ -0,0 +1,318 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::{self, SeekFrom}; + +use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use snafu::{ensure, ResultExt}; + +use crate::error::{ + BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu, + MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result, SeekSnafu, + UnexpectedFooterPayloadSizeSnafu, UnsupportedDecompressionSnafu, +}; +use crate::file_format::reader::file::{MAGIC_SIZE, MIN_FILE_SIZE}; +use crate::file_format::{Flags, MAGIC}; +use crate::file_metadata::FileMetadata; + +/// Parser for the footer of a Puffin data file +/// +/// The footer has a specific layout that needs to be read and parsed to +/// extract metadata about the file, which is encapsulated in the [`FileMetadata`] type. +/// +/// ```text +/// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic +/// [4] [?] [4] [4] [4] +/// ``` +pub struct FooterParser { + // The underlying IO source + source: R, + + // The size of the file, used for calculating offsets to read from + file_size: u64, +} + +pub const FLAGS_SIZE: u64 = 4; +pub const PAYLOAD_SIZE_SIZE: u64 = 4; +pub const MIN_FOOTER_SIZE: u64 = MAGIC_SIZE * 2 + FLAGS_SIZE + PAYLOAD_SIZE_SIZE; + +impl FooterParser { + pub fn new(source: R, file_size: u64) -> Self { + Self { source, file_size } + } +} + +impl FooterParser { + /// Parses the footer from the IO source in a synchronous manner. + pub fn parse_sync(&mut self) -> Result { + let mut parser = StageParser::new(self.file_size); + + let mut buf = vec![]; + while let Some(byte_to_read) = parser.next_to_read() { + self.source + .seek(SeekFrom::Start(byte_to_read.offset)) + .context(SeekSnafu)?; + let size = byte_to_read.size as usize; + + buf.resize(size, 0); + let buf = &mut buf[..size]; + + self.source.read_exact(buf).context(ReadSnafu)?; + + parser.consume_bytes(buf)?; + } + + parser.finish() + } +} + +impl FooterParser { + /// Parses the footer from the IO source in a asynchronous manner. + pub async fn parse_async(&mut self) -> Result { + let mut parser = StageParser::new(self.file_size); + + let mut buf = vec![]; + while let Some(byte_to_read) = parser.next_to_read() { + self.source + .seek(SeekFrom::Start(byte_to_read.offset)) + .await + .context(SeekSnafu)?; + let size = byte_to_read.size as usize; + + buf.resize(size, 0); + let buf = &mut buf[..size]; + + self.source.read_exact(buf).await.context(ReadSnafu)?; + parser.consume_bytes(buf)?; + } + + parser.finish() + } +} + +/// The internal stages of parsing the footer. +/// This enum allows the StageParser to keep track of which part +/// of the footer needs to be parsed next. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ParseStage { + FootMagic, + Flags, + PayloadSize, + Payload, + HeadMagic, + Done, +} + +/// Manages the parsing process of the file's footer. +struct StageParser { + /// Current stage in the parsing sequence of the footer. + stage: ParseStage, + + /// Total file size; used for calculating offsets to read from. + file_size: u64, + + /// Flags from the footer, set when the `Flags` field is parsed. + flags: Flags, + + /// Size of the footer's payload, set when the `PayloadSize` is parsed. + payload_size: u64, + + /// Metadata from the footer's payload, set when the `Payload` is parsed. + metadata: Option, +} + +/// Represents a read operation that needs to be performed, including the +/// offset from the start of the file and the number of bytes to read. +struct BytesToRead { + offset: u64, + size: u64, +} + +impl StageParser { + fn new(file_size: u64) -> Self { + Self { + stage: ParseStage::FootMagic, + file_size, + payload_size: 0, + flags: Flags::empty(), + metadata: None, + } + } + + /// Determines the next segment of bytes to read based on the current parsing stage. + /// This method returns information like the offset and size of the next read, + /// or None if parsing is complete. + fn next_to_read(&self) -> Option { + if self.stage == ParseStage::Done { + return None; + } + + let btr = match self.stage { + ParseStage::FootMagic => BytesToRead { + offset: self.foot_magic_offset(), + size: MAGIC_SIZE, + }, + ParseStage::Flags => BytesToRead { + offset: self.flags_offset(), + size: FLAGS_SIZE, + }, + ParseStage::PayloadSize => BytesToRead { + offset: self.payload_size_offset(), + size: PAYLOAD_SIZE_SIZE, + }, + ParseStage::Payload => BytesToRead { + offset: self.payload_offset(), + size: self.payload_size, + }, + ParseStage::HeadMagic => BytesToRead { + offset: self.head_magic_offset(), + size: MAGIC_SIZE, + }, + ParseStage::Done => unreachable!(), + }; + + Some(btr) + } + + /// Processes the bytes that have been read according to the current parsing stage + /// and advances the parsing stage. It ensures the correct sequence of bytes is + /// encountered and stores the necessary information in the `StageParser`. + fn consume_bytes(&mut self, bytes: &[u8]) -> Result<()> { + match self.stage { + ParseStage::FootMagic => { + ensure!(bytes == MAGIC, MagicNotMatchedSnafu); + self.stage = ParseStage::Flags; + } + ParseStage::Flags => { + self.flags = Self::parse_flags(bytes)?; + self.stage = ParseStage::PayloadSize; + } + ParseStage::PayloadSize => { + self.payload_size = Self::parse_payload_size(bytes)?; + self.validate_payload_size()?; + self.stage = ParseStage::Payload; + } + ParseStage::Payload => { + self.metadata = Some(self.parse_payload(bytes)?); + self.validate_metadata()?; + self.stage = ParseStage::HeadMagic; + } + ParseStage::HeadMagic => { + ensure!(bytes == MAGIC, MagicNotMatchedSnafu); + self.stage = ParseStage::Done; + } + ParseStage::Done => unreachable!(), + } + + Ok(()) + } + + /// Finalizes the parsing process, ensuring all stages are complete, and returns + /// the parsed `FileMetadata`. It converts the raw footer payload into structured data. + fn finish(self) -> Result { + ensure!( + self.stage == ParseStage::Done, + ParseStageNotMatchSnafu { + expected: format!("{:?}", ParseStage::Done), + actual: format!("{:?}", self.stage), + } + ); + + Ok(self.metadata.unwrap()) + } + + fn parse_flags(bytes: &[u8]) -> Result { + let n = u32::from_le_bytes(bytes.try_into().context(BytesToIntegerSnafu)?); + Ok(Flags::from_bits_truncate(n)) + } + + fn parse_payload_size(bytes: &[u8]) -> Result { + let n = i32::from_le_bytes(bytes.try_into().context(BytesToIntegerSnafu)?); + ensure!(n >= 0, UnexpectedFooterPayloadSizeSnafu { size: n }); + Ok(n as u64) + } + + fn validate_payload_size(&self) -> Result<()> { + ensure!( + self.payload_size <= self.file_size - MIN_FILE_SIZE, + UnexpectedFooterPayloadSizeSnafu { + size: self.payload_size as i32 + } + ); + Ok(()) + } + + fn parse_payload(&self, bytes: &[u8]) -> Result { + // TODO(zhongzc): support lz4 + ensure!( + !self.flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4), + UnsupportedDecompressionSnafu { + decompression: "lz4" + } + ); + + serde_json::from_slice(bytes).context(DeserializeJsonSnafu) + } + + fn validate_metadata(&self) -> Result<()> { + let metadata = self.metadata.as_ref().expect("metadata is not set"); + + let mut next_blob_offset = MAGIC_SIZE; + // check blob offsets + for blob in &metadata.blobs { + ensure!( + blob.offset as u64 == next_blob_offset, + InvalidBlobOffsetSnafu { + offset: blob.offset + } + ); + next_blob_offset += blob.length as u64; + } + + let blob_area_end = metadata + .blobs + .last() + .map_or(MAGIC_SIZE, |b| (b.offset + b.length) as u64); + ensure!( + blob_area_end == self.head_magic_offset(), + InvalidBlobAreaEndSnafu { + offset: blob_area_end + } + ); + + Ok(()) + } + + fn foot_magic_offset(&self) -> u64 { + self.file_size - MAGIC_SIZE + } + + fn flags_offset(&self) -> u64 { + self.file_size - MAGIC_SIZE - FLAGS_SIZE + } + + fn payload_size_offset(&self) -> u64 { + self.file_size - MAGIC_SIZE - FLAGS_SIZE - PAYLOAD_SIZE_SIZE + } + + fn payload_offset(&self) -> u64 { + // `validate_payload_size` ensures that this subtraction will not overflow + self.file_size - MAGIC_SIZE - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size + } + + fn head_magic_offset(&self) -> u64 { + // `validate_payload_size` ensures that this subtraction will not overflow + self.file_size - MAGIC_SIZE * 2 - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size + } +} diff --git a/src/puffin/src/lib.rs b/src/puffin/src/lib.rs index 88a1d02540..40c35057c2 100644 --- a/src/puffin/src/lib.rs +++ b/src/puffin/src/lib.rs @@ -13,5 +13,10 @@ // limitations under the License. pub mod blob_metadata; +pub mod error; +pub mod file_format; pub mod file_metadata; pub mod partial_reader; + +#[cfg(test)] +mod tests; diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs new file mode 100644 index 0000000000000000000000000000000000000000..1d48ecd5656e7cf09b22d447154fee31666b4551 GIT binary patch literal 4142 zcmeHJZENF35bo#vim3}h23yL@eP|UzadWwz>EV>v*W(C5tmTosRMzTtS9Qd3zolRL zy??Dgp))I4P8^3gp@loR5E6TL=9y=nnVsDrfHPH8TBdUgM+Zm8@UzCk$_2s%#bPfA z=oj9|7;}R&DDxR=uyce#A!7R1?DXI_)JCctj{F1g*r3yFb-E`EQK=FZqJmsmC=C*Z zG?2&)0dHc82r`FQEs9KvJVsbbJ9lv!6rVBPHyCPa1(^$SC@N6NZ9@>YAr161x3&m_ zV7XlS!V&hBPJ^sY#{_SNXXm5wd7nr(PH*!JjRB1xN~tN~X$7JnQn8p4)l4iwX%MMK z+OvvDFSWEXPkUfgVwXZAGo49ewVamrj%H0RK$+h*Ak{<;oxvD};|}~Z7!Sui26;D} zyuNrlfp>$;%fVbH*#jiSs5W?0XwHCWN>&qR zPP#F&JQX8WdsH4g*04MLj-3`DU%Cng>_5oUTABy z$CWj97KVul!&g+@ll78RVR(sRcCzQzlG2vMGz%Em7=@zGYI@iXGp?ZH6mFOEE=hH+*T>_KjKj@HqAS}hxAcy(~?^M}Y*1@>q{QSd%5@CD7} zx24~Dsw6RJd+=fxV;T#oo%i5}N7-K;ZXu=kBHNJ~`LC!Q9zV*BtXJH=XKR-?k8K{J zEJ(x&ujiuEtDceG{yjN2hvf=rS|$WBc6F==a6_z(;4@~lF7`ghC$~++TP2}gDw>$a zGfdLC;cDmNOj)vBGVt8w&oRHjcIm5;oXzPN$8EuYV9Z2k+*7FZ(y literal 0 HcmV?d00001 diff --git a/src/puffin/src/tests/resources/empty-puffin-uncompressed.puffin b/src/puffin/src/tests/resources/empty-puffin-uncompressed.puffin new file mode 100644 index 0000000000000000000000000000000000000000..142b45bd4ebe0b865064ef874325ff1c94399bb1 GIT binary patch literal 32 fcmWG=b2JP9;%cR&ocyF>C9CMzS{?=n0Eq(tjVlLb literal 0 HcmV?d00001 diff --git a/src/puffin/src/tests/resources/sample-metric-data-uncompressed.puffin b/src/puffin/src/tests/resources/sample-metric-data-uncompressed.puffin new file mode 100644 index 0000000000000000000000000000000000000000..ab8da13822c55c573b8ba925eb9ebbc33cd87f28 GIT binary patch literal 355 zcmb7PY+KhqS$DC(-` zheOd{v1(>i#}#>*2^CgSQ@bc|@HE*vl`jHw&~tW?8*fpyrKZ<2g`S#lJ{d}=q`)`~ znHbex;6!0$hw6S4KeZz}O1}v0KMAt?M%;B#;DY2*W@QQsR&14(16i?5T8D!h- Maoaw2Us3IU0?}=DoB#j- literal 0 HcmV?d00001