feat(parquet): introduce inverted index applier to reader (#3130)

* feat(parquet): introduce inverted index applier to reader

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: purger removes index file

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add TODO for escape route

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add TODO for escape route

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Update src/mito2/src/access_layer.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/mito2/src/sst/parquet/reader.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* feat: min-max index to prune row groups filtered by inverted index

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: file_meta.inverted_index_available -> file_meta.available_indexes

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add TODO for leveraging WriteCache

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: mis-set available indexes

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add index file size

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: use smallvec to reduce heap allocation

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: add index size to disk usage

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Author: Zhenchi
Date: 2024-01-11 16:04:59 +08:00
Committed by: GitHub
Parent: 312e8e824e
Commit: fd8fb641fd
25 changed files with 315 additions and 90 deletions
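The commits above add up to a two-stage selection on the read path: the inverted index (when a file has one) first picks candidate row groups, and the min-max statistics then prune the survivors. A minimal, self-contained sketch of that funnel, with hypothetical names standing in for the real applier and predicate:

use std::collections::BTreeSet;

fn select_row_groups(
    total: usize,
    inverted_index_selection: Option<BTreeSet<usize>>,
    min_max_keep: impl Fn(usize) -> bool,
) -> BTreeSet<usize> {
    // Start from every row group, or from the inverted index's selection.
    let mut ids = inverted_index_selection.unwrap_or_else(|| (0..total).collect());
    // Min-max pruning only runs on the survivors.
    ids.retain(|id| min_max_keep(*id));
    ids
}

fn main() {
    // The inverted index keeps row groups {1, 3}; min-max then drops 3.
    let kept = select_row_groups(4, Some([1, 3].into_iter().collect()), |id| id != 3);
    assert_eq!(kept.into_iter().collect::<Vec<_>>(), vec![1]);
}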

Cargo.lock (generated)

@@ -8716,6 +8716,9 @@ name = "smallvec"
version = "1.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970"
dependencies = [
"serde",
]
[[package]]
name = "smartstring"


@@ -121,7 +121,7 @@ rskafka = "0.5"
rust_decimal = "1.33"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
smallvec = "1"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.7"
# on branch v0.38.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [


@@ -28,7 +28,7 @@ use crate::inverted_index::format::reader::InvertedIndexReader;
/// avoiding repeated compilation of fixed predicates such as regex patterns.
#[mockall::automock]
#[async_trait]
pub trait IndexApplier {
pub trait IndexApplier: Send + Sync {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
async fn apply<'a>(
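The new `Send + Sync` supertraits are what allow a boxed applier to be wrapped in an `Arc` and shared with scans running on other threads. A hedged sketch of the property; the trait, method, and types here are stand-ins, not the real API:

use std::sync::Arc;

trait IndexApplier: Send + Sync {
    fn apply(&self, file_id: &str) -> Vec<usize>;
}

struct NoopApplier;

impl IndexApplier for NoopApplier {
    fn apply(&self, _file_id: &str) -> Vec<usize> {
        Vec::new()
    }
}

fn spawn_scan(applier: Arc<dyn IndexApplier>) {
    // Without `Send + Sync` on the trait, `Arc<dyn IndexApplier>` would not
    // be `Send` and this `move` would not compile.
    let _handle = std::thread::spawn(move || {
        let _row_groups = applier.apply("bc5896ec");
    });
}

fn main() {
    spawn_scan(Arc::new(NoopApplier));
}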

src/mito2/src/access_layer.rs

@@ -22,9 +22,9 @@ use store_api::metadata::RegionMetadataRef;
use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::error::{CleanDirSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
@@ -66,13 +66,27 @@ impl AccessLayer {
&self.object_store
}
/// Deletes a SST file with given file id.
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = location::sst_file_path(&self.region_dir, file_id);
/// Deletes a SST file (and its index file if it has one) with given file id.
pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
let path = location::sst_file_path(&self.region_dir, file_meta.file_id);
self.object_store
.delete(&path)
.await
.context(DeleteSstSnafu { file_id })
.context(DeleteSstSnafu {
file_id: file_meta.file_id,
})?;
if file_meta.inverted_index_available() {
let path = location::index_file_path(&self.region_dir, file_meta.file_id);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: file_meta.file_id,
})?;
}
Ok(())
}
/// Returns a reader builder for specific `file`.


@@ -35,6 +35,8 @@ pub fn new_file_handle(
),
level,
file_size: 0,
available_indexes: Default::default(),
index_file_size: 0,
},
file_purger,
)


@@ -22,6 +22,7 @@ use common_telemetry::{debug, error, info};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
@@ -39,7 +40,7 @@ use crate::read::{BoxedBatchReader, Source};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::file::{FileHandle, FileId, FileMeta, IndexType, Level};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::sst::version::LevelMeta;
@@ -330,6 +331,11 @@ impl TwcsCompactionTask {
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info
.inverted_index_available
.then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
.unwrap_or_default(),
index_file_size: sst_info.index_file_size,
});
Ok(file_meta_opt)
});
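The compaction task here and the flush task below use the same conversion from the writer's boolean flag to the new list-of-indexes field. A minimal sketch of just that conversion, assuming only the smallvec crate:

use smallvec::SmallVec;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum IndexType {
    InvertedIndex,
}

fn available_indexes(inverted_index_available: bool) -> SmallVec<[IndexType; 4]> {
    // `bool::then` yields `Some(..)` only when the flag is set, so files
    // written without an index get an empty, stack-allocated SmallVec.
    inverted_index_available
        .then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
        .unwrap_or_default()
}

fn main() {
    assert!(available_indexes(true).contains(&IndexType::InvertedIndex));
    assert!(available_indexes(false).is_empty());
}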


@@ -553,5 +553,5 @@ async fn test_region_usage() {
assert_eq!(region_stat.sst_usage, 2742);
// region total usage
assert_eq!(region_stat.disk_usage(), 3748);
assert_eq!(region_stat.disk_usage(), 3791);
}


@@ -321,6 +321,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to delete index file, file id: {}", file_id))]
DeleteIndex {
file_id: FileId,
#[snafu(source)]
error: object_store::Error,
location: Location,
},
#[snafu(display("Failed to flush region {}", region_id))]
FlushRegion {
region_id: RegionId,
@@ -596,7 +604,7 @@ impl ErrorExt for Error {
InvalidSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
StopScheduler { .. } => StatusCode::Internal,
DeleteSst { .. } => StatusCode::StorageUnavailable,
DeleteSst { .. } | DeleteIndex { .. } => StatusCode::StorageUnavailable,
FlushRegion { source, .. } => source.status_code(),
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,


@@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use common_telemetry::{error, info};
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
@@ -39,7 +40,7 @@ use crate::request::{
SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file::{FileId, FileMeta, IndexType};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;
@@ -339,6 +340,11 @@ impl RegionFlushTask {
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info
.inverted_index_available
.then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
.unwrap_or_default(),
index_file_size: sst_info.index_file_size,
};
file_metas.push(file_meta);
}


@@ -171,6 +171,8 @@ async fn checkpoint_with_different_compression_types() {
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
available_indexes: Default::default(),
index_file_size: 0,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],


@@ -121,6 +121,9 @@ lazy_static! {
/// Counter of filtered rows during merge.
pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap();
/// Counter of row groups read.
pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap();
// ------- End of query metrics.
// Cache related metrics.


@@ -14,8 +14,10 @@
//! Scans a region according to the scan request.
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
@@ -27,6 +29,8 @@ use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::applier::SstIndexApplierRef;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
@@ -194,6 +198,7 @@ impl ScanRegion {
total_ssts
);
let index_applier = self.build_index_applier();
let predicate = Predicate::new(self.request.filters.clone());
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
@@ -207,6 +212,7 @@ impl ScanRegion {
.with_memtables(memtables)
.with_files(files)
.with_cache(self.cache_manager)
.with_index_applier(index_applier)
.with_parallelism(self.parallelism);
Ok(seq_scan)
@@ -224,6 +230,20 @@ impl ScanRegion {
TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters)
.build()
}
/// Use the latest schema to build the index applier.
fn build_index_applier(&self) -> Option<SstIndexApplierRef> {
SstIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
self.version.metadata.as_ref(),
)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build index applier"))
.ok()
.flatten()
.map(Arc::new)
}
}
/// Config for parallel scan.
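The `inspect_err` / `ok` / `flatten` chain in `build_index_applier` makes index use strictly best-effort: a failed build is logged and the scan falls back to reading without the index. A sketch of the same shape with stand-in types rather than the real builder:

use std::sync::Arc;

fn try_build(supported: bool) -> Result<Option<&'static str>, &'static str> {
    // `Ok(None)` models "no filter is indexable"; `Err` models a build failure.
    if supported { Ok(Some("applier")) } else { Err("unsupported filter") }
}

fn build_index_applier(supported: bool) -> Option<Arc<&'static str>> {
    try_build(supported)
        .inspect_err(|err| eprintln!("Failed to build index applier: {err}"))
        .ok()      // Result<Option<_>, _> -> Option<Option<_>>
        .flatten() // Option<Option<_>>    -> Option<_>
        .map(Arc::new)
}

fn main() {
    assert!(build_index_applier(true).is_some());
    assert!(build_index_applier(false).is_none());
}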


@@ -39,6 +39,7 @@ use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanParallism;
use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
/// Scans a region and returns rows in a sorted sequence.
///
@@ -62,6 +63,8 @@ pub struct SeqScan {
ignore_file_not_found: bool,
/// Parallelism to scan data.
parallelism: ScanParallism,
/// Index applier.
index_applier: Option<SstIndexApplierRef>,
}
impl SeqScan {
@@ -78,6 +81,7 @@ impl SeqScan {
cache_manager: None,
ignore_file_not_found: false,
parallelism: ScanParallism::default(),
index_applier: None,
}
}
@@ -130,6 +134,13 @@ impl SeqScan {
self
}
/// Sets index applier.
#[must_use]
pub(crate) fn with_index_applier(mut self, index_applier: Option<SstIndexApplierRef>) -> Self {
self.index_applier = index_applier;
self
}
/// Builds a stream for the query.
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
let start = Instant::now();


@@ -651,8 +651,7 @@ impl OnFailure for FlushFinished {
// Clean flushed files.
for file in &self.file_metas {
self.file_purger.send_request(PurgeRequest {
region_id: file.region_id,
file_id: file.file_id,
file_meta: file.clone(),
});
}
}
@@ -707,14 +706,12 @@ impl OnFailure for CompactionFinished {
}));
}
for file in &self.compacted_files {
let file_id = file.file_id;
warn!(
"Cleaning region {} compaction output file: {}",
self.region_id, file_id
self.region_id, file.file_id
);
self.file_purger.send_request(PurgeRequest {
region_id: self.region_id,
file_id,
file_meta: file.clone(),
});
}
}


@@ -21,6 +21,7 @@ use std::sync::Arc;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;
@@ -95,6 +96,23 @@ pub struct FileMeta {
pub level: Level,
/// Size of the file.
pub file_size: u64,
/// Available indexes of the file.
pub available_indexes: SmallVec<[IndexType; 4]>,
/// Size of the index file.
pub index_file_size: u64,
}
/// Type of index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
/// Inverted index.
InvertedIndex,
}
impl FileMeta {
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}
}
/// Handle to a SST file.
@@ -176,8 +194,7 @@ impl Drop for FileHandleInner {
fn drop(&mut self) {
if self.deleted.load(Ordering::Relaxed) {
self.file_purger.send_request(PurgeRequest {
region_id: self.meta.region_id,
file_id: self.meta.file_id,
file_meta: self.meta.clone(),
});
}
}
@@ -236,6 +253,8 @@ mod tests {
time_range: FileTimeRange::default(),
level,
file_size: 0,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
index_file_size: 0,
}
}
@@ -250,7 +269,8 @@ mod tests {
#[test]
fn test_deserialize_from_string() {
let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
\"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\"level\":0}";
\"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
\"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
let file_meta = create_file_meta(
FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
0,
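The deserialize test above parses JSON that carries neither `file_size` nor `index_file_size`, so missing fields must fall back to their defaults when old manifests are read; serializing the `available_indexes` SmallVec is also why the Cargo.toml hunk earlier enables smallvec's `serde` feature. A hedged round-trip sketch; the real `FileMeta` has more fields, and the `#[serde(default)]` here is an assumption, as is the use of serde_json for the demo:

use serde::{Deserialize, Serialize};
use smallvec::SmallVec;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
enum IndexType {
    InvertedIndex,
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
struct FileMetaSketch {
    level: u8,
    file_size: u64,
    available_indexes: SmallVec<[IndexType; 4]>,
    index_file_size: u64,
}

fn main() {
    // Mirrors the test: a manifest entry that predates the new fields still parses.
    let json = r#"{"available_indexes":["InvertedIndex"],"level":0}"#;
    let meta: FileMetaSketch = serde_json::from_str(json).unwrap();
    assert!(meta.available_indexes.contains(&IndexType::InvertedIndex));
    assert_eq!(meta.index_file_size, 0);
}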


@@ -16,20 +16,17 @@ use std::fmt;
use std::sync::Arc;
use common_telemetry::{error, info};
use store_api::storage::RegionId;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::FileId;
use crate::sst::file::FileMeta;
/// Request to remove a file.
#[derive(Debug)]
pub struct PurgeRequest {
/// Region id of the file.
pub region_id: RegionId,
/// Id of the file.
pub file_id: FileId,
/// File meta.
pub file_meta: FileMeta,
}
/// A worker to delete files in background.
@@ -72,24 +69,22 @@ impl LocalFilePurger {
impl FilePurger for LocalFilePurger {
fn send_request(&self, request: PurgeRequest) {
let file_id = request.file_id;
let region_id = request.region_id;
let file_meta = request.file_meta;
let sst_layer = self.sst_layer.clone();
// Remove meta of the file from cache.
if let Some(cache) = &self.cache_manager {
cache.remove_parquet_meta_data(region_id, file_id);
cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
}
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = sst_layer.delete_sst(file_id).await {
error!(e; "Failed to delete SST file, file: {}, region: {}",
file_id.as_parquet(), region_id);
if let Err(e) = sst_layer.delete_sst(&file_meta).await {
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id);
} else {
info!(
"Successfully deleted SST file: {}, region: {}",
file_id.as_parquet(),
region_id
"Successfully deleted SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id
);
}
})) {
@@ -103,11 +98,12 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::ObjectStore;
use smallvec::SmallVec;
use super::*;
use crate::access_layer::AccessLayer;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange};
use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType};
use crate::sst::location;
#[tokio::test]
@@ -137,6 +133,8 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
available_indexes: Default::default(),
index_file_size: 0,
},
file_purger,
);
@@ -148,4 +146,52 @@ mod tests {
assert!(!object_store.is_exist(&path).await.unwrap());
}
#[tokio::test]
async fn test_file_purge_with_index() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("file-purge");
let mut builder = Fs::default();
builder.root(dir.path().to_str().unwrap());
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_id = FileId::random();
let sst_dir = "table1";
let path = location::sst_file_path(sst_dir, sst_file_id);
object_store.write(&path, vec![0; 4096]).await.unwrap();
let index_path = location::index_file_path(sst_dir, sst_file_id);
object_store
.write(&index_path, vec![0; 4096])
.await
.unwrap();
let scheduler = Arc::new(LocalScheduler::new(3));
let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone()));
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));
{
let handle = FileHandle::new(
FileMeta {
region_id: 0.into(),
file_id: sst_file_id,
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
index_file_size: 4096,
},
file_purger,
);
// Mark the file as deleted and drop the handle; we expect both the SST file
// and the index file to be deleted.
handle.mark_deleted();
}
scheduler.stop(true).await.unwrap();
assert!(!object_store.is_exist(&path).await.unwrap());
assert!(!object_store.is_exist(&index_path).await.unwrap());
}
}


@@ -15,6 +15,7 @@
pub mod builder;
use std::collections::BTreeSet;
use std::sync::Arc;
use futures::{AsyncRead, AsyncSeek};
use index::inverted_index::format::reader::InvertedIndexBlobReader;
@@ -52,8 +53,12 @@ pub struct SstIndexApplier {
index_applier: Box<dyn IndexApplier>,
}
pub type SstIndexApplierRef = Arc<SstIndexApplier>;
impl SstIndexApplier {
/// Creates a new [`SstIndexApplier`].
///
/// TODO(zhongzc): leverage `WriteCache`
pub fn new(
region_dir: String,
object_store: ObjectStore,


@@ -66,6 +66,10 @@ pub struct SstInfo {
pub num_rows: usize,
/// File Meta Data
pub file_metadata: Option<Arc<ParquetMetaData>>,
/// Whether inverted index is available.
pub inverted_index_available: bool,
/// Index file size in bytes.
pub index_file_size: u64,
}
#[cfg(test)]


@@ -26,6 +26,7 @@
//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
@@ -261,7 +262,7 @@ impl ReadFormat {
/// Returns min values of specific column in row groups.
pub(crate) fn min_values(
&self,
row_groups: &[RowGroupMetaData],
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
@@ -281,7 +282,7 @@ impl ReadFormat {
/// Returns max values of specific column in row groups.
pub(crate) fn max_values(
&self,
row_groups: &[RowGroupMetaData],
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
@@ -301,7 +302,7 @@ impl ReadFormat {
/// Returns null counts of specific column in row groups.
pub(crate) fn null_counts(
&self,
row_groups: &[RowGroupMetaData],
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
@@ -345,7 +346,7 @@ impl ReadFormat {
/// Returns min/max values of specific tag.
fn tag_values(
&self,
row_groups: &[RowGroupMetaData],
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
@@ -363,7 +364,10 @@ impl ReadFormat {
let converter =
McmpRowCodec::new(vec![SortField::new(column.column_schema.data_type.clone())]);
let values = row_groups.iter().map(|meta| {
let stats = meta.column(self.primary_key_position()).statistics()?;
let stats = meta
.borrow()
.column(self.primary_key_position())
.statistics()?;
if !stats.has_min_max_set() {
return None;
}
@@ -400,7 +404,7 @@ impl ReadFormat {
/// Returns min/max values of specific non-tag columns.
fn column_values(
row_groups: &[RowGroupMetaData],
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
column_index: usize,
is_min: bool,
@@ -414,7 +418,7 @@ impl ReadFormat {
let scalar_values = row_groups
.iter()
.map(|meta| {
let stats = meta.column(column_index).statistics()?;
let stats = meta.borrow().column(column_index).statistics()?;
if !stats.has_min_max_set() {
return None;
}
@@ -463,11 +467,11 @@ impl ReadFormat {
/// Returns null counts of specific non-tag columns.
fn column_null_counts(
row_groups: &[RowGroupMetaData],
row_groups: &[impl Borrow<RowGroupMetaData>],
column_index: usize,
) -> Option<ArrayRef> {
let values = row_groups.iter().map(|meta| {
let col = meta.column(column_index);
let col = meta.borrow().column(column_index);
let stat = col.statistics()?;
Some(stat.null_count())
});

src/mito2/src/sst/parquet/reader.rs

@@ -14,12 +14,12 @@
//! Parquet reader.
use std::collections::{HashSet, VecDeque};
use std::collections::{BTreeSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
@@ -39,9 +39,10 @@ use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu,
Result,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::metrics::{READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -64,6 +65,8 @@ pub struct ParquetReaderBuilder {
projection: Option<Vec<ColumnId>>,
/// Manager that caches SST data.
cache_manager: Option<CacheManagerRef>,
/// Index applier.
index_applier: Option<SstIndexApplierRef>,
}
impl ParquetReaderBuilder {
@@ -81,6 +84,7 @@ impl ParquetReaderBuilder {
time_range: None,
projection: None,
cache_manager: None,
index_applier: None,
}
}
@@ -110,6 +114,13 @@ impl ParquetReaderBuilder {
self
}
/// Attaches the index applier to the builder.
#[must_use]
pub fn index_applier(mut self, index_applier: Option<SstIndexApplierRef>) -> Self {
self.index_applier = index_applier;
self
}
/// Builds and initializes a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -129,35 +140,8 @@ impl ParquetReaderBuilder {
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?;
// Computes column ids to read.
let column_ids: HashSet<_> = self
.projection
.as_ref()
.map(|p| p.iter().cloned().collect())
.unwrap_or_else(|| {
region_meta
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect()
});
let read_format = ReadFormat::new(Arc::new(region_meta));
// Prunes row groups by metadata.
let row_groups: VecDeque<_> = if let Some(predicate) = &self.predicate {
let stats =
RowGroupPruningStats::new(parquet_meta.row_groups(), &read_format, column_ids);
predicate
.prune_with_stats(&stats, read_format.metadata().schema.arrow_schema())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
.collect()
} else {
(0..parquet_meta.num_row_groups()).collect()
};
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let projection_mask = if let Some(column_ids) = self.projection.as_ref() {
@@ -174,6 +158,13 @@ impl ParquetReaderBuilder {
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadParquetSnafu { path: &file_path })?;
let mut metrics = Metrics::default();
// Computes row groups to read.
let row_groups = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics)
.await;
let reader_builder = RowGroupReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
@@ -185,7 +176,6 @@ impl ParquetReaderBuilder {
};
let metrics = Metrics {
read_row_groups: row_groups.len(),
build_cost: start.elapsed(),
..Default::default()
};
@@ -256,13 +246,76 @@ impl ParquetReaderBuilder {
Ok(metadata)
}
/// Computes row groups to read.
async fn row_groups_to_read(
&self,
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
metrics: &mut Metrics,
) -> BTreeSet<usize> {
let mut row_group_ids: BTreeSet<_> = (0..parquet_meta.num_row_groups()).collect();
metrics.num_row_groups_unfiltered += row_group_ids.len();
// Applies index to prune row groups.
//
// TODO(zhongzc): Devise a mechanism to force-disable the index as an
// escape route in case of index issues; it could also be used to test
// the correctness of the index.
if let Some(index_applier) = &self.index_applier {
if self.file_handle.meta().inverted_index_available() {
match index_applier.apply(self.file_handle.file_id()).await {
Ok(row_groups) => row_group_ids = row_groups,
Err(err) => warn!(
err; "Failed to apply index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()),
}
}
}
metrics.num_row_groups_inverted_index_selected += row_group_ids.len();
if row_group_ids.is_empty() {
return row_group_ids;
}
// Prunes row groups by min-max index.
if let Some(predicate) = &self.predicate {
let region_meta = read_format.metadata();
let column_ids = match &self.projection {
Some(ids) => ids.iter().cloned().collect(),
None => region_meta
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect(),
};
let row_groups = row_group_ids
.iter()
.map(|id| parquet_meta.row_group(*id))
.collect::<Vec<_>>();
let stats = RowGroupPruningStats::new(&row_groups, read_format, column_ids);
let mut mask = predicate
.prune_with_stats(&stats, region_meta.schema.arrow_schema())
.into_iter();
row_group_ids.retain(|_| mask.next().unwrap_or(false));
};
metrics.num_row_groups_min_max_selected += row_group_ids.len();
row_group_ids
}
}
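One subtlety in `row_groups_to_read`: the boolean mask from `prune_with_stats` is consumed inside `retain`, which lines up because `BTreeSet::retain` visits elements in ascending order, the same order in which the row groups were collected for the stats. A small sketch of that alignment:

use std::collections::BTreeSet;

fn retain_by_mask(ids: &mut BTreeSet<usize>, mask: Vec<bool>) {
    let mut mask = mask.into_iter();
    // `retain` visits ids in ascending order, matching the mask's order.
    ids.retain(|_| mask.next().unwrap_or(false));
}

fn main() {
    let mut ids: BTreeSet<usize> = [0, 2, 5].into_iter().collect();
    retain_by_mask(&mut ids, vec![true, false, true]);
    assert_eq!(ids.into_iter().collect::<Vec<_>>(), vec![0, 5]);
}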
/// Parquet reader metrics.
#[derive(Debug, Default)]
struct Metrics {
/// Number of row groups to read.
read_row_groups: usize,
/// Number of unfiltered row groups.
num_row_groups_unfiltered: usize,
/// Number of row groups to read after filtering by inverted index.
num_row_groups_inverted_index_selected: usize,
/// Number of row groups to read after filtering by min-max index.
num_row_groups_min_max_selected: usize,
/// Duration to build the parquet reader.
build_cost: Duration,
/// Duration to scan the reader.
@@ -337,7 +390,7 @@ impl RowGroupReaderBuilder {
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
/// Indices of row groups to read.
row_groups: VecDeque<usize>,
row_groups: BTreeSet<usize>,
/// Helper to read record batches.
///
/// Not `None` if [ParquetReader::stream] is not `None`.
@@ -390,8 +443,8 @@ impl Drop for ParquetReader {
self.reader_builder.file_handle.region_id(),
self.reader_builder.file_handle.file_id(),
self.reader_builder.file_handle.time_range(),
self.metrics.read_row_groups,
self.reader_builder.parquet_meta.num_row_groups(),
self.metrics.num_row_groups_min_max_selected,
self.metrics.num_row_groups_unfiltered,
self.metrics
);
@@ -405,6 +458,15 @@ impl Drop for ParquetReader {
READ_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.metrics.num_rows as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["unfiltered"])
.inc_by(self.metrics.num_row_groups_unfiltered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_selected"])
.inc_by(self.metrics.num_row_groups_inverted_index_selected as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["min_max_index_selected"])
.inc_by(self.metrics.num_row_groups_min_max_selected as u64);
}
}
@@ -432,7 +494,7 @@ impl ParquetReader {
}
// No more items in current row group, reads next row group.
while let Some(row_group_idx) = self.row_groups.pop_front() {
while let Some(row_group_idx) = self.row_groups.pop_first() {
let mut row_group_reader = self.reader_builder.build(row_group_idx).await?;
let Some(record_batch) =
row_group_reader
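The reader's queue of row groups changed from a `VecDeque<usize>` drained with `pop_front` to a `BTreeSet<usize>` drained with `pop_first`; the old queue was filled in ascending order and `pop_first` always pops the smallest remaining id, so read order is unchanged. A sketch:

use std::collections::BTreeSet;

fn main() {
    let mut row_groups: BTreeSet<usize> = [3, 1, 2].into_iter().collect();
    let mut order = Vec::new();
    while let Some(idx) = row_groups.pop_first() {
        order.push(idx);
    }
    assert_eq!(order, vec![1, 2, 3]);
}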


@@ -14,6 +14,7 @@
//! Statistics of parquet SSTs.
use std::borrow::Borrow;
use std::collections::HashSet;
use datafusion::physical_optimizer::pruning::PruningStatistics;
@@ -25,9 +26,9 @@ use store_api::storage::ColumnId;
use crate::sst::parquet::format::ReadFormat;
/// Statistics for pruning row groups.
pub(crate) struct RowGroupPruningStats<'a> {
pub(crate) struct RowGroupPruningStats<'a, T> {
/// Metadata of SST row groups.
row_groups: &'a [RowGroupMetaData],
row_groups: &'a [T],
/// Helper to read the SST.
read_format: &'a ReadFormat,
/// Projected column ids to read.
@@ -37,10 +38,10 @@ pub(crate) struct RowGroupPruningStats<'a> {
column_ids: HashSet<ColumnId>,
}
impl<'a> RowGroupPruningStats<'a> {
impl<'a, T> RowGroupPruningStats<'a, T> {
/// Creates a new statistics to prune specific `row_groups`.
pub(crate) fn new(
row_groups: &'a [RowGroupMetaData],
row_groups: &'a [T],
read_format: &'a ReadFormat,
column_ids: HashSet<ColumnId>,
) -> Self {
@@ -61,7 +62,7 @@ impl<'a> RowGroupPruningStats<'a> {
}
}
impl<'a> PruningStatistics for RowGroupPruningStats<'a> {
impl<'a, T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'a, T> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
self.read_format.min_values(self.row_groups, column_id)
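Making the stats generic over `T: Borrow<RowGroupMetaData>` lets the same pruning code accept both the full `&[RowGroupMetaData]` slice and the `Vec<&RowGroupMetaData>` that reader.rs now builds from the ids the inverted index selected. A sketch of the pattern with a stand-in type:

use std::borrow::Borrow;

struct RowGroupMeta {
    num_rows: usize,
}

// Both `&[RowGroupMeta]` and `&[&RowGroupMeta]` satisfy this bound.
fn total_rows<T: Borrow<RowGroupMeta>>(row_groups: &[T]) -> usize {
    row_groups.iter().map(|rg| rg.borrow().num_rows).sum()
}

fn main() {
    let owned = vec![RowGroupMeta { num_rows: 10 }, RowGroupMeta { num_rows: 5 }];
    let selected: Vec<&RowGroupMeta> = owned.iter().skip(1).collect();
    assert_eq!(total_rows(&owned), 15);
    assert_eq!(total_rows(&selected), 5);
}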


@@ -124,6 +124,8 @@ impl ParquetWriter {
file_size,
num_rows: stats.num_rows,
file_metadata: Some(Arc::new(parquet_metadata)),
inverted_index_available: false,
index_file_size: 0,
}))
}


@@ -92,7 +92,10 @@ impl SstVersion {
level_meta
.files
.values()
.map(|file_handle| file_handle.meta().file_size)
.map(|file_handle| {
let meta = file_handle.meta();
meta.file_size + meta.index_file_size
})
.sum::<u64>()
})
.sum()
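Per-file usage now counts the index file alongside the data file, which is presumably what moves the earlier `test_region_usage` assertion from 3748 to 3791 (a 43-byte index file in that test). A minimal sketch of the new summation over a simplified `FileMeta`:

struct FileMeta {
    file_size: u64,
    index_file_size: u64,
}

fn sst_usage(files: &[FileMeta]) -> u64 {
    // Each file contributes its data size plus its index size.
    files.iter().map(|m| m.file_size + m.index_file_size).sum()
}

fn main() {
    let files = [
        FileMeta { file_size: 4096, index_file_size: 43 },
        FileMeta { file_size: 2048, index_file_size: 0 },
    ];
    assert_eq!(sst_usage(&files), 4096 + 43 + 2048);
}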


@@ -106,6 +106,8 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
),
level: 0,
file_size: 0,
available_indexes: Default::default(),
index_file_size: 0,
},
file_purger,
)


@@ -96,6 +96,8 @@ impl VersionControlBuilder {
),
level: 0,
file_size: 0, // We don't care file size.
available_indexes: Default::default(),
index_file_size: 0,
},
);
self
@@ -136,6 +138,8 @@ pub(crate) fn apply_edit(
),
level: 0,
file_size: 0, // We don't care file size.
available_indexes: Default::default(),
index_file_size: 0,
}
})
.collect();