feat: support fetch ranges in concurrent (#2959)

* feat: concurrent fetch ranges

* chore: cr comment

* chore: cr comment

* chore: clippy
This commit is contained in:
Wei
2023-12-22 16:47:55 +08:00
committed by GitHub
parent 830a91c548
commit a7349b573b
2 changed files with 109 additions and 11 deletions

View File

@@ -22,7 +22,7 @@ use async_trait::async_trait;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{ObjectStore, Reader};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
@@ -178,7 +178,7 @@ impl ParquetReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
parquet_meta,
file_reader: reader,
object_store: self.object_store.clone(),
projection: projection_mask,
field_levels,
cache_manager: self.cache_manager.clone(),
@@ -285,8 +285,8 @@ struct RowGroupReaderBuilder {
file_path: String,
/// Metadata of the parquet file.
parquet_meta: Arc<ParquetMetaData>,
/// Reader to get data.
file_reader: BufReader<Reader>,
/// Object store as an Operator.
object_store: ObjectStore,
/// Projection mask.
projection: ProjectionMask,
/// Field levels to read.
@@ -309,10 +309,12 @@ impl RowGroupReaderBuilder {
&self.parquet_meta,
row_group_idx,
self.cache_manager.clone(),
&self.file_path,
self.object_store.clone(),
);
// Fetches data into memory.
row_group
.fetch(&mut self.file_reader, &self.projection, None)
.fetch(&self.projection, None)
.await
.context(ReadParquetSnafu {
path: &self.file_path,

View File

@@ -14,11 +14,12 @@
//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
use std::ops::Range;
use std::sync::Arc;
use bytes::{Buf, Bytes};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
@@ -46,6 +47,9 @@ pub struct InMemoryRowGroup<'a> {
///
/// `column_cached_pages.len()` equals to `column_chunks.len()`.
column_cached_pages: Vec<Option<Arc<PageValue>>>,
file_path: &'a str,
/// Object store.
object_store: ObjectStore,
}
impl<'a> InMemoryRowGroup<'a> {
@@ -59,6 +63,8 @@ impl<'a> InMemoryRowGroup<'a> {
parquet_meta: &'a ParquetMetaData,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
file_path: &'a str,
object_store: ObjectStore,
) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
// `page_locations` is always `None` if we don't set
@@ -78,13 +84,14 @@ impl<'a> InMemoryRowGroup<'a> {
row_group_idx,
cache_manager,
column_cached_pages: vec![None; metadata.columns().len()],
file_path,
object_store,
}
}
/// Fetches the necessary column data into memory
pub async fn fetch<T: AsyncFileReader + Send>(
pub async fn fetch(
&mut self,
input: &mut T,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
@@ -93,7 +100,7 @@ impl<'a> InMemoryRowGroup<'a> {
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
let fetch_ranges = self
let fetch_ranges: Vec<_> = self
.column_chunks
.iter()
.zip(self.metadata.columns())
@@ -119,8 +126,11 @@ impl<'a> InMemoryRowGroup<'a> {
ranges
})
.collect();
let mut chunk_data =
fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
.await?
.into_iter();
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut page_start_offsets = page_start_offsets.into_iter();
for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
@@ -165,7 +175,10 @@ impl<'a> InMemoryRowGroup<'a> {
return Ok(());
}
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut chunk_data =
fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
.await?
.into_iter();
for (idx, (chunk, cached_pages)) in self
.column_chunks
@@ -336,3 +349,86 @@ impl Iterator for ColumnChunkIterator {
}
impl PageIterator for ColumnChunkIterator {}
/// Fetches data from object store.
/// If the object store supports blocking, use sequence blocking read.
/// Otherwise, use concurrent read.
async fn fetch_byte_ranges(
file_path: &str,
object_store: ObjectStore,
ranges: Vec<Range<usize>>,
) -> Result<Vec<Bytes>> {
let ranges: Vec<_> = ranges
.iter()
.map(|range| range.start as u64..range.end as u64)
.collect();
if object_store.info().full_capability().blocking {
fetch_ranges_seq(file_path, object_store, ranges).await
} else {
fetch_ranges_concurrent(file_path, object_store, ranges).await
}
}
/// Fetches data from object store sequentially
async fn fetch_ranges_seq(
file_path: &str,
object_store: ObjectStore,
ranges: Vec<Range<u64>>,
) -> Result<Vec<Bytes>> {
let block_object_store = object_store.blocking();
let file_path = file_path.to_string();
let f = move || -> Result<Vec<Bytes>> {
ranges
.into_iter()
.map(|range| {
let data = block_object_store
.read_with(&file_path)
.range(range)
.call()
.map_err(|e| ParquetError::External(Box::new(e)))?;
Ok::<_, ParquetError>(Bytes::from(data))
})
.collect::<Result<Vec<_>>>()
};
maybe_spawn_blocking(f).await
}
/// Fetches data from object store concurrently.
async fn fetch_ranges_concurrent(
file_path: &str,
object_store: ObjectStore,
ranges: Vec<Range<u64>>,
) -> Result<Vec<Bytes>> {
// TODO(QuenKar): may merge small ranges to a bigger range to optimize.
let mut handles = Vec::with_capacity(ranges.len());
for range in ranges {
let future_read = object_store.read_with(file_path);
handles.push(async move {
let data = future_read
.range(range.start..range.end)
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;
Ok::<_, ParquetError>(Bytes::from(data))
});
}
let results = futures::future::try_join_all(handles).await?;
Ok(results)
}
// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
/// Takes a function and spawns it to a tokio blocking pool if available
pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
F: FnOnce() -> Result<T> + Send + 'static,
T: Send + 'static,
{
match tokio::runtime::Handle::try_current() {
Ok(runtime) => runtime
.spawn_blocking(f)
.await
.map_err(|e| ParquetError::External(Box::new(e)))?,
Err(_) => f(),
}
}