refactor: read parquet metadata (#3199)

* feat: MetadataLoader

* refactor code

* chore: clippy

* chore: cr comment

* chore: add TODO

* chore: cr comment

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: clippy

---------

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Wei
2024-01-21 15:21:29 +08:00
committed by GitHub
parent 4278c858f3
commit e5a8831fa0
3 changed files with 170 additions and 17 deletions

View File

@@ -16,6 +16,7 @@
mod format;
pub(crate) mod helper;
mod metadata;
mod page_reader;
pub mod reader;
pub mod row_group;

View File

@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use object_store::ObjectStore;
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::FOOTER_SIZE;
use snafu::ResultExt;
use crate::error::{self, Result};
/// The estimated size of the footer and metadata need to read from the end of parquet file.
const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;
/// Load the metadata of parquet file in an async way.
pub(crate) struct MetadataLoader<'a> {
// An object store that supports async read
object_store: ObjectStore,
// The path of parquet file
file_path: &'a str,
// The size of parquet file
file_size: u64,
}
impl<'a> MetadataLoader<'a> {
/// Create a new parquet metadata loader.
pub fn new(object_store: ObjectStore, file_path: &'a str, file_size: u64) -> MetadataLoader {
Self {
object_store,
file_path,
file_size,
}
}
/// Async load the metadata of parquet file.
///
/// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the
/// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata.
///
/// Parquet File Format:
/// ```text
/// ┌───────────────────────────────────┐
/// |4-byte magic number "PAR1" |
/// |───────────────────────────────────|
/// |Column 1 Chunk 1 + Column Metadata |
/// |Column 2 Chunk 1 + Column Metadata |
/// |... |
/// |Column N Chunk M + Column Metadata |
/// |───────────────────────────────────|
/// |File Metadata |
/// |───────────────────────────────────|
/// |4-byte length of file metadata |
/// |4-byte magic number "PAR1" |
/// └───────────────────────────────────┘
/// ```
///
/// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106
pub async fn load(&self) -> Result<ParquetMetaData> {
let object_store = &self.object_store;
let path = self.file_path;
let file_size = self.get_file_size().await?;
if file_size < FOOTER_SIZE as u64 {
return error::InvalidParquetSnafu {
file: path,
reason: "file size is smaller than footer size",
}
.fail();
}
// Prefetch bytes for metadata from the end and process the footer
let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE);
let buffer = object_store
.read_with(path)
.range(buffer_start..file_size)
.await
.context(error::OpenDalSnafu)?;
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let metadata_len = decode_footer(&footer).map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode footer, {e}"),
}
.build()
})? as u64;
if file_size - (FOOTER_SIZE as u64) < metadata_len {
return error::InvalidParquetSnafu {
file: path,
reason: format!(
"the sum of Metadata length {} and Footer size {} is larger than file size {}",
metadata_len, FOOTER_SIZE, file_size
),
}
.fail();
}
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
// The whole metadata is in the first read
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
let metadata = decode_metadata(&buffer[metadata_start..buffer_len - FOOTER_SIZE])
.map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode metadata, {e}"),
}
.build()
})?;
Ok(metadata)
} else {
// The metadata is out of buffer, need to make a second read
let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64;
let data = object_store
.read_with(path)
.range(metadata_start..(file_size - FOOTER_SIZE as u64))
.await
.context(error::OpenDalSnafu)?;
let metadata = decode_metadata(&data).map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode metadata, {e}"),
}
.build()
})?;
Ok(metadata)
}
}
/// Get the size of parquet file.
async fn get_file_size(&self) -> Result<u64> {
let file_size = match self.file_size {
0 => self
.object_store
.stat(self.file_path)
.await
.context(error::OpenDalSnafu)?
.content_length(),
other => other,
};
Ok(file_size)
}
}

View File

@@ -29,7 +29,6 @@ use datafusion_common::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
@@ -37,12 +36,11 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use tokio::io::BufReader;
use crate::cache::CacheManagerRef;
use crate::error::{
ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu,
InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result,
InvalidParquetSnafu, ReadParquetSnafu, Result,
};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
@@ -52,6 +50,7 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -136,15 +135,9 @@ impl ParquetReaderBuilder {
let start = Instant::now();
let file_path = self.file_handle.file_path(&self.file_dir);
// Now we create a reader to read the whole file.
let reader = self
.object_store
.reader(&file_path)
.await
.context(OpenDalSnafu)?;
let mut reader = BufReader::new(reader);
let file_size = self.file_handle.meta().file_size;
// Loads parquet metadata of the file.
let parquet_meta = self.read_parquet_metadata(&mut reader, &file_path).await?;
let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?;
@@ -247,8 +240,8 @@ impl ParquetReaderBuilder {
/// Reads parquet metadata of specific file.
async fn read_parquet_metadata(
&self,
reader: &mut impl AsyncFileReader,
file_path: &str,
file_size: u64,
) -> Result<Arc<ParquetMetaData>> {
// Tries to get from global cache.
if let Some(metadata) = self.cache_manager.as_ref().and_then(|cache| {
@@ -257,11 +250,12 @@ impl ParquetReaderBuilder {
return Ok(metadata);
}
// Cache miss, get from the reader.
let metadata = reader
.get_metadata()
.await
.context(ReadParquetSnafu { path: file_path })?;
// TODO(QuenKar): should also check write cache to get parquet metadata.
// Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let metadata = metadata_loader.load().await?;
let metadata = Arc::new(metadata);
// Cache the metadata.
if let Some(cache) = &self.cache_manager {
cache.put_parquet_meta_data(