Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-03 20:02:54 +00:00
feat: add prefetch support to PuffinFileFooterReader for reduced I/O time (#5145)
* feat: introduce `PuffinFileFooterReader`
* refactor: remove `SyncReader` trait and impl
* refactor: replace `FooterParser` with `PuffinFileFooterReader`
* chore: remove unused errors
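A minimal usage sketch of the new footer reader (not part of this commit; assumes a tokio context, an illustrative file path, and the crate-internal `FileReader` that the tests below use):

    use common_base::range_read::{FileReader, RangeReader};
    use crate::file_format::reader::PuffinFileFooterReader;

    // Open the puffin file and learn its content length.
    let reader = FileReader::new("example.puffin").await.unwrap();
    let file_size = reader.metadata().await.unwrap().content_length;

    // Decode only the footer. With a 1 KiB prefetch (DEFAULT_PREFETCH_SIZE),
    // a small footer costs one ranged read instead of one read per field.
    let mut footer_reader = PuffinFileFooterReader::new(reader, file_size)
        .with_prefetch_size(1024);
    let metadata = footer_reader.metadata().await.unwrap();
    println!("{} blobs", metadata.blobs.len());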
@@ -26,14 +26,6 @@ use crate::inverted_index::search::predicate::Predicate;
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
-    #[snafu(display("Failed to seek"))]
-    Seek {
-        #[snafu(source)]
-        error: IoError,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Failed to read"))]
     Read {
         #[snafu(source)]
@@ -215,8 +207,7 @@ impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         use Error::*;
         match self {
-            Seek { .. }
-            | Read { .. }
+            Read { .. }
             | Write { .. }
             | Flush { .. }
             | Close { .. }
@@ -25,14 +25,6 @@ use snafu::{Location, Snafu};
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
-    #[snafu(display("Failed to seek"))]
-    Seek {
-        #[snafu(source)]
-        error: IoError,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Failed to read"))]
     Read {
         #[snafu(source)]
@@ -119,14 +111,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to convert bytes to integer"))]
-    BytesToInteger {
-        #[snafu(source)]
-        error: std::array::TryFromSliceError,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Unsupported decompression: {}", decompression))]
     UnsupportedDecompression {
         decompression: String,
@@ -150,17 +134,15 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Parse stage not match, expected: {}, actual: {}", expected, actual))]
-    ParseStageNotMatch {
-        expected: String,
-        actual: String,
+    #[snafu(display("Unexpected footer payload size: {}", size))]
+    UnexpectedFooterPayloadSize {
+        size: i32,
         #[snafu(implicit)]
         location: Location,
     },
 
-    #[snafu(display("Unexpected footer payload size: {}", size))]
-    UnexpectedFooterPayloadSize {
-        size: i32,
+    #[snafu(display("Invalid puffin footer"))]
+    InvalidPuffinFooter {
         #[snafu(implicit)]
         location: Location,
     },
@@ -177,20 +159,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Invalid blob offset: {}, location: {:?}", offset, location))]
-    InvalidBlobOffset {
-        offset: i64,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Invalid blob area end: {}, location: {:?}", offset, location))]
-    InvalidBlobAreaEnd {
-        offset: u64,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Failed to compress lz4"))]
     Lz4Compression {
         #[snafu(source)]
@@ -262,8 +230,7 @@ impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         use Error::*;
         match self {
-            Seek { .. }
-            | Read { .. }
+            Read { .. }
             | MagicNotMatched { .. }
             | DeserializeJson { .. }
             | Write { .. }
@@ -275,18 +242,15 @@ impl ErrorExt for Error {
             | Remove { .. }
             | Rename { .. }
             | SerializeJson { .. }
-            | BytesToInteger { .. }
-            | ParseStageNotMatch { .. }
             | UnexpectedFooterPayloadSize { .. }
             | UnexpectedPuffinFileSize { .. }
-            | InvalidBlobOffset { .. }
-            | InvalidBlobAreaEnd { .. }
             | Lz4Compression { .. }
             | Lz4Decompression { .. }
             | BlobNotFound { .. }
             | BlobIndexOutOfBound { .. }
             | FileKeyNotMatch { .. }
-            | WalkDir { .. } => StatusCode::Unexpected,
+            | WalkDir { .. }
+            | InvalidPuffinFooter { .. } => StatusCode::Unexpected,
 
             UnsupportedCompression { .. } | UnsupportedDecompression { .. } => {
                 StatusCode::Unsupported
@@ -21,21 +21,9 @@ use common_base::range_read::RangeReader;
 use crate::blob_metadata::BlobMetadata;
 use crate::error::Result;
 pub use crate::file_format::reader::file::PuffinFileReader;
+pub use crate::file_format::reader::footer::PuffinFileFooterReader;
 use crate::file_metadata::FileMetadata;
 
-/// `SyncReader` defines a synchronous reader for puffin data.
-pub trait SyncReader<'a> {
-    type Reader: std::io::Read + std::io::Seek;
-
-    /// Fetches the FileMetadata.
-    fn metadata(&'a mut self) -> Result<FileMetadata>;
-
-    /// Reads particular blob data based on given metadata.
-    ///
-    /// Data read from the reader is compressed leaving the caller to decompress the data.
-    fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
-}
-
 /// `AsyncReader` defines an asynchronous reader for puffin data.
 #[async_trait]
 pub trait AsyncReader<'a> {
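With `SyncReader` gone, downstream code targets `AsyncReader` alone. A hedged caller-side sketch (the helper name and `Send` bound are assumptions, not crate API):

    /// Collects the blob types from any puffin source readable through
    /// `AsyncReader`, e.g. a `PuffinFileReader` wrapping a `RangeReader`.
    async fn blob_types<'a, R: AsyncReader<'a> + Send>(reader: &'a mut R) -> Result<Vec<String>> {
        // For `PuffinFileReader`, this parses the footer once and caches it.
        let metadata = reader.metadata().await?;
        Ok(metadata.blobs.iter().map(|b| b.blob_type.clone()).collect())
    }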
@@ -12,20 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::io::{self, SeekFrom};
-
 use async_trait::async_trait;
 use common_base::range_read::RangeReader;
 use snafu::{ensure, ResultExt};
 
 use crate::blob_metadata::BlobMetadata;
-use crate::error::{
-    MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu,
-    UnsupportedDecompressionSnafu,
-};
-use crate::file_format::reader::footer::FooterParser;
-use crate::file_format::reader::{AsyncReader, SyncReader};
-use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE};
+use crate::error::{ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu};
+use crate::file_format::reader::footer::DEFAULT_PREFETCH_SIZE;
+use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader};
+use crate::file_format::MIN_FILE_SIZE;
 use crate::file_metadata::FileMetadata;
 use crate::partial_reader::PartialReader;
@@ -72,45 +67,6 @@ impl<R> PuffinFileReader<R> {
     }
 }
 
-impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
-    type Reader = PartialReader<&'a mut R>;
-
-    fn metadata(&mut self) -> Result<FileMetadata> {
-        if let Some(metadata) = &self.metadata {
-            return Ok(metadata.clone());
-        }
-
-        // check the magic
-        let mut magic = [0; MAGIC_SIZE as usize];
-        self.source.read_exact(&mut magic).context(ReadSnafu)?;
-        ensure!(magic == MAGIC, MagicNotMatchedSnafu);
-
-        let file_size = self.get_file_size_sync()?;
-
-        // parse the footer
-        let metadata = FooterParser::new(&mut self.source, file_size).parse_sync()?;
-        self.metadata = Some(metadata.clone());
-        Ok(metadata)
-    }
-
-    fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader> {
-        // TODO(zhongzc): support decompression
-        let compression = blob_metadata.compression_codec.as_ref();
-        ensure!(
-            compression.is_none(),
-            UnsupportedDecompressionSnafu {
-                decompression: compression.unwrap().to_string()
-            }
-        );
-
-        Ok(PartialReader::new(
-            &mut self.source,
-            blob_metadata.offset as _,
-            blob_metadata.length as _,
-        ))
-    }
-}
-
 #[async_trait]
 impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
     type Reader = PartialReader<&'a mut R>;
@@ -119,17 +75,10 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
         if let Some(metadata) = &self.metadata {
             return Ok(metadata.clone());
         }
 
-        // check the magic
-        let magic = self.source.read(0..MAGIC_SIZE).await.context(ReadSnafu)?;
-        ensure!(*magic == MAGIC, MagicNotMatchedSnafu);
-
         let file_size = self.get_file_size_async().await?;
 
-        // parse the footer
-        let metadata = FooterParser::new(&mut self.source, file_size)
-            .parse_async()
-            .await?;
+        let mut reader = PuffinFileFooterReader::new(&mut self.source, file_size)
+            .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
+        let metadata = reader.metadata().await?;
         self.metadata = Some(metadata.clone());
         Ok(metadata)
     }
@@ -143,14 +92,6 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
     }
 }
 
-impl<R: io::Read + io::Seek> PuffinFileReader<R> {
-    fn get_file_size_sync(&mut self) -> Result<u64> {
-        let file_size = self.source.seek(SeekFrom::End(0)).context(SeekSnafu)?;
-        Self::validate_file_size(file_size)?;
-        Ok(file_size)
-    }
-}
-
 impl<R: RangeReader> PuffinFileReader<R> {
     async fn get_file_size_async(&mut self) -> Result<u64> {
         let file_size = self
@@ -12,240 +12,98 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::io::{self, Cursor, SeekFrom};
+use std::io::Cursor;
 
 use common_base::range_read::RangeReader;
 use snafu::{ensure, ResultExt};
 
 use crate::error::{
-    BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu,
-    Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result,
-    SeekSnafu, UnexpectedFooterPayloadSizeSnafu,
+    DeserializeJsonSnafu, InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu,
+    ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu,
 };
 use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE};
 use crate::file_metadata::FileMetadata;
 
-/// Parser for the footer of a Puffin data file
+/// The default prefetch size for the footer reader.
+pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB
+
+/// Reader for the footer of a Puffin data file
 ///
 /// The footer has a specific layout that needs to be read and parsed to
 /// extract metadata about the file, which is encapsulated in the [`FileMetadata`] type.
 ///
+/// This reader supports prefetching, allowing for more efficient reading
+/// of the footer by fetching additional data ahead of time.
+///
 /// ```text
 /// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic
 ///                [4]       [?]     [4]         [4]   [4]
 /// ```
-pub struct FooterParser<R> {
-    // The underlying IO source
+pub struct PuffinFileFooterReader<R> {
+    /// The source of the puffin file
     source: R,
-
-    // The size of the file, used for calculating offsets to read from
+    /// The content length of the puffin file
     file_size: u64,
+    /// The prefetch footer size
+    prefetch_size: Option<u64>,
 }
 
-impl<R> FooterParser<R> {
-    pub fn new(source: R, file_size: u64) -> Self {
-        Self { source, file_size }
-    }
-}
-
-impl<R: io::Read + io::Seek> FooterParser<R> {
-    /// Parses the footer from the IO source in a synchronous manner.
-    pub fn parse_sync(&mut self) -> Result<FileMetadata> {
-        let mut parser = StageParser::new(self.file_size);
-
-        let mut buf = vec![];
-        while let Some(byte_to_read) = parser.next_to_read() {
-            self.source
-                .seek(SeekFrom::Start(byte_to_read.offset))
-                .context(SeekSnafu)?;
-            let size = byte_to_read.size as usize;
-
-            buf.resize(size, 0);
-            let buf = &mut buf[..size];
-
-            self.source.read_exact(buf).context(ReadSnafu)?;
-
-            parser.consume_bytes(buf)?;
-        }
-
-        parser.finish()
-    }
-}
-
-impl<R: RangeReader> FooterParser<R> {
-    /// Parses the footer from the IO source in a asynchronous manner.
-    pub async fn parse_async(&mut self) -> Result<FileMetadata> {
-        let mut parser = StageParser::new(self.file_size);
-
-        let mut buf = vec![];
-        while let Some(byte_to_read) = parser.next_to_read() {
-            buf.clear();
-            let range = byte_to_read.offset..byte_to_read.offset + byte_to_read.size;
-            self.source
-                .read_into(range, &mut buf)
-                .await
-                .context(ReadSnafu)?;
-
-            parser.consume_bytes(&buf)?;
-        }
-
-        parser.finish()
-    }
-}
-
-/// The internal stages of parsing the footer.
-/// This enum allows the StageParser to keep track of which part
-/// of the footer needs to be parsed next.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum ParseStage {
-    FootMagic,
-    Flags,
-    PayloadSize,
-    Payload,
-    HeadMagic,
-    Done,
-}
-
-/// Manages the parsing process of the file's footer.
-struct StageParser {
-    /// Current stage in the parsing sequence of the footer.
-    stage: ParseStage,
-
-    /// Total file size; used for calculating offsets to read from.
-    file_size: u64,
-
-    /// Flags from the footer, set when the `Flags` field is parsed.
-    flags: Flags,
-
-    /// Size of the footer's payload, set when the `PayloadSize` is parsed.
-    payload_size: u64,
-
-    /// Metadata from the footer's payload, set when the `Payload` is parsed.
-    metadata: Option<FileMetadata>,
-}
-
-/// Represents a read operation that needs to be performed, including the
-/// offset from the start of the file and the number of bytes to read.
-struct BytesToRead {
-    offset: u64,
-    size: u64,
-}
-
-impl StageParser {
-    fn new(file_size: u64) -> Self {
-        Self {
-            stage: ParseStage::FootMagic,
-            file_size,
-            payload_size: 0,
-            flags: Flags::empty(),
-            metadata: None,
-        }
-    }
-
-    /// Determines the next segment of bytes to read based on the current parsing stage.
-    /// This method returns information like the offset and size of the next read,
-    /// or None if parsing is complete.
-    fn next_to_read(&self) -> Option<BytesToRead> {
-        if self.stage == ParseStage::Done {
-            return None;
-        }
-
-        let btr = match self.stage {
-            ParseStage::FootMagic => BytesToRead {
-                offset: self.foot_magic_offset(),
-                size: MAGIC_SIZE,
-            },
-            ParseStage::Flags => BytesToRead {
-                offset: self.flags_offset(),
-                size: FLAGS_SIZE,
-            },
-            ParseStage::PayloadSize => BytesToRead {
-                offset: self.payload_size_offset(),
-                size: PAYLOAD_SIZE_SIZE,
-            },
-            ParseStage::Payload => BytesToRead {
-                offset: self.payload_offset(),
-                size: self.payload_size,
-            },
-            ParseStage::HeadMagic => BytesToRead {
-                offset: self.head_magic_offset(),
-                size: MAGIC_SIZE,
-            },
-            ParseStage::Done => unreachable!(),
-        };
-
-        Some(btr)
-    }
-
-    /// Processes the bytes that have been read according to the current parsing stage
-    /// and advances the parsing stage. It ensures the correct sequence of bytes is
-    /// encountered and stores the necessary information in the `StageParser`.
-    fn consume_bytes(&mut self, bytes: &[u8]) -> Result<()> {
-        match self.stage {
-            ParseStage::FootMagic => {
-                ensure!(bytes == MAGIC, MagicNotMatchedSnafu);
-                self.stage = ParseStage::Flags;
-            }
-            ParseStage::Flags => {
-                self.flags = Self::parse_flags(bytes)?;
-                self.stage = ParseStage::PayloadSize;
-            }
-            ParseStage::PayloadSize => {
-                self.payload_size = Self::parse_payload_size(bytes)?;
-                self.validate_payload_size()?;
-                self.stage = ParseStage::Payload;
-            }
-            ParseStage::Payload => {
-                self.metadata = Some(self.parse_payload(bytes)?);
-                self.validate_metadata()?;
-                self.stage = ParseStage::HeadMagic;
-            }
-            ParseStage::HeadMagic => {
-                ensure!(bytes == MAGIC, MagicNotMatchedSnafu);
-                self.stage = ParseStage::Done;
-            }
-            ParseStage::Done => unreachable!(),
-        }
-
-        Ok(())
-    }
-
-    /// Finalizes the parsing process, ensuring all stages are complete, and returns
-    /// the parsed `FileMetadata`. It converts the raw footer payload into structured data.
-    fn finish(self) -> Result<FileMetadata> {
-        ensure!(
-            self.stage == ParseStage::Done,
-            ParseStageNotMatchSnafu {
-                expected: format!("{:?}", ParseStage::Done),
-                actual: format!("{:?}", self.stage),
-            }
-        );
-
-        Ok(self.metadata.unwrap())
-    }
-
-    fn parse_flags(bytes: &[u8]) -> Result<Flags> {
-        let n = u32::from_le_bytes(bytes.try_into().context(BytesToIntegerSnafu)?);
-        Ok(Flags::from_bits_truncate(n))
-    }
-
-    fn parse_payload_size(bytes: &[u8]) -> Result<u64> {
-        let n = i32::from_le_bytes(bytes.try_into().context(BytesToIntegerSnafu)?);
-        ensure!(n >= 0, UnexpectedFooterPayloadSizeSnafu { size: n });
-        Ok(n as u64)
-    }
-
-    fn validate_payload_size(&self) -> Result<()> {
-        ensure!(
-            self.payload_size <= self.file_size - MIN_FILE_SIZE,
-            UnexpectedFooterPayloadSizeSnafu {
-                size: self.payload_size as i32
-            }
-        );
-        Ok(())
-    }
-
-    fn parse_payload(&self, bytes: &[u8]) -> Result<FileMetadata> {
-        if self.flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) {
+impl<'a, R: RangeReader + 'a> PuffinFileFooterReader<R> {
+    pub fn new(source: R, content_len: u64) -> Self {
+        Self {
+            source,
+            file_size: content_len,
+            prefetch_size: None,
+        }
+    }
+
+    fn prefetch_size(&self) -> u64 {
+        self.prefetch_size.unwrap_or(MIN_FILE_SIZE)
+    }
+
+    pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
+        self.prefetch_size = Some(prefetch_size.max(MIN_FILE_SIZE));
+        self
+    }
+
+    pub async fn metadata(&'a mut self) -> Result<FileMetadata> {
+        // Note: prefetch > content_len is allowed, since we're using saturating_sub.
+        let footer_start = self.file_size.saturating_sub(self.prefetch_size());
+        let suffix = self
+            .source
+            .read(footer_start..self.file_size)
+            .await
+            .context(ReadSnafu)?;
+        let suffix_len = suffix.len();
+
+        // check the magic
+        let magic = Self::read_tailing_four_bytes(&suffix)?;
+        ensure!(magic == MAGIC, MagicNotMatchedSnafu);
+
+        let flags = self.decode_flags(&suffix[..suffix_len - MAGIC_SIZE as usize])?;
+        let length = self.decode_payload_size(
+            &suffix[..suffix_len - MAGIC_SIZE as usize - FLAGS_SIZE as usize],
+        )?;
+        let footer_size = PAYLOAD_SIZE_SIZE + FLAGS_SIZE + MAGIC_SIZE;
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request.
+        if length > suffix_len as u64 - footer_size {
+            let metadata_start = self.file_size - length - footer_size;
+            let meta = self
+                .source
+                .read(metadata_start..self.file_size - footer_size)
+                .await
+                .context(ReadSnafu)?;
+            self.parse_payload(&flags, &meta)
+        } else {
+            let metadata_start = self.file_size - length - footer_size - footer_start;
+            let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
+            self.parse_payload(&flags, meta)
+        }
+    }
+
+    fn parse_payload(&self, flags: &Flags, bytes: &[u8]) -> Result<FileMetadata> {
+        if flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) {
             let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes));
             let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?;
             Ok(res)
@@ -254,54 +112,35 @@ impl StageParser {
         }
     }
 
-    fn validate_metadata(&self) -> Result<()> {
-        let metadata = self.metadata.as_ref().expect("metadata is not set");
-
-        let mut next_blob_offset = MAGIC_SIZE;
-        // check blob offsets
-        for blob in &metadata.blobs {
-            ensure!(
-                blob.offset as u64 == next_blob_offset,
-                InvalidBlobOffsetSnafu {
-                    offset: blob.offset
-                }
-            );
-            next_blob_offset += blob.length as u64;
-        }
-
-        let blob_area_end = metadata
-            .blobs
-            .last()
-            .map_or(MAGIC_SIZE, |b| (b.offset + b.length) as u64);
-        ensure!(
-            blob_area_end == self.head_magic_offset(),
-            InvalidBlobAreaEndSnafu {
-                offset: blob_area_end
-            }
-        );
-
-        Ok(())
-    }
-
-    fn foot_magic_offset(&self) -> u64 {
-        self.file_size - MAGIC_SIZE
-    }
-
-    fn flags_offset(&self) -> u64 {
-        self.file_size - MAGIC_SIZE - FLAGS_SIZE
-    }
-
-    fn payload_size_offset(&self) -> u64 {
-        self.file_size - MAGIC_SIZE - FLAGS_SIZE - PAYLOAD_SIZE_SIZE
-    }
-
-    fn payload_offset(&self) -> u64 {
-        // `validate_payload_size` ensures that this subtraction will not overflow
-        self.file_size - MAGIC_SIZE - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size
-    }
-
-    fn head_magic_offset(&self) -> u64 {
-        // `validate_payload_size` ensures that this subtraction will not overflow
-        self.file_size - MAGIC_SIZE * 2 - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size
+    fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
+        let suffix_len = suffix.len();
+        ensure!(suffix_len >= 4, InvalidPuffinFooterSnafu);
+        let mut bytes = [0; 4];
+        bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
+        Ok(bytes)
+    }
+
+    fn decode_flags(&self, suffix: &[u8]) -> Result<Flags> {
+        let flags = u32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?);
+        Ok(Flags::from_bits_truncate(flags))
+    }
+
+    fn decode_payload_size(&self, suffix: &[u8]) -> Result<u64> {
+        let payload_size = i32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?);
+        ensure!(
+            payload_size >= 0,
+            UnexpectedFooterPayloadSizeSnafu { size: payload_size }
+        );
+        let payload_size = payload_size as u64;
+        ensure!(
+            payload_size <= self.file_size - MIN_FILE_SIZE,
+            UnexpectedFooterPayloadSizeSnafu {
+                size: self.file_size as i32
+            }
+        );
+
+        Ok(payload_size)
     }
 }
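To make the footer arithmetic above concrete, a standalone sketch (not from the commit) of the decision `metadata()` makes; the constants mirror the layout in the doc comment, where PayloadSize, Flags, and each magic take 4 bytes:

    /// True when the footer payload spills outside the prefetched suffix and
    /// `metadata()` must issue its one follow-up ranged read. `prefetch` is the
    /// resolved prefetch size: `with_prefetch_size` clamps it to at least
    /// MIN_FILE_SIZE, so `suffix_len` always covers the 12 fixed footer bytes.
    fn needs_second_read(file_size: u64, payload_len: u64, prefetch: u64) -> bool {
        let footer_size: u64 = 4 + 4 + 4; // PAYLOAD_SIZE_SIZE + FLAGS_SIZE + MAGIC_SIZE
        let suffix_len = file_size.min(prefetch); // bytes returned by the first read
        payload_len > suffix_len - footer_size
    }

With the 1 KiB `DEFAULT_PREFETCH_SIZE`, any footer payload up to 1024 - 12 = 1012 bytes is decoded from the first read alone, whereas the removed `StageParser` issued a separate ranged read per stage (FootMagic, Flags, PayloadSize, Payload, HeadMagic) for the same footer.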
@@ -13,26 +13,14 @@
 // limitations under the License.
 
 use std::collections::HashMap;
-use std::fs::File;
-use std::io::{Cursor, Read};
 use std::vec;
 
 use common_base::range_read::{FileReader, RangeReader};
 use futures::io::Cursor as AsyncCursor;
 
-use crate::file_format::reader::{AsyncReader, PuffinFileReader, SyncReader};
-use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter};
-
-#[test]
-fn test_read_empty_puffin_sync() {
-    let path = "src/tests/resources/empty-puffin-uncompressed.puffin";
-
-    let file = File::open(path).unwrap();
-    let mut reader = PuffinFileReader::new(file);
-    let metadata = reader.metadata().unwrap();
-    assert_eq!(metadata.properties.len(), 0);
-    assert_eq!(metadata.blobs.len(), 0);
-}
+use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader, PuffinFileReader};
+use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
+use crate::file_metadata::FileMetadata;
 
 #[tokio::test]
 async fn test_read_empty_puffin_async() {
@@ -45,39 +33,37 @@ async fn test_read_empty_puffin_async() {
     assert_eq!(metadata.blobs.len(), 0);
 }
 
-#[test]
-fn test_sample_metric_data_puffin_sync() {
-    let path = "src/tests/resources/sample-metric-data-uncompressed.puffin";
-
-    let file = File::open(path).unwrap();
-    let mut reader = PuffinFileReader::new(file);
-    let metadata = reader.metadata().unwrap();
-
-    assert_eq!(metadata.properties.len(), 1);
-    assert_eq!(
-        metadata.properties.get("created-by"),
-        Some(&"Test 1234".to_string())
-    );
-
-    assert_eq!(metadata.blobs.len(), 2);
-    assert_eq!(metadata.blobs[0].blob_type, "some-blob");
-    assert_eq!(metadata.blobs[0].offset, 4);
-    assert_eq!(metadata.blobs[0].length, 9);
-
-    assert_eq!(metadata.blobs[1].blob_type, "some-other-blob");
-    assert_eq!(metadata.blobs[1].offset, 13);
-    assert_eq!(metadata.blobs[1].length, 83);
-
-    let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap();
-    let mut buf = String::new();
-    some_blob.read_to_string(&mut buf).unwrap();
-    assert_eq!(buf, "abcdefghi");
-
-    let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap();
-    let mut buf = Vec::new();
-    some_other_blob.read_to_end(&mut buf).unwrap();
-    let expected = include_bytes!("tests/resources/sample-metric-data.blob");
-    assert_eq!(buf, expected);
+async fn test_read_puffin_file_metadata(
+    path: &str,
+    file_size: u64,
+    expected_metadata: FileMetadata,
+) {
+    for prefetch_size in [0, file_size / 2, file_size, file_size + 10] {
+        let reader = FileReader::new(path).await.unwrap();
+        let mut footer_reader = PuffinFileFooterReader::new(reader, file_size);
+        if prefetch_size > 0 {
+            footer_reader = footer_reader.with_prefetch_size(prefetch_size);
+        }
+        let metadata = footer_reader.metadata().await.unwrap();
+        assert_eq!(metadata.properties, expected_metadata.properties);
+        assert_eq!(metadata.blobs, expected_metadata.blobs);
+    }
+}
+
+#[tokio::test]
+async fn test_read_puffin_file_metadata_async() {
+    let paths = vec![
+        "src/tests/resources/empty-puffin-uncompressed.puffin",
+        "src/tests/resources/sample-metric-data-uncompressed.puffin",
+    ];
+    for path in paths {
+        let mut reader = FileReader::new(path).await.unwrap();
+        let file_size = reader.metadata().await.unwrap().content_length;
+        let mut reader = PuffinFileReader::new(reader);
+        let metadata = reader.metadata().await.unwrap();
+        test_read_puffin_file_metadata(path, file_size, metadata).await;
+    }
 }
 
 #[tokio::test]
@@ -113,38 +99,6 @@ async fn test_sample_metric_data_puffin_async() {
     assert_eq!(buf, expected);
 }
 
-#[test]
-fn test_writer_reader_with_empty_sync() {
-    fn test_writer_reader_with_empty_sync(footer_compressed: bool) {
-        let mut buf = Cursor::new(vec![]);
-
-        let mut writer = PuffinFileWriter::new(&mut buf);
-        writer.set_properties(HashMap::from([(
-            "created-by".to_string(),
-            "Test 1234".to_string(),
-        )]));
-
-        writer.set_footer_lz4_compressed(footer_compressed);
-        let written_bytes = writer.finish().unwrap();
-        assert!(written_bytes > 0);
-
-        let mut buf = Cursor::new(buf.into_inner());
-        let mut reader = PuffinFileReader::new(&mut buf);
-        let metadata = reader.metadata().unwrap();
-
-        assert_eq!(metadata.properties.len(), 1);
-        assert_eq!(
-            metadata.properties.get("created-by"),
-            Some(&"Test 1234".to_string())
-        );
-
-        assert_eq!(metadata.blobs.len(), 0);
-    }
-
-    test_writer_reader_with_empty_sync(false);
-    test_writer_reader_with_empty_sync(true);
-}
-
 #[tokio::test]
 async fn test_writer_reader_empty_async() {
     async fn test_writer_reader_empty_async(footer_compressed: bool) {
@@ -176,76 +130,6 @@ async fn test_writer_reader_empty_async() {
     test_writer_reader_empty_async(true).await;
 }
 
-#[test]
-fn test_writer_reader_sync() {
-    fn test_writer_reader_sync(footer_compressed: bool) {
-        let mut buf = Cursor::new(vec![]);
-
-        let mut writer = PuffinFileWriter::new(&mut buf);
-
-        let blob1 = "abcdefghi";
-        writer
-            .add_blob(Blob {
-                compressed_data: Cursor::new(&blob1),
-                blob_type: "some-blob".to_string(),
-                properties: Default::default(),
-                compression_codec: None,
-            })
-            .unwrap();
-
-        let blob2 = include_bytes!("tests/resources/sample-metric-data.blob");
-        writer
-            .add_blob(Blob {
-                compressed_data: Cursor::new(&blob2),
-                blob_type: "some-other-blob".to_string(),
-                properties: Default::default(),
-                compression_codec: None,
-            })
-            .unwrap();
-
-        writer.set_properties(HashMap::from([(
-            "created-by".to_string(),
-            "Test 1234".to_string(),
-        )]));
-
-        writer.set_footer_lz4_compressed(footer_compressed);
-        let written_bytes = writer.finish().unwrap();
-        assert!(written_bytes > 0);
-
-        let mut buf = Cursor::new(buf.into_inner());
-        let mut reader = PuffinFileReader::new(&mut buf);
-        let metadata = reader.metadata().unwrap();
-
-        assert_eq!(metadata.properties.len(), 1);
-        assert_eq!(
-            metadata.properties.get("created-by"),
-            Some(&"Test 1234".to_string())
-        );
-
-        assert_eq!(metadata.blobs.len(), 2);
-        assert_eq!(metadata.blobs[0].blob_type, "some-blob");
-        assert_eq!(metadata.blobs[0].offset, 4);
-        assert_eq!(metadata.blobs[0].length, 9);
-
-        assert_eq!(metadata.blobs[1].blob_type, "some-other-blob");
-        assert_eq!(metadata.blobs[1].offset, 13);
-        assert_eq!(metadata.blobs[1].length, 83);
-
-        let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap();
-        let mut buf = String::new();
-        some_blob.read_to_string(&mut buf).unwrap();
-        assert_eq!(buf, blob1);
-
-        let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap();
-        let mut buf = Vec::new();
-        some_other_blob.read_to_end(&mut buf).unwrap();
-        assert_eq!(buf, blob2);
-    }
-
-    test_writer_reader_sync(false);
-    test_writer_reader_sync(true);
-}
-
 #[tokio::test]
 async fn test_writer_reader_async() {
     async fn test_writer_reader_async(footer_compressed: bool) {