feat(mito): Reads SST's row groups one by one (#2668)

* feat: read parquet metadata

* feat: add create method for row group

* feat: read parquet by row group

* refactor: use VecDeque to collect batches

* style: fix row group clippy warnings

* chore: update comments

* style: fix clippy

* refactor: simplify row group reader builder

* docs: fix grammar issue

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* chore: format code

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
Yingwen
2023-11-01 11:59:16 +08:00
committed by GitHub
parent 7bd137f398
commit 5f3bbdca4f
5 changed files with 242 additions and 185 deletions

View File

@@ -388,6 +388,14 @@ pub enum Error {
region_dir: String,
location: Location,
},
#[snafu(display("Failed to read arrow record batch from parquet file {}", path))]
ArrowReader {
path: String,
#[snafu(source)]
error: ArrowError,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -458,6 +466,7 @@ impl ErrorExt for Error {
RegionReadonly { .. } => StatusCode::RegionReadonly,
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -173,6 +173,7 @@ impl ScanRegion {
);
let predicate = Predicate::new(self.request.filters.clone());
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,

View File

@@ -26,7 +26,7 @@
//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use api::v1::SemanticType;
@@ -138,7 +138,10 @@ impl ReadFormat {
}
}
/// Gets the converted arrow schema.
/// Gets the arrow schema of the SST file.
///
/// This schema is computed from the region metadata but should be the same
/// as the arrow schema decoded from the file metadata.
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}
@@ -178,7 +181,7 @@ impl ReadFormat {
pub(crate) fn convert_record_batch(
&self,
record_batch: &RecordBatch,
batches: &mut Vec<Batch>,
batches: &mut VecDeque<Batch>,
) -> Result<()> {
debug_assert!(batches.is_empty());
@@ -249,7 +252,7 @@ impl ReadFormat {
}
let batch = builder.build()?;
batches.push(batch);
batches.push_back(batch);
}
Ok(())
@@ -768,7 +771,7 @@ mod tests {
assert_eq!(arrow_schema, *read_format.arrow_schema());
let record_batch = RecordBatch::new_empty(arrow_schema);
let mut batches = vec![];
let mut batches = VecDeque::new();
read_format
.convert_record_batch(&record_batch, &mut batches)
.unwrap();
@@ -790,14 +793,14 @@ mod tests {
];
let arrow_schema = build_test_arrow_schema();
let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
let mut batches = vec![];
let mut batches = VecDeque::new();
read_format
.convert_record_batch(&record_batch, &mut batches)
.unwrap();
assert_eq!(
vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
batches
batches.into_iter().collect::<Vec<_>>(),
);
}
}

View File

