From 6485a26fa3adaac22195da8a9a983daa9fa2cf66 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Wed, 26 Nov 2025 16:52:04 +0800 Subject: [PATCH] refactor: load metadata using offical impl (#7302) * refactor: load metadata using offical impl Signed-off-by: discord9 * pcr Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/mito2/src/sst/parquet/metadata.rs | 168 +++++++++----------------- 1 file changed, 60 insertions(+), 108 deletions(-) diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index 2cf1ecfda8..05c7aac462 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -12,17 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::result::Result as StdResult; + +use bytes::Bytes; +use futures::FutureExt; +use futures::future::BoxFuture; use object_store::ObjectStore; -use parquet::file::FOOTER_SIZE; +use parquet::arrow::async_reader::MetadataFetch; +use parquet::errors::{ParquetError, Result as ParquetResult}; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; -use snafu::ResultExt; +use snafu::{IntoError as _, ResultExt}; use crate::error::{self, Result}; /// The estimated size of the footer and metadata need to read from the end of parquet file. const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024; -/// Load the metadata of parquet file in an async way. pub(crate) struct MetadataLoader<'a> { // An object store that supports async read object_store: ObjectStore, @@ -46,111 +51,7 @@ impl<'a> MetadataLoader<'a> { } } - /// Async load the metadata of parquet file. - /// - /// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the - /// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata. - /// - /// Parquet File Format: - /// ```text - /// ┌───────────────────────────────────┐ - /// |4-byte magic number "PAR1" | - /// |───────────────────────────────────| - /// |Column 1 Chunk 1 + Column Metadata | - /// |Column 2 Chunk 1 + Column Metadata | - /// |... | - /// |Column N Chunk M + Column Metadata | - /// |───────────────────────────────────| - /// |File Metadata | - /// |───────────────────────────────────| - /// |4-byte length of file metadata | - /// |4-byte magic number "PAR1" | - /// └───────────────────────────────────┘ - /// ``` - /// - /// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106 - pub async fn load(&self) -> Result { - let object_store = &self.object_store; - let path = self.file_path; - let file_size = self.get_file_size().await?; - - if file_size < FOOTER_SIZE as u64 { - return error::InvalidParquetSnafu { - file: path, - reason: "file size is smaller than footer size", - } - .fail(); - } - - // Prefetch bytes for metadata from the end and process the footer - let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE); - let buffer = object_store - .read_with(path) - .range(buffer_start..file_size) - .await - .context(error::OpenDalSnafu)? - .to_vec(); - let buffer_len = buffer.len(); - - let mut footer = [0; 8]; - footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]); - - let footer_tail = ParquetMetaDataReader::decode_footer_tail(&footer).map_err(|e| { - error::InvalidParquetSnafu { - file: path, - reason: format!("failed to decode footer, {e}"), - } - .build() - })?; - let metadata_len = footer_tail.metadata_length() as u64; - - if file_size - (FOOTER_SIZE as u64) < metadata_len { - return error::InvalidParquetSnafu { - file: path, - reason: format!( - "the sum of Metadata length {} and Footer size {} is larger than file size {}", - metadata_len, FOOTER_SIZE, file_size - ), - } - .fail(); - } - - if (metadata_len as usize) <= buffer_len - FOOTER_SIZE { - // The whole metadata is in the first read - let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE; - let metadata = ParquetMetaDataReader::decode_metadata( - &buffer[metadata_start..buffer_len - FOOTER_SIZE], - ) - .map_err(|e| { - error::InvalidParquetSnafu { - file: path, - reason: format!("failed to decode metadata, {e}"), - } - .build() - })?; - Ok(metadata) - } else { - // The metadata is out of buffer, need to make a second read - let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64; - let data = object_store - .read_with(path) - .range(metadata_start..(file_size - FOOTER_SIZE as u64)) - .await - .context(error::OpenDalSnafu)? - .to_vec(); - - let metadata = ParquetMetaDataReader::decode_metadata(&data).map_err(|e| { - error::InvalidParquetSnafu { - file: path, - reason: format!("failed to decode metadata, {e}"), - } - .build() - })?; - Ok(metadata) - } - } - - /// Get the size of parquet file. + /// Get the size of parquet file. If file_size is 0, stat the object store to get the size. async fn get_file_size(&self) -> Result { let file_size = match self.file_size { 0 => self @@ -163,4 +64,55 @@ impl<'a> MetadataLoader<'a> { }; Ok(file_size) } + + pub async fn load(&self) -> Result { + let path = self.file_path; + let file_size = self.get_file_size().await?; + let reader = + ParquetMetaDataReader::new().with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize)); + + let fetch = ObjectStoreFetch { + object_store: &self.object_store, + file_path: self.file_path, + }; + + reader + .load_and_finish(fetch, file_size) + .await + .map_err(|e| match unbox_external_error(e) { + Ok(os_err) => error::OpenDalSnafu {}.into_error(os_err), + Err(parquet_err) => error::ReadParquetSnafu { path }.into_error(parquet_err), + }) + } +} + +/// Unpack ParquetError to get object_store::Error if possible. +fn unbox_external_error(e: ParquetError) -> StdResult { + match e { + ParquetError::External(boxed_err) => match boxed_err.downcast::() { + Ok(os_err) => Ok(*os_err), + Err(parquet_error) => Err(ParquetError::External(parquet_error)), + }, + other => Err(other), + } +} + +struct ObjectStoreFetch<'a> { + object_store: &'a ObjectStore, + file_path: &'a str, +} + +impl MetadataFetch for ObjectStoreFetch<'_> { + fn fetch(&mut self, range: std::ops::Range) -> BoxFuture<'_, ParquetResult> { + async move { + let data = self + .object_store + .read_with(self.file_path) + .range(range) + .await + .map_err(|e| ParquetError::External(Box::new(e)))?; + Ok(data.to_bytes()) + } + .boxed() + } }