feat(mito): adjust segment size of inverted index to a finer granularity than row group level (#3289)

* feat(mito): adjust segment size of inverted index to a finer granularity than row group level

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: wrong metric

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: more suitable name

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: BitVec instead

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi
2024-02-07 16:20:00 +08:00 (committed by GitHub)
parent e5ec65988b
commit 141ed51dcc
14 changed files with 408 additions and 116 deletions

Cargo.lock (generated)

@@ -5299,6 +5299,7 @@ dependencies = [
"futures",
"humantime-serde",
"index",
"itertools 0.10.5",
"lazy_static",
"log-store",
"memcomparable",


@@ -14,14 +14,26 @@
mod predicates_apply;
-use std::collections::BTreeSet;
use async_trait::async_trait;
+use common_base::BitVec;
pub use predicates_apply::PredicatesIndexApplier;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;
/// The output of an apply operation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ApplyOutput {
/// Bitmap of indices that match the predicates.
pub matched_segment_ids: BitVec,
/// The total number of rows in the index.
pub total_row_count: usize,
/// The number of rows in each segment.
pub segment_row_count: usize,
}
/// A trait for processing and transforming indices obtained from an inverted index.
///
/// Applier instances are reusable and work with various `InvertedIndexReader` instances,
@@ -35,7 +47,7 @@ pub trait IndexApplier: Send + Sync {
&self,
context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a),
-) -> Result<BTreeSet<usize>>;
+) -> Result<ApplyOutput>;
/// Returns the memory usage of the applier.
fn memory_usage(&self) -> usize;
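The new ApplyOutput carries everything a caller needs to map matched segments back to absolute row ranges: bit i of matched_segment_ids covers rows [i * segment_row_count, (i + 1) * segment_row_count), clamped to total_row_count. A minimal sketch of that mapping, assuming the ApplyOutput struct above is in scope (matched_row_ranges is a hypothetical helper, not part of this commit):

use std::ops::Range;

fn matched_row_ranges(output: &ApplyOutput) -> Vec<Range<usize>> {
    output
        .matched_segment_ids
        .iter_ones()
        .map(|seg_id| {
            // Each set bit marks one segment of `segment_row_count` rows.
            let begin = seg_id * output.segment_row_count;
            // The trailing segment may be cut short by the file's row count.
            let end = (begin + output.segment_row_count).min(output.total_row_count);
            begin..end
        })
        .collect()
}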


@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::collections::BTreeSet;
use std::mem::size_of;
use async_trait::async_trait;
@@ -26,7 +25,7 @@ use crate::inverted_index::search::fst_apply::{
};
use crate::inverted_index::search::fst_values_mapper::FstValuesMapper;
use crate::inverted_index::search::index_apply::{
-IndexApplier, IndexNotFoundStrategy, SearchContext,
+ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use crate::inverted_index::search::predicate::Predicate;
@@ -48,8 +47,13 @@ impl IndexApplier for PredicatesIndexApplier {
&self,
context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a),
-) -> Result<BTreeSet<usize>> {
+) -> Result<ApplyOutput> {
let metadata = reader.metadata().await?;
let mut output = ApplyOutput {
matched_segment_ids: BitVec::EMPTY,
total_row_count: metadata.total_row_count as _,
segment_row_count: metadata.segment_row_count as _,
};
let mut bitmap = Self::bitmap_full_range(&metadata);
// TODO(zhongzc): optimize the order of applying to make it quicker to return empty.
@@ -61,7 +65,7 @@ impl IndexApplier for PredicatesIndexApplier {
let Some(meta) = metadata.metas.get(name) else {
match context.index_not_found_strategy {
IndexNotFoundStrategy::ReturnEmpty => {
-return Ok(BTreeSet::default());
+return Ok(output);
}
IndexNotFoundStrategy::Ignore => {
continue;
@@ -81,7 +85,8 @@ impl IndexApplier for PredicatesIndexApplier {
bitmap &= bm;
}
-Ok(bitmap.iter_ones().collect())
+output.matched_segment_ids = bitmap;
+Ok(output)
}
/// Returns the memory usage of the applier.
@@ -206,11 +211,14 @@ mod tests {
_ => unreachable!(),
}
});
-let indices = applier
+let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
-assert_eq!(indices, BTreeSet::from_iter([0, 2, 4, 6]));
+assert_eq!(
+output.matched_segment_ids,
+bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]
+);
// An index reader with a single tag "tag-0" but without value "tag-0_value-0"
let mut mock_reader = MockInvertedIndexReader::new();
@@ -223,11 +231,11 @@ mod tests {
"tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap()),
_ => unreachable!(),
});
-let indices = applier
+let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
-assert!(indices.is_empty());
+assert_eq!(output.matched_segment_ids.count_ones(), 0);
}
#[tokio::test]
@@ -260,11 +268,14 @@ mod tests {
}
});
-let indices = applier
+let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
-assert_eq!(indices, BTreeSet::from_iter([0, 4, 6]));
+assert_eq!(
+output.matched_segment_ids,
+bitvec![u8, Lsb0; 1, 0, 0, 0, 1, 0, 1, 0]
+);
}
#[tokio::test]
@@ -278,11 +289,14 @@ mod tests {
.expect_metadata()
.returning(|| Ok(mock_metas(["tag-0"])));
-let indices = applier
+let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
-assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); // full range to scan
+assert_eq!(
+output.matched_segment_ids,
+bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]
+); // full range to scan
}
#[tokio::test]
@@ -303,11 +317,11 @@ mod tests {
fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
};
-let indices = applier
+let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
-assert!(indices.is_empty());
+assert!(output.matched_segment_ids.is_empty());
}
#[tokio::test]
@@ -334,7 +348,7 @@ mod tests {
.await;
assert!(matches!(result, Err(Error::IndexNotFound { .. })));
-let indices = applier
+let output = applier
.apply(
SearchContext {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
@@ -343,9 +357,9 @@ mod tests {
)
.await
.unwrap();
-assert!(indices.is_empty());
+assert!(output.matched_segment_ids.is_empty());
-let indices = applier
+let output = applier
.apply(
SearchContext {
index_not_found_strategy: IndexNotFoundStrategy::Ignore,
@@ -354,7 +368,10 @@ mod tests {
)
.await
.unwrap();
-assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7]));
+assert_eq!(
+output.matched_segment_ids,
+bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]
+);
}
#[test]

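The updated assertions express matches as a bitmap rather than an id set: bit i set means segment i matched, so iter_ones() recovers the old id-set view. A small equivalence sketch using the bitvec crate's macro, as in the tests above (illustrative only, not part of this commit):

use bitvec::prelude::*;

fn main() {
    // Encodes the same matches as the old `BTreeSet::from_iter([0, 2, 4, 6])`.
    let matched = bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0];
    let ids: Vec<usize> = matched.iter_ones().collect();
    assert_eq!(ids, vec![0, 2, 4, 6]);
}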

@@ -41,6 +41,7 @@ datatypes.workspace = true
futures.workspace = true
humantime-serde.workspace = true
index.workspace = true
itertools.workspace = true
lazy_static = "1.4"
log-store = { workspace = true, optional = true }
memcomparable = "0.2"


@@ -139,6 +139,7 @@ impl AccessLayer {
file_id,
file_path: index_file_path,
metadata: &request.metadata,
segment_row_count: write_opts.index_segment_row_count,
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
intermediate_manager: self.intermediate_manager.clone(),


@@ -114,6 +114,7 @@ impl WriteCache {
file_id,
file_path: self.file_cache.cache_file_path(puffin_key),
metadata: &write_request.metadata,
segment_row_count: write_opts.index_segment_row_count,
row_group_size: write_opts.row_group_size,
object_store: self.file_cache.local_store(),
intermediate_manager: self.intermediate_manager.clone(),


@@ -551,10 +551,10 @@ async fn test_region_usage() {
let region_stat = region.region_usage().await;
assert_eq!(region_stat.wal_usage, 0);
-assert_eq!(region_stat.sst_usage, 3006);
+assert_eq!(region_stat.sst_usage, 3005);
// region total usage
-assert_eq!(region_stat.disk_usage(), 4072);
+assert_eq!(region_stat.disk_usage(), 4071);
}
#[tokio::test]


@@ -132,6 +132,8 @@ lazy_static! {
/// Counter of filtered rows by precise filter.
pub static ref PRECISE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap();
pub static ref READ_ROWS_IN_ROW_GROUP_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_in_row_group_total", "mito read rows in row group total", &[TYPE_LABEL]).unwrap();
// ------- End of query metrics.
// Cache related metrics.


@@ -129,6 +129,7 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) file_path: String,
pub(crate) metadata: &'a RegionMetadataRef,
pub(crate) row_group_size: usize,
pub(crate) segment_row_count: usize,
pub(crate) object_store: ObjectStore,
pub(crate) intermediate_manager: IntermediateManager,
}
@@ -153,6 +154,14 @@ impl<'a> IndexerBuilder<'a> {
return Indexer::default();
}
let Some(mut segment_row_count) = NonZeroUsize::new(self.segment_row_count) else {
warn!(
"Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
return Indexer::default();
};
let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
warn!(
"Row group size is 0, skip creating index, region_id: {}, file_id: {}",
@@ -161,6 +170,11 @@ impl<'a> IndexerBuilder<'a> {
return Indexer::default();
};
// If the segment row count is not aligned with the row group size, fall back to the row group size so that segments stay aligned.
if row_group_size.get() % segment_row_count.get() != 0 {
segment_row_count = row_group_size;
}
let creator = SstIndexCreator::new(
self.file_path,
self.file_id,
@@ -168,7 +182,7 @@ impl<'a> IndexerBuilder<'a> {
self.object_store,
self.intermediate_manager,
self.mem_threshold_index_create,
-row_group_size,
+segment_row_count,
)
.with_buffer_size(self.write_buffer_size);
@@ -263,6 +277,7 @@ mod tests {
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
segment_row_count: 16,
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
@@ -282,6 +297,7 @@ mod tests {
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
segment_row_count: 16,
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
@@ -301,6 +317,7 @@ mod tests {
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
segment_row_count: 16,
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
@@ -320,6 +337,7 @@ mod tests {
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
segment_row_count: 0,
row_group_size: 0,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),

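The fallback above keeps an index segment from ever straddling a row-group boundary: when the configured segment row count does not evenly divide the row group size, the builder reverts to one segment per row group. A sketch of the rule in isolation (effective_segment_row_count is a hypothetical helper, not part of this commit):

use std::num::NonZeroUsize;

fn effective_segment_row_count(
    segment_row_count: NonZeroUsize,
    row_group_size: NonZeroUsize,
) -> NonZeroUsize {
    if row_group_size.get() % segment_row_count.get() != 0 {
        // Misaligned configuration: fall back to one segment per row group.
        row_group_size
    } else {
        segment_row_count
    }
}

fn main() {
    let rg = NonZeroUsize::new(1024).unwrap();
    // 16 divides 1024, so the configured value is kept.
    assert_eq!(
        effective_segment_row_count(NonZeroUsize::new(16).unwrap(), rg).get(),
        16
    );
    // 100 does not divide 1024, so the row group size wins.
    assert_eq!(effective_segment_row_count(NonZeroUsize::new(100).unwrap(), rg), rg);
}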

@@ -14,13 +14,12 @@
pub mod builder;
-use std::collections::BTreeSet;
use std::sync::Arc;
use futures::{AsyncRead, AsyncSeek};
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
-IndexApplier, IndexNotFoundStrategy, SearchContext,
+ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
@@ -84,7 +83,7 @@ impl SstIndexApplier {
}
/// Applies predicates to the provided SST file id and returns the matched segments.
-pub async fn apply(&self, file_id: FileId) -> Result<BTreeSet<usize>> {
+pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED.start_timer();
let context = SearchContext {
@@ -175,6 +174,7 @@ impl Drop for SstIndexApplier {
#[cfg(test)]
mod tests {
use common_base::BitVec;
use futures::io::Cursor;
use index::inverted_index::search::index_apply::MockIndexApplier;
use object_store::services::Memory;
@@ -203,9 +203,13 @@ mod tests {
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
-mock_index_applier
-.expect_apply()
-.returning(|_, _| Ok(BTreeSet::from_iter([1, 2, 3])));
+mock_index_applier.expect_apply().returning(|_, _| {
+Ok(ApplyOutput {
+matched_segment_ids: BitVec::EMPTY,
+total_row_count: 100,
+segment_row_count: 10,
+})
+});
let sst_index_applier = SstIndexApplier::new(
region_dir.clone(),
@@ -214,8 +218,15 @@ mod tests {
None,
Box::new(mock_index_applier),
);
-let ids = sst_index_applier.apply(file_id).await.unwrap();
-assert_eq!(ids, BTreeSet::from_iter([1, 2, 3]));
+let output = sst_index_applier.apply(file_id).await.unwrap();
+assert_eq!(
+output,
+ApplyOutput {
+matched_segment_ids: BitVec::EMPTY,
+total_row_count: 100,
+segment_row_count: 10,
+}
+);
}
#[tokio::test]


@@ -84,7 +84,7 @@ impl SstIndexCreator {
index_store: ObjectStore,
intermediate_manager: IntermediateManager,
memory_usage_threshold: Option<usize>,
-row_group_size: NonZeroUsize,
+segment_row_count: NonZeroUsize,
) -> Self {
// `memory_usage_threshold` is the total memory usage threshold of the index creation,
// so we need to divide it by the number of columns
@@ -96,7 +96,7 @@ impl SstIndexCreator {
intermediate_manager,
));
let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold);
-let index_creator = Box::new(SortIndexCreator::new(sorter, row_group_size));
+let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
Self {


@@ -20,6 +20,7 @@ pub(crate) mod metadata;
mod page_reader;
pub mod reader;
pub mod row_group;
mod row_selection;
mod stats;
pub mod writer;
@@ -38,6 +39,8 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
/// Default segment row count for inverted index.
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
/// Parquet write options.
#[derive(Debug)]
@@ -46,6 +49,8 @@ pub struct WriteOptions {
pub write_buffer_size: ReadableSize,
/// Row group size.
pub row_group_size: usize,
/// Segment row count for inverted index.
pub index_segment_row_count: usize,
}
impl Default for WriteOptions {
@@ -53,6 +58,7 @@ impl Default for WriteOptions {
WriteOptions {
write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
row_group_size: DEFAULT_ROW_GROUP_SIZE,
index_segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
}
}
}

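With these defaults, a row group holds 100 * 1024 = 102,400 rows and an index segment holds 1,024, so each row group is covered by exactly 100 segments and the builder's alignment fallback never triggers. A quick check, assuming the WriteOptions above is in scope (a sketch, not part of this commit):

fn main() {
    let opts = WriteOptions::default();
    // 102_400 rows per row group, 1_024 rows per index segment.
    assert_eq!(opts.row_group_size % opts.index_segment_row_count, 0);
    assert_eq!(opts.row_group_size / opts.index_segment_row_count, 100);
}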

@@ -14,7 +14,7 @@
//! Parquet reader.
-use std::collections::{BTreeSet, VecDeque};
+use std::collections::{BTreeMap, VecDeque};
use std::ops::BitAnd;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -27,8 +27,9 @@ use common_time::range::TimestampRange;
use datafusion_common::arrow::array::BooleanArray;
use datafusion_common::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use object_store::ObjectStore;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
+use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
@@ -43,7 +44,8 @@ use crate::error::{
InvalidParquetSnafu, ReadParquetSnafu, Result,
};
use crate::metrics::{
-PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
+PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
+READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -52,6 +54,7 @@ use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::row_selection_from_row_ranges;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -161,7 +164,6 @@ impl ParquetReaderBuilder {
let mut metrics = Metrics::default();
// Computes row groups to read.
let row_groups = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics)
.await;
@@ -265,85 +267,162 @@ impl ParquetReaderBuilder {
Ok(metadata)
}
-/// Computes row groups to read.
+/// Computes row groups to read, along with their respective row selections.
async fn row_groups_to_read(
&self,
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
metrics: &mut Metrics,
-) -> BTreeSet<usize> {
-let mut row_group_ids: BTreeSet<_> = (0..parquet_meta.num_row_groups()).collect();
-metrics.num_row_groups_unfiltered += row_group_ids.len();
-// Applies index to prune row groups.
-//
-// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
-// as an escape route in case of index issues, and it can be used to test
-// the correctness of the index.
-if let Some(index_applier) = &self.index_applier {
-if self.file_handle.meta().inverted_index_available() {
-match index_applier.apply(self.file_handle.file_id()).await {
-Ok(row_groups) => row_group_ids = row_groups,
-Err(err) => {
-if cfg!(any(test, feature = "test")) {
-panic!(
-"Failed to apply index, region_id: {}, file_id: {}, err: {}",
-self.file_handle.region_id(),
-self.file_handle.file_id(),
-err
-);
-} else {
-warn!(
-err; "Failed to apply index, region_id: {}, file_id: {}",
-self.file_handle.region_id(), self.file_handle.file_id()
-);
-}
-}
-}
-}
+) -> BTreeMap<usize, Option<RowSelection>> {
+let num_row_groups = parquet_meta.num_row_groups();
+if num_row_groups == 0 {
+return BTreeMap::default();
}
-metrics.num_row_groups_inverted_index_selected += row_group_ids.len();
+metrics.num_row_groups_before_filtering += num_row_groups;
-if row_group_ids.is_empty() {
-return row_group_ids;
-}
+self.prune_row_groups_by_inverted_index(parquet_meta, metrics)
+.await
+.or_else(|| self.prune_row_groups_by_minmax(read_format, parquet_meta, metrics))
+.unwrap_or_else(|| (0..num_row_groups).map(|i| (i, None)).collect())
}
-// Prunes row groups by min-max index.
-if let Some(predicate) = &self.predicate {
-let region_meta = read_format.metadata();
-let column_ids = region_meta
-.column_metadatas
-.iter()
-.map(|c| c.column_id)
-.collect();
-let row_groups = row_group_ids
-.iter()
-.map(|id| parquet_meta.row_group(*id))
-.collect::<Vec<_>>();
-let stats = RowGroupPruningStats::new(&row_groups, read_format, column_ids);
-let mut mask = predicate
-.prune_with_stats(&stats, region_meta.schema.arrow_schema())
-.into_iter();
-row_group_ids.retain(|_| mask.next().unwrap_or(false));
+/// Applies index to prune row groups.
+///
+/// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
+/// as an escape route in case of index issues, and it can be used to test
+/// the correctness of the index.
+async fn prune_row_groups_by_inverted_index(
+&self,
+parquet_meta: &ParquetMetaData,
+metrics: &mut Metrics,
+) -> Option<BTreeMap<usize, Option<RowSelection>>> {
+let Some(index_applier) = &self.index_applier else {
+return None;
+};
-metrics.num_row_groups_min_max_selected += row_group_ids.len();
-row_group_ids
+if !self.file_handle.meta().inverted_index_available() {
+return None;
+}
+let output = match index_applier.apply(self.file_handle.file_id()).await {
+Ok(output) => output,
+Err(err) => {
+if cfg!(any(test, feature = "test")) {
+panic!(
+"Failed to apply index, region_id: {}, file_id: {}, err: {}",
+self.file_handle.region_id(),
+self.file_handle.file_id(),
+err
+);
+} else {
+warn!(
+err; "Failed to apply index, region_id: {}, file_id: {}",
+self.file_handle.region_id(), self.file_handle.file_id()
+);
+}
+return None;
+}
+};
+// Let's assume that the number of rows in the first row group
+// can represent the `row_group_size` of the Parquet file.
+//
+// If the file contains only one row group, i.e. the number of rows
+// is less than the `row_group_size`, the calculation of `row_group_id`
+// and `rg_begin_row_id` is still correct.
+let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
+if row_group_size == 0 {
+return None;
+}
+let segment_row_count = output.segment_row_count;
+let row_groups = output
+.matched_segment_ids
+.iter_ones()
+.map(|seg_id| {
+let begin_row_id = seg_id * segment_row_count;
+let row_group_id = begin_row_id / row_group_size;
+let rg_begin_row_id = begin_row_id % row_group_size;
+let rg_end_row_id = rg_begin_row_id + segment_row_count;
+(row_group_id, rg_begin_row_id..rg_end_row_id)
+})
+.group_by(|(row_group_id, _)| *row_group_id)
+.into_iter()
+.map(|(row_group_id, group)| {
+let row_ranges = group.map(|(_, range)| range);
+let total_row_count = parquet_meta.row_group(row_group_id).num_rows() as usize;
+let (row_selection, skipped) =
+row_selection_from_row_ranges(row_ranges, total_row_count);
+metrics.num_rows_in_row_group_before_filtering += total_row_count;
+metrics.num_rows_in_row_group_inverted_index_filtered += skipped;
+(row_group_id, Some(row_selection))
+})
+.collect::<BTreeMap<_, _>>();
+let filtered = parquet_meta.num_row_groups() - row_groups.len();
+metrics.num_row_groups_inverted_index_filtered += filtered;
+Some(row_groups)
}
+/// Prunes row groups by min-max index.
+fn prune_row_groups_by_minmax(
+&self,
+read_format: &ReadFormat,
+parquet_meta: &ParquetMetaData,
+metrics: &mut Metrics,
+) -> Option<BTreeMap<usize, Option<RowSelection>>> {
+let Some(predicate) = &self.predicate else {
+return None;
+};
+let num_row_groups = parquet_meta.num_row_groups();
+let region_meta = read_format.metadata();
+let column_ids = region_meta
+.column_metadatas
+.iter()
+.map(|c| c.column_id)
+.collect();
+let row_groups = parquet_meta.row_groups();
+let stats = RowGroupPruningStats::new(row_groups, read_format, column_ids);
+let row_groups = predicate
+.prune_with_stats(&stats, region_meta.schema.arrow_schema())
+.iter()
+.zip(0..num_row_groups)
+.filter(|&(mask, _)| *mask)
+.map(|(_, id)| (id, None))
+.collect::<BTreeMap<_, _>>();
+let filtered = num_row_groups - row_groups.len();
+metrics.num_row_groups_min_max_filtered += filtered;
+Some(row_groups)
+}
}
/// Parquet reader metrics.
#[derive(Debug, Default)]
struct Metrics {
-/// Number of unfiltered row groups.
-num_row_groups_unfiltered: usize,
-/// Number of row groups to read after filtering by inverted index.
-num_row_groups_inverted_index_selected: usize,
-/// Number of row groups to read after filtering by min-max index.
-num_row_groups_min_max_selected: usize,
+/// Number of row groups before filtering.
+num_row_groups_before_filtering: usize,
+/// Number of row groups filtered by inverted index.
+num_row_groups_inverted_index_filtered: usize,
+/// Number of row groups filtered by min-max index.
+num_row_groups_min_max_filtered: usize,
/// Number of rows filtered by precise filter.
num_rows_precise_filtered: usize,
+/// Number of rows in row group before filtering.
+num_rows_in_row_group_before_filtering: usize,
+/// Number of rows in row group filtered by inverted index.
+num_rows_in_row_group_inverted_index_filtered: usize,
/// Duration to build the parquet reader.
build_cost: Duration,
/// Duration to scan the reader.
@@ -383,7 +462,11 @@ impl RowGroupReaderBuilder {
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
-async fn build(&mut self, row_group_idx: usize) -> Result<ParquetRecordBatchReader> {
+async fn build(
+&mut self,
+row_group_idx: usize,
+row_selection: Option<RowSelection>,
+) -> Result<ParquetRecordBatchReader> {
let mut row_group = InMemoryRowGroup::create(
self.file_handle.region_id(),
self.file_handle.file_id(),
@@ -395,7 +478,7 @@ impl RowGroupReaderBuilder {
);
// Fetches data into memory.
row_group
-.fetch(&self.projection, None)
+.fetch(&self.projection, row_selection.as_ref())
.await
.context(ReadParquetSnafu {
path: &self.file_path,
@@ -407,7 +490,7 @@ impl RowGroupReaderBuilder {
&self.field_levels,
&row_group,
DEFAULT_READ_BATCH_SIZE,
-None,
+row_selection,
)
.context(ReadParquetSnafu {
path: &self.file_path,
@@ -417,8 +500,8 @@ impl RowGroupReaderBuilder {
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
-/// Indices of row groups to read.
-row_groups: BTreeSet<usize>,
+/// Indices of row groups to read, along with their respective row selections.
+row_groups: BTreeMap<usize, Option<RowSelection>>,
/// Helper to read record batches.
///
/// Not `None` if [ParquetReader::stream] is not `None`.
@@ -477,8 +560,10 @@ impl Drop for ParquetReader {
self.reader_builder.file_handle.region_id(),
self.reader_builder.file_handle.file_id(),
self.reader_builder.file_handle.time_range(),
-self.metrics.num_row_groups_min_max_selected,
-self.metrics.num_row_groups_unfiltered,
+self.metrics.num_row_groups_before_filtering
+- self.metrics.num_row_groups_inverted_index_filtered
+- self.metrics.num_row_groups_min_max_filtered,
+self.metrics.num_row_groups_before_filtering,
self.metrics
);
@@ -493,17 +578,23 @@ impl Drop for ParquetReader {
.with_label_values(&["parquet"])
.inc_by(self.metrics.num_rows as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["unfiltered"])
.inc_by(self.metrics.num_row_groups_unfiltered as u64);
.with_label_values(&["before_filtering"])
.inc_by(self.metrics.num_row_groups_before_filtering as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_selected"])
.inc_by(self.metrics.num_row_groups_inverted_index_selected as u64);
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.metrics.num_row_groups_inverted_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["min_max_index_selected"])
.inc_by(self.metrics.num_row_groups_min_max_selected as u64);
.with_label_values(&["minmax_index_filtered"])
.inc_by(self.metrics.num_row_groups_min_max_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.metrics.num_rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.metrics.num_rows_in_row_group_before_filtering as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.metrics.num_rows_in_row_group_inverted_index_filtered as u64);
}
}
@@ -531,8 +622,11 @@ impl ParquetReader {
}
// No more items in current row group, reads next row group.
-while let Some(row_group_idx) = self.row_groups.pop_first() {
-let mut row_group_reader = self.reader_builder.build(row_group_idx).await?;
+while let Some((row_group_idx, row_selection)) = self.row_groups.pop_first() {
+let mut row_group_reader = self
+.reader_builder
+.build(row_group_idx, row_selection)
+.await?;
let Some(record_batch) =
row_group_reader
.next()

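The pruning path above relies on every row group except possibly the last having the same number of rows as the first, so a matched segment is located with plain division: begin_row_id = seg_id * segment_row_count, row_group_id = begin_row_id / row_group_size, and the in-group offset is the remainder. A worked sketch with this diff's default sizes (locate_segment is a hypothetical helper, not part of this commit):

use std::ops::Range;

fn locate_segment(
    seg_id: usize,
    segment_row_count: usize,
    row_group_size: usize,
) -> (usize, Range<usize>) {
    let begin_row_id = seg_id * segment_row_count;
    let row_group_id = begin_row_id / row_group_size;
    let rg_begin_row_id = begin_row_id % row_group_size;
    (row_group_id, rg_begin_row_id..rg_begin_row_id + segment_row_count)
}

fn main() {
    // Segment 150 starts at absolute row 153_600:
    // row group 1, local rows 51_200..52_224.
    assert_eq!(locate_segment(150, 1024, 102_400), (1, 51_200..52_224));
}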

@@ -0,0 +1,128 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
type SkipRowCount = usize;
/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
/// Returns the `RowSelection` and the number of rows that were skipped.
///
/// This function processes each range in the input and either creates a new selector or merges
/// with the existing one, depending on whether the current range is contiguous with the preceding one
/// or if there's a gap that requires skipping rows. It handles both "select" and "skip" actions,
/// optimizing the list of selectors by merging contiguous actions of the same type.
///
/// Note: overlapping ranges are not supported and will result in an incorrect selection.
pub(crate) fn row_selection_from_row_ranges(
row_ranges: impl Iterator<Item = Range<usize>>,
total_row_count: usize,
) -> (RowSelection, SkipRowCount) {
let mut selectors: Vec<RowSelector> = Vec::new();
let mut last_processed_end = 0;
let mut skip_row_count = 0;
for Range { start, end } in row_ranges {
if start > last_processed_end {
add_or_merge_selector(&mut selectors, start - last_processed_end, true);
skip_row_count += start - last_processed_end;
}
add_or_merge_selector(&mut selectors, end - start, false);
last_processed_end = end;
}
skip_row_count += total_row_count.saturating_sub(last_processed_end);
(RowSelection::from(selectors), skip_row_count)
}
/// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one
/// if they are of the same type (both skip or both select).
fn add_or_merge_selector(selectors: &mut Vec<RowSelector>, count: usize, is_skip: bool) {
if let Some(last) = selectors.last_mut() {
// Merge with last if both actions are same
if last.skip == is_skip {
last.row_count += count;
return;
}
}
// Add new selector otherwise
let new_selector = if is_skip {
RowSelector::skip(count)
} else {
RowSelector::select(count)
};
selectors.push(new_selector);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_single_contiguous_range() {
let (selection, skipped) = row_selection_from_row_ranges(Some(5..10).into_iter(), 10);
let expected = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
assert_eq!(selection, expected);
assert_eq!(skipped, 5);
}
#[test]
fn test_non_contiguous_ranges() {
let ranges = vec![1..3, 5..8];
let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(3),
]);
assert_eq!(selection, expected);
assert_eq!(skipped, 5);
}
#[test]
fn test_empty_range() {
let ranges = vec![];
let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![]);
assert_eq!(selection, expected);
assert_eq!(skipped, 10);
}
#[test]
fn test_adjacent_ranges() {
let ranges = vec![1..2, 2..3];
let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(2)]);
assert_eq!(selection, expected);
assert_eq!(skipped, 8);
}
#[test]
fn test_large_gap_between_ranges() {
let ranges = vec![1..2, 100..101];
let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10240);
let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(98),
RowSelector::select(1),
]);
assert_eq!(selection, expected);
assert_eq!(skipped, 10238);
}
}
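End to end: matched segments become contiguous per-row-group ranges, and row_selection_from_row_ranges turns those ranges into the RowSelection that RowGroupReaderBuilder::build hands to the parquet reader, so rows in unmatched segments are never decoded. One more sketch, assuming the function above is in scope (not part of this commit); note that adjacent selected ranges merge into a single selector and the trailing gap is only counted, never emitted as a selector:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Two matched 16-row segments (ids 0 and 2) inside a 64-row row group.
    let (selection, skipped) =
        row_selection_from_row_ranges([0..16, 32..48].into_iter(), 64);
    let expected = RowSelection::from(vec![
        RowSelector::select(16),
        RowSelector::skip(16),
        RowSelector::select(16),
    ]);
    assert_eq!(selection, expected);
    assert_eq!(skipped, 32); // the 16-row gap plus the trailing 16 rows
}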