feat: get row group time range from cached metadata (#4869)

* feat: get part range min-max from cache for unordered scan

* feat: seq scan push row groups if num_row_groups > 0

* test: test split

* feat: update comment

* test: fix split test

* refactor: rename get meta data method
This commit is contained in:
Yingwen
2024-11-01 14:35:03 +08:00
committed by GitHub
parent 758ad0a8c5
commit 39ab1a6415
5 changed files with 336 additions and 22 deletions

View File

@@ -80,18 +80,15 @@ impl CacheManager {
CacheManagerBuilder::default()
}
/// Gets cached [ParquetMetaData].
/// Gets cached [ParquetMetaData] from in-memory cache first.
/// If not found, tries to get it from write cache and fill the in-memory cache.
pub async fn get_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
let metadata = self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
update_hit_miss(value, SST_META_TYPE)
});
let metadata = self.get_parquet_meta_data_from_mem_cache(region_id, file_id);
if metadata.is_some() {
return metadata;
}
@@ -110,6 +107,20 @@ impl CacheManager {
None
}
/// Gets cached [ParquetMetaData] from in-memory cache.
/// This method does not perform I/O.
pub fn get_parquet_meta_data_from_mem_cache(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
update_hit_miss(value, SST_META_TYPE)
})
}
/// Puts [ParquetMetaData] into the cache.
pub fn put_parquet_meta_data(
&self,

View File

@@ -18,15 +18,17 @@ use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};
use store_api::region_engine::PartitionRange;
use crate::cache::CacheManager;
use crate::memtable::MemtableRef;
use crate::read::scan_region::ScanInput;
use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
use crate::sst::parquet::format::parquet_row_group_time_range;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
/// Index to access a row group.
#[derive(Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct RowGroupIndex {
/// Index to the memtable/file.
pub(crate) index: usize,
@@ -38,6 +40,7 @@ pub(crate) struct RowGroupIndex {
/// Meta data of a partition range.
/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
#[derive(Debug, PartialEq)]
pub(crate) struct RangeMeta {
/// The time range of the range.
pub(crate) time_range: FileTimeRange,
@@ -84,7 +87,12 @@ impl RangeMeta {
pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
Self::push_unordered_file_ranges(input.memtables.len(), &input.files, &mut ranges);
Self::push_unordered_file_ranges(
input.memtables.len(),
&input.files,
input.cache_manager.as_deref(),
&mut ranges,
);
ranges
}
@@ -164,12 +172,36 @@ impl RangeMeta {
fn push_unordered_file_ranges(
num_memtables: usize,
files: &[FileHandle],
cache: Option<&CacheManager>,
ranges: &mut Vec<RangeMeta>,
) {
// For append mode, we can parallelize reading row groups.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
if file.meta_ref().num_row_groups > 0 {
// Get parquet meta from the cache.
let parquet_meta = cache.and_then(|c| {
c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id())
});
if let Some(parquet_meta) = parquet_meta {
// Scans each row group.
for row_group_index in 0..file.meta_ref().num_row_groups {
let time_range = parquet_row_group_time_range(
file.meta_ref(),
&parquet_meta,
row_group_index as usize,
);
let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
ranges.push(RangeMeta {
time_range: time_range.unwrap_or_else(|| file.time_range()),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
}],
num_rows: num_rows as usize,
});
}
} else if file.meta_ref().num_row_groups > 0 {
// Scans each row group.
for row_group_index in 0..file.meta_ref().num_row_groups {
ranges.push(RangeMeta {
@@ -217,7 +249,6 @@ impl RangeMeta {
}
}
// TODO(yingwen): Support multiple row groups in a range so we can split them later.
fn push_seq_file_ranges(
num_memtables: usize,
files: &[FileHandle],
@@ -226,15 +257,31 @@ impl RangeMeta {
// For non append-only mode, each range only contains one file.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
if file.meta_ref().num_row_groups > 0 {
// All row groups share the same time range.
let row_group_indices = (0..file.meta_ref().num_row_groups)
.map(|row_group_index| RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
})
.collect();
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices,
num_rows: file.meta_ref().num_rows as usize,
});
} else {
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
}
}
}
}
@@ -366,4 +413,212 @@ mod tests {
&[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
);
}
#[test]
fn test_merge_range() {
let mut left = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 1
},
RowGroupIndex {
index: 1,
row_group_index: 2
}
],
num_rows: 5,
};
let right = RangeMeta {
time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
indices: smallvec![2],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
row_group_index: 1
},
RowGroupIndex {
index: 2,
row_group_index: 2
}
],
num_rows: 4,
};
left.merge(right);
assert_eq!(
left,
RangeMeta {
time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
indices: smallvec![1, 2],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 1
},
RowGroupIndex {
index: 1,
row_group_index: 2
},
RowGroupIndex {
index: 2,
row_group_index: 1
},
RowGroupIndex {
index: 2,
row_group_index: 2
},
],
num_rows: 9,
}
);
}
#[test]
fn test_split_range() {
let range = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 1
},
RowGroupIndex {
index: 1,
row_group_index: 2
}
],
num_rows: 5,
};
assert!(range.can_split_preserve_order());
let mut output = Vec::new();
range.maybe_split(&mut output);
assert_eq!(
output,
&[
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 1
},],
num_rows: 2,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 2
}],
num_rows: 2,
}
]
);
}
#[test]
fn test_not_split_range() {
let range = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1, 2],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 1
},
RowGroupIndex {
index: 2,
row_group_index: 1
}
],
num_rows: 5,
};
assert!(!range.can_split_preserve_order());
let mut output = Vec::new();
range.maybe_split(&mut output);
assert_eq!(1, output.len());
}
#[test]
fn test_maybe_split_ranges() {
let ranges = vec![
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 0
},
RowGroupIndex {
index: 1,
row_group_index: 1
}
],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
indices: smallvec![2, 3],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
row_group_index: 0
},
RowGroupIndex {
index: 3,
row_group_index: 0
}
],
num_rows: 5,
},
];
let output = maybe_split_ranges_for_seq_scan(ranges);
assert_eq!(
output,
vec![
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 0
},],
num_rows: 2,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 1
}],
num_rows: 2,
},
RangeMeta {
time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
indices: smallvec![2, 3],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
row_group_index: 0
},
RowGroupIndex {
index: 3,
row_group_index: 0
}
],
num_rows: 5,
},
]
)
}
}

View File

@@ -111,7 +111,8 @@ pub struct FileMeta {
pub region_id: RegionId,
/// Compared to normal file names, FileId ignore the extension
pub file_id: FileId,
/// Timestamp range of file.
/// Timestamp range of file. The timestamps have the same time unit as the
/// data in the SST.
pub time_range: FileTimeRange,
/// SST level of the file.
pub level: Level,

View File

@@ -62,7 +62,8 @@ impl Default for WriteOptions {
/// Parquet SST info returned by the writer.
pub struct SstInfo {
/// Time range of the SST.
/// Time range of the SST. The timestamps have the same time unit as the
/// data in the SST.
pub time_range: FileTimeRange,
/// File size in bytes.
pub file_size: u64,

View File

@@ -31,13 +31,14 @@ use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
@@ -48,6 +49,7 @@ use crate::error::{
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::to_sst_arrow_schema;
/// Arrow array type for the primary key dictionary.
@@ -558,6 +560,50 @@ fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
Arc::new(DictionaryArray::new(keys, values))
}
/// Gets the min/max time index of the row group from the parquet meta.
/// It assumes the parquet is created by the mito engine.
pub(crate) fn parquet_row_group_time_range(
file_meta: &FileMeta,
parquet_meta: &ParquetMetaData,
row_group_idx: usize,
) -> Option<FileTimeRange> {
let row_group_meta = parquet_meta.row_group(row_group_idx);
let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
assert!(
num_columns >= FIXED_POS_COLUMN_NUM,
"file only has {} columns",
num_columns
);
let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;
let stats = row_group_meta.column(time_index_pos).statistics()?;
if stats.has_min_max_set() {
// The physical type for the timestamp should be i64.
let (min, max) = match stats {
Statistics::Int64(value_stats) => (*value_stats.min(), *value_stats.max()),
Statistics::Int32(_)
| Statistics::Boolean(_)
| Statistics::Int96(_)
| Statistics::Float(_)
| Statistics::Double(_)
| Statistics::ByteArray(_)
| Statistics::FixedLenByteArray(_) => return None,
};
debug_assert!(
min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value()
);
debug_assert!(
max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value()
);
let unit = file_meta.time_range.0.unit();
Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
} else {
None
}
}
#[cfg(test)]
mod tests {
use api::v1::OpType;