@@ -14,37 +14,34 @@
//! Parquet reader.
use std::collections::HashSet;
use std::ops::Range;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use async_compat::CompatExt;
use async_compat::{Compat, CompatExt};
use async_trait::async_trait;
use bytes::Bytes;
use common_time::range::TimestampRange;
use datatypes::arrow::record_batch::RecordBatch;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{FutureExt, TryStreamExt};
use object_store::ObjectStore;
use object_store::{ObjectStore, Reader};
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::errors::ParquetError;
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, RegionId};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use tokio::io::BufReader;
use crate::cache::CacheManagerRef;
use crate::error::{
InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result,
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu,
Result,
};
use crate::read::{Batch, BatchReader};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -114,46 +111,22 @@ impl ParquetReaderBuilder {
/// Builds and initializes a [ParquetReader].
///
/// This needs to perform IO operation.
pub async fn build(self) -> Result<ParquetReader> {
pub async fn build(&self) -> Result<ParquetReader> {
let file_path = self.file_handle.file_path(&self.file_dir);
let (stream, read_format) = self.init_stream(&file_path).await?;
Ok(ParquetReader {
file_path,
_file_handle: self.file_handle,
stream,
read_format,
batches: Vec::new(),
})
}
/// Initializes the parquet stream, also creates a [ReadFormat] to decode record batches.
async fn init_stream(&self, file_path: &str) -> Result<(BoxedRecordBatchStream, ReadFormat)> {
// Creates parquet stream builder.
// Now we create a reader to read the whole file.
let reader = self
.object_store
.reader(file_path)
.reader(&file_path)
.await
.context(OpenDalSnafu)?
.compat();
let reader = BufReader::new(reader);
let reader = AsyncFileReaderCache {
reader,
// TODO(yingwen): Sets the metadata when we implement row group level reader.
metadata: None,
cache: self.cache_manager.clone(),
region_id: self.file_handle.region_id(),
file_id: self.file_handle.file_id(),
};
let mut builder = ParquetRecordBatchStreamBuilder::new(reader)
.await
.context(ReadParquetSnafu { path: file_path })?
.with_batch_size(DEFAULT_READ_BATCH_SIZE);
// Decode region metadata.
let key_value_meta = builder.metadata().file_metadata().key_value_metadata();
let region_meta = self.get_region_metadata(file_path, key_value_meta)?;
let mut reader = BufReader::new(reader);
// Loads parquet metadata of the file.
let parquet_meta = self.read_parquet_metadata(&mut reader, &file_path).await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?;
// Computes column ids to read.
let column_ids: HashSet<_> = self
.projection
.as_ref()
@@ -165,56 +138,59 @@ impl ParquetReaderBuilder {
.map(|c| c.column_id)
.collect()
});
let read_format = ReadFormat::new(Arc::new(region_meta));
// The arrow schema converted from the region meta should be the same as parquet's.
// We only compare fields to avoid schema's metadata breaks the comparison.
ensure!(
read_format.arrow_schema().fields() == builder.schema().fields(),
InvalidParquetSnafu {
file: file_path,
reason: format!(
"schema mismatch, expect: {:?}, given: {:?}",
read_format.arrow_schema().fields(),
builder.schema().fields()
)
}
);
// Prune row groups by metadata.
if let Some(predicate) = &self.predicate {
let stats = RowGroupPruningStats::new(
builder.metadata().row_groups(),
&read_format,
column_ids,
);
// Prunes row groups by metadata.
let row_groups: VecDeque<_> = if let Some(predicate) = &self.predicate {
let stats =
RowGroupPruningStats::new(parquet_meta.row_groups(), &read_format, column_ids);
let pruned_row_groups = predicate
predicate
.prune_with_stats(&stats, read_format.metadata().schema.arrow_schema())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
.collect::<Vec<_>>();
builder = builder.with_row_groups(pruned_row_groups);
}
.collect()
} else {
(0..parquet_meta.num_row_groups()).collect()
};
let parquet_schema_desc = builder.metadata().file_metadata().schema_descr();
if let Some(column_ids) = self.projection.as_ref() {
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let projection_mask = if let Some(column_ids) = self.projection.as_ref() {
let indices = read_format.projection_indices(column_ids.iter().copied());
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices);
builder = builder.with_projection(projection_mask);
}
// Now we assumes we don't have nested schemas.
ProjectionMask::roots(parquet_schema_desc, indices)
} else {
ProjectionMask::all()
};
let stream = builder
.build()
.context(ReadParquetSnafu { path: file_path })?;
// Computes the field levels.
let hint = Some(read_format.arrow_schema().fields());
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadParquetSnafu { path: &file_path })?;
Ok((Box::pin(stream), read_format))
let reader_builder = RowGroupReaderBuilder {
file_path,
parquet_meta,
file_reader: reader,
projection: projection_mask,
field_levels,
};
Ok(ParquetReader {
_file_handle: self.file_handle.clone(),
row_groups,
read_format,
reader_builder,
current_reader: None,
batches: VecDeque::new(),
})
}
/// Decode region metadata from key value.
/// Decodes region metadata from the key-value metadata of the parquet file.
fn get_region_metadata(
&self,
file_path: &str,
key_value_meta: Option<&Vec<KeyValue>>,
) -> Result<RegionMetadata> {
@@ -239,49 +215,119 @@ impl ParquetReaderBuilder {
RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
}
/// Reads parquet metadata of specific file.
async fn read_parquet_metadata(
&self,
reader: &mut impl AsyncFileReader,
file_path: &str,
) -> Result<Arc<ParquetMetaData>> {
// Tries to get from global cache.
if let Some(metadata) = self.cache_manager.as_ref().and_then(|cache| {
cache.get_parquet_meta_data(self.file_handle.region_id(), self.file_handle.file_id())
}) {
return Ok(metadata);
}
// Cache miss, get from the reader.
let metadata = reader
.get_metadata()
.await
.context(ReadParquetSnafu { path: file_path })?;
// Cache the metadata.
if let Some(cache) = &self.cache_manager {
cache.put_parquet_meta_data(
self.file_handle.region_id(),
self.file_handle.file_id(),
metadata.clone(),
);
}
Ok(metadata)
}
}
type BoxedRecordBatchStream = BoxStream<'static, std::result::Result<RecordBatch, ParquetError>>;
/// Builder to build a [ParquetRecordBatchReader] for a row group.
struct RowGroupReaderBuilder {
/// Path of the file.
file_path: String,
/// Metadata of the parquet file.
parquet_meta: Arc<ParquetMetaData>,
/// Reader to get data.
file_reader: BufReader<Compat<Reader>>,
/// Projection mask.
projection: ProjectionMask,
/// Field levels to read.
field_levels: FieldLevels,
}
impl RowGroupReaderBuilder {
/// Path of the file to read.
fn file_path(&self) -> &str {
&self.file_path
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
async fn build(&mut self, row_group_idx: usize) -> Result<ParquetRecordBatchReader> {
let mut row_group = InMemoryRowGroup::create(&self.parquet_meta, row_group_idx);
// Fetches data into memory.
row_group
.fetch(&mut self.file_reader, &self.projection, None)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?;
// Builds the parquet reader.
// Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups(
&self.field_levels,
&row_group,
DEFAULT_READ_BATCH_SIZE,
None,
)
.context(ReadParquetSnafu {
path: &self.file_path,
})
}
}
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
/// Path of the file.
file_path: String,
/// SST file to read.
///
/// Holds the file handle so the file purger doesn't purge the file while it is being read.
_file_handle: FileHandle,
/// Inner parquet record batch stream.
stream: BoxedRecordBatchStream,
/// Indices of row groups to read.
row_groups: VecDeque<usize>,
/// Helper to read record batches.
///
/// Built from the region metadata decoded from the file's key-value metadata.
read_format: ReadFormat,
/// Builder to build row group readers.
reader_builder: RowGroupReaderBuilder,
/// Reader of current row group.
current_reader: Option<ParquetRecordBatchReader>,
/// Buffered batches to return.
batches: Vec<Batch>,
batches: VecDeque<Batch>,
}
#[async_trait]
impl BatchReader for ParquetReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
if let Some(batch) = self.batches.pop() {
if let Some(batch) = self.batches.pop_front() {
return Ok(Some(batch));
}
// We need to fetch next record batch and convert it to batches.
let Some(record_batch) = self.stream.try_next().await.context(ReadParquetSnafu {
path: &self.file_path,
})?
else {
let Some(record_batch) = self.fetch_next_record_batch().await? else {
return Ok(None);
};
self.read_format
.convert_record_batch(&record_batch, &mut self.batches)?;
// Reverse batches so we could pop it.
self.batches.reverse();
Ok(self.batches.pop())
Ok(self.batches.pop_front())
}
}
@@ -290,59 +336,43 @@ impl ParquetReader {
pub fn metadata(&self) -> &RegionMetadataRef {
self.read_format.metadata()
}
}
/// Cache layer for parquet's [AsyncFileReader].
struct AsyncFileReaderCache<T> {
/// Underlying async file reader.
reader: T,
/// Parquet metadata cached locally.
metadata: Option<Arc<ParquetMetaData>>,
/// Global cache.
cache: Option<CacheManagerRef>,
/// Id of the region.
region_id: RegionId,
/// Id of the file to read.
file_id: FileId,
}
impl<T: AsyncFileReader> AsyncFileReader for AsyncFileReaderCache<T> {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
self.reader.get_bytes(range)
}
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<usize>>,
) -> BoxFuture<'_, Result<Vec<Bytes>, ParquetError>> {
self.reader.get_byte_ranges(ranges)
}
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>, ParquetError>> {
async {
// Tries to get from local cache.
if let Some(metadata) = &self.metadata {
return Ok(metadata.clone());
}
// Tries to get from global cache.
if let Some(metadata) = self
.cache
.as_ref()
.and_then(|cache| cache.get_parquet_meta_data(self.region_id, self.file_id))
/// Tries to fetch next [RecordBatch] from the reader.
///
/// If the reader is exhausted, reads next row group.
async fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
if let Some(row_group_reader) = &mut self.current_reader {
if let Some(record_batch) =
row_group_reader
.next()
.transpose()
.context(ArrowReaderSnafu {
path: self.reader_builder.file_path(),
})?
{
return Ok(metadata);
return Ok(Some(record_batch));
}
// Cache miss.
let metadata = self.reader.get_metadata().await?;
// Cache the metadata.
if let Some(cache) = &self.cache {
cache.put_parquet_meta_data(self.region_id, self.file_id, metadata.clone());
}
Ok(metadata)
}
.boxed()
// No more items in current row group, reads next row group.
while let Some(row_group_idx) = self.row_groups.pop_front() {
let mut row_group_reader = self.reader_builder.build(row_group_idx).await?;
let Some(record_batch) =
row_group_reader
.next()
.transpose()
.context(ArrowReaderSnafu {
path: self.reader_builder.file_path(),
})?
else {
continue;
};
// Sets current reader to this reader.
self.current_reader = Some(row_group_reader);
return Ok(Some(record_batch));
}
Ok(None)
}
}

View File

@@ -22,7 +22,7 @@ use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use parquet::format::PageLocation;
@@ -36,10 +36,25 @@ pub struct InMemoryRowGroup<'a> {
}
impl<'a> InMemoryRowGroup<'a> {
/// Creates a new [InMemoryRowGroup] by `row_group_idx`.
///
/// # Panics
/// Panics if the `row_group_idx` is invalid.
pub fn create(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
let page_locations = parquet_meta
.offset_index()
.map(|x| x[row_group_idx].as_slice());
Self {
metadata,
row_count: metadata.num_rows() as usize,
column_chunks: vec![None; metadata.columns().len()],
page_locations,
}
}
/// Fetches the necessary column data into memory
// TODO(yingwen): Fix clippy warnings.
#[allow(clippy::filter_map_bool_then)]
#[allow(clippy::useless_conversion)]
pub async fn fetch<T: AsyncFileReader + Send>(
&mut self,
input: &mut T,
@@ -56,26 +71,26 @@ impl<'a> InMemoryRowGroup<'a> {
.iter()
.zip(self.metadata.columns())
.enumerate()
.filter_map(|(idx, (chunk, chunk_meta))| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
}
_ => (),
}
ranges.extend(selection.scan_ranges(&page_locations[idx]));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
ranges
})
.filter(|&(idx, (chunk, _chunk_meta))| {
chunk.is_none() && projection.leaf_included(idx)
})
.flat_map(|(idx, (_chunk, chunk_meta))| {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
}
_ => (),
}
ranges.extend(selection.scan_ranges(&page_locations[idx]));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
ranges
})
.flatten()
.collect();
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
@@ -94,7 +109,7 @@ impl<'a> InMemoryRowGroup<'a> {
*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
data: offsets.into_iter().zip(chunks.into_iter()).collect(),
data: offsets.into_iter().zip(chunks).collect(),
}))
}
}
@@ -103,12 +118,11 @@ impl<'a> InMemoryRowGroup<'a> {
.column_chunks
.iter()
.enumerate()
.filter_map(|(idx, chunk)| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
})
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
})
.collect();