perf(mito2): speed up parquet scan via meta caches

This commit is contained in:
Ruihang Xia
2026-02-11 15:24:52 +08:00
parent bcfbd01582
commit c40735e704
7 changed files with 239 additions and 45 deletions

View File

@@ -116,6 +116,8 @@ pub enum PredicateKey {
Bloom(BloomFilterKey),
/// Inverted index predicate.
Inverted(InvertedIndexKey),
/// Min-max pruning predicate.
MinMax(MinMaxKey),
}
impl PredicateKey {
@@ -134,12 +136,18 @@ impl PredicateKey {
Self::Inverted(InvertedIndexKey::new(predicates))
}
/// Creates a new min-max pruning key.
pub fn new_minmax(exprs: Arc<Vec<String>>) -> Self {
Self::MinMax(MinMaxKey::new(exprs))
}
/// Returns the memory usage of this key.
pub fn mem_usage(&self) -> usize {
match self {
Self::Fulltext(key) => key.mem_usage,
Self::Bloom(key) => key.mem_usage,
Self::Inverted(key) => key.mem_usage,
Self::MinMax(key) => key.mem_usage,
}
}
}
@@ -239,6 +247,20 @@ impl InvertedIndexKey {
}
}
/// Key for min-max pruning.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct MinMaxKey {
    exprs: Arc<Vec<String>>,
    mem_usage: usize,
}
impl MinMaxKey {
    /// Builds a key over the formatted predicate expressions, caching the total
    /// byte length of the expression strings as the key's approximate memory footprint.
    pub fn new(exprs: Arc<Vec<String>>) -> Self {
        // Precompute once at construction so later `mem_usage` lookups are O(1).
        let mut mem_usage = 0usize;
        for expr in exprs.iter() {
            mem_usage += expr.len();
        }
        Self { exprs, mem_usage }
    }
}
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {

View File

@@ -321,10 +321,10 @@ impl MitoConfig {
);
// Use 2x of global write buffer size as global write buffer reject size.
let global_write_buffer_reject_size = global_write_buffer_size * 2;
// shouldn't be greater than 128MB in default mode.
// shouldn't be greater than 256MB in default mode.
let sst_meta_cache_size = cmp::min(
sys_memory / SST_META_CACHE_SIZE_FACTOR,
ReadableSize::mb(128),
ReadableSize::mb(256),
);
// shouldn't be greater than 512MB in default mode.
let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));

View File

@@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::compute;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
@@ -47,7 +47,7 @@ pub(crate) struct ConvertBatchStream {
projection_mapper: Arc<ProjectionMapper>,
cache_strategy: CacheStrategy,
partition_metrics: PartitionMetrics,
buffer: Vec<DfRecordBatch>,
pending: VecDeque<RecordBatch>,
}
impl ConvertBatchStream {
@@ -62,7 +62,7 @@ impl ConvertBatchStream {
projection_mapper,
cache_strategy,
partition_metrics,
buffer: Vec::new(),
pending: VecDeque::new(),
}
}
@@ -79,40 +79,36 @@ impl ConvertBatchStream {
}
}
ScanBatch::Series(series) => {
self.buffer.clear();
debug_assert!(
self.pending.is_empty(),
"ConvertBatchStream should not convert a new SeriesBatch when pending batches exist"
);
match series {
SeriesBatch::PrimaryKey(primary_key_batch) => {
self.buffer.reserve(primary_key_batch.batches.len());
// Safety: Only primary key format returns this batch.
let mapper = self.projection_mapper.as_primary_key().unwrap();
for batch in primary_key_batch.batches {
let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
self.buffer.push(record_batch.into_df_record_batch());
self.pending
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
}
}
SeriesBatch::Flat(flat_batch) => {
self.buffer.reserve(flat_batch.batches.len());
// Safety: Only flat format returns this batch.
let mapper = self.projection_mapper.as_flat().unwrap();
for batch in flat_batch.batches {
let record_batch = mapper.convert(&batch)?;
self.buffer.push(record_batch.into_df_record_batch());
self.pending.push_back(mapper.convert(&batch)?);
}
}
}
let output_schema = self.projection_mapper.output_schema();
let record_batch =
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
.context(ArrowComputeSnafu)?;
Ok(RecordBatch::from_df_record_batch(
output_schema,
record_batch,
))
Ok(self
.pending
.pop_front()
.unwrap_or_else(|| RecordBatch::new_empty(output_schema)))
}
ScanBatch::RecordBatch(df_record_batch) => {
// Safety: Only flat format returns this batch.
@@ -128,6 +124,10 @@ impl Stream for ConvertBatchStream {
type Item = common_recordbatch::error::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(batch) = self.pending.pop_front() {
return Poll::Ready(Some(Ok(batch)));
}
let batch = futures::ready!(self.inner.poll_next_unpin(cx));
let Some(batch) = batch else {
return Poll::Ready(None);

View File

@@ -69,6 +69,8 @@ use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::reader::MetadataCacheMetrics;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};
@@ -585,6 +587,7 @@ impl RegionOpener {
let region = Arc::new(region);
maybe_load_cache(&region, config, &self.cache_manager);
maybe_preload_parquet_meta_cache(&region, config, &self.cache_manager);
Ok(Some(region))
}
@@ -943,6 +946,89 @@ fn maybe_load_cache(
write_cache.load_region_cache(task);
}
/// Preloads Parquet metadata into the in-memory SST meta cache on region open.
///
/// This improves the latency of the first query after server start by avoiding large Parquet
/// metadata reads on demand.
///
/// Preloading is best-effort: it runs on a detached background task, skips files that are
/// already cached or have a zero recorded size, and logs (rather than propagates) any
/// load failure, so it never blocks or fails the region-open path.
fn maybe_preload_parquet_meta_cache(
region: &MitoRegionRef,
config: &MitoConfig,
cache_manager: &Option<CacheManagerRef>,
) {
// Nothing to preload into without a cache manager.
let Some(cache_manager) = cache_manager else {
return;
};
// Skip if SST meta cache is disabled.
if config.sst_meta_cache_size.as_bytes() == 0 {
return;
}
// Clone what the detached task needs; the task may outlive this call.
let region = region.clone();
let cache_manager = cache_manager.clone();
tokio::spawn(async move {
let region_id = region.region_id;
let table_dir = region.access_layer.table_dir();
let path_type = region.access_layer.path_type();
let object_store = region.access_layer.object_store();
// Collect SST files. Do not hold the version longer than needed.
let mut files = Vec::new();
{
let version = region.version_control.current().version;
for level in version.ssts.levels() {
for file_handle in level.files.values() {
files.push(file_handle.clone());
}
}
}
// Load older files first so the most recent files remain hot in the LRU cache.
// NOTE(review): assumes `time_range.1` (the max timestamp) orders files by recency — confirm.
files.sort_by(|a, b| a.meta_ref().time_range.1.cmp(&b.meta_ref().time_range.1));
let mut loaded = 0usize;
for file_handle in files {
let file_id = file_handle.file_id();
// Metadata already cached for this file; skip the read.
if cache_manager
.get_parquet_meta_data_from_mem_cache(file_id)
.is_some()
{
continue;
}
let file_size = file_handle.meta_ref().file_size;
// A zero recorded size means there is nothing useful to read.
if file_size == 0 {
continue;
}
let file_path = file_handle.file_path(table_dir, path_type);
let mut cache_metrics = MetadataCacheMetrics::default();
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
match loader.load(&mut cache_metrics).await {
Ok(metadata) => {
cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata));
loaded += 1;
}
Err(err) => {
// Preloading is best-effort. Failure shouldn't affect region open.
warn!(
err; "Failed to preload parquet metadata, region: {}, file: {}",
region_id, file_path
);
}
}
}
// Only log when something was actually loaded, to keep idle opens quiet.
if loaded > 0 {
info!(
"Preloaded parquet metadata for region {}, loaded_files: {}",
region_id, loaded
);
}
});
}
fn can_load_cache(state: RegionRoleState) -> bool {
match state {
RegionRoleState::Leader(RegionLeaderState::Writable)

View File

@@ -39,9 +39,9 @@ pub mod writer;
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 4096;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;
/// Parquet write options.
#[derive(Debug, Clone)]

View File

@@ -679,15 +679,14 @@ impl ParquetReaderBuilder {
metrics.rg_total += num_row_groups;
metrics.rows_total += num_rows as usize;
let mut output = RowGroupSelection::new(row_group_size, num_rows as _);
// Compute skip_fields once for all pruning operations
let skip_fields = self.compute_skip_fields(parquet_meta);
self.prune_row_groups_by_minmax(
let mut output = self.row_groups_by_minmax(
read_format,
parquet_meta,
&mut output,
row_group_size,
num_rows as usize,
metrics,
skip_fields,
);
@@ -1136,20 +1135,43 @@ impl ParquetReaderBuilder {
}
}
/// Prunes row groups by min-max index.
fn prune_row_groups_by_minmax(
/// Computes row groups selection after min-max pruning.
fn row_groups_by_minmax(
&self,
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
output: &mut RowGroupSelection,
row_group_size: usize,
total_row_count: usize,
metrics: &mut ReaderFilterMetrics,
skip_fields: bool,
) -> bool {
) -> RowGroupSelection {
let Some(predicate) = &self.predicate else {
return false;
return RowGroupSelection::new(row_group_size, total_row_count);
};
let row_groups_before = output.row_group_count();
let file_id = self.file_handle.file_id().file_id();
let cached_minmax_key = if predicate.dyn_filters().is_empty() {
// Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
// building row-group pruning stats for identical predicates across queries.
let mut exprs = predicate
.exprs()
.iter()
.map(|expr| format!("{expr:?}"))
.collect::<Vec<_>>();
exprs.sort();
Some(PredicateKey::new_minmax(Arc::new(exprs)))
} else {
None
};
if let Some(index_result_cache) = self.cache_strategy.index_result_cache()
&& let Some(predicate_key) = cached_minmax_key.as_ref()
&& let Some(result) = index_result_cache.get(predicate_key, file_id)
{
let num_row_groups = parquet_meta.num_row_groups();
metrics.rg_minmax_filtered += num_row_groups.saturating_sub(result.row_group_count());
return (*result).clone();
}
let region_meta = read_format.metadata();
let row_groups = parquet_meta.row_groups();
@@ -1168,20 +1190,26 @@ impl ParquetReaderBuilder {
// Here we use the schema of the SST to build the physical expression. If the column
// in the SST doesn't have the same column id as the column in the expected metadata,
// we will get a None statistics for that column.
predicate
.prune_with_stats(&stats, prune_schema)
.iter()
.zip(0..parquet_meta.num_row_groups())
.for_each(|(mask, row_group)| {
if !*mask {
output.remove_row_group(row_group);
}
});
let mask = predicate.prune_with_stats(&stats, prune_schema);
let output = RowGroupSelection::from_full_row_group_ids(
mask.iter()
.enumerate()
.filter_map(|(row_group, keep)| keep.then_some(row_group)),
row_group_size,
total_row_count,
);
let row_groups_after = output.row_group_count();
metrics.rg_minmax_filtered += row_groups_before - row_groups_after;
metrics.rg_minmax_filtered += parquet_meta
.num_row_groups()
.saturating_sub(output.row_group_count());
true
if let Some(index_result_cache) = self.cache_strategy.index_result_cache()
&& let Some(predicate_key) = cached_minmax_key
{
index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
}
output
}
fn apply_index_result_and_update_cache(

View File

@@ -77,6 +77,64 @@ impl RowGroupSelection {
}
}
/// Creates a new `RowGroupSelection` that selects all rows in the specified row groups.
///
/// This is useful for fast construction after coarse pruning (e.g. min-max pruning),
/// avoiding building and then removing a full selection of all row groups.
///
/// Row group ids that are out of range are ignored; duplicated ids are counted once,
/// keeping `row_count` and `selector_len` consistent with the selection map.
pub fn from_full_row_group_ids<I>(
    row_group_ids: I,
    row_group_size: usize,
    total_row_count: usize,
) -> Self
where
    I: IntoIterator<Item = usize>,
{
    // An empty file or a zero row group size cannot select anything.
    if row_group_size == 0 || total_row_count == 0 {
        return Self::default();
    }
    // `total_row_count > 0` guarantees at least one row group here, so no
    // extra zero check is needed after `div_ceil`.
    let row_group_count = total_row_count.div_ceil(row_group_size);
    // The last row group may be only partially filled.
    let last_row_group_size = total_row_count - (row_group_count - 1) * row_group_size;
    let mut selection_in_rg = BTreeMap::new();
    let mut row_count = 0usize;
    let mut selector_len = 0usize;
    for rg_id in row_group_ids {
        // Ignore ids beyond the file's row groups.
        if rg_id >= row_group_count {
            continue;
        }
        // Skip duplicates so the counters stay in sync with the map: a plain
        // `insert` would overwrite the entry but double-count the rows.
        if selection_in_rg.contains_key(&rg_id) {
            continue;
        }
        let rg_row_count = if rg_id == row_group_count - 1 {
            last_row_group_size
        } else {
            row_group_size
        };
        // A single "select all" run covers the entire row group.
        let selection = RowSelection::from(vec![RowSelector::select(rg_row_count)]);
        selection_in_rg.insert(
            rg_id,
            RowSelectionWithCount {
                selection,
                row_count: rg_row_count,
                selector_len: 1,
            },
        );
        row_count += rg_row_count;
        selector_len += 1;
    }
    Self {
        selection_in_rg,
        row_count,
        selector_len,
    }
}
/// Returns the row selection for a given row group.
///
/// `None` indicates not selected.