mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 03:50:39 +00:00
feat(mito2): add PK-range-aware TWCS overlap handling (#7954)
* feat(mito2): extract and cache primary key range for SST files

  Extracts primary key ranges from SST files during flush and compaction, and caches them in `FileHandle` for future use (e.g., overlapping checks).

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/compaction-overlapping-check:

  - **Enhance Primary Key Range Logic**: Updated the `primary_key_ranges_overlap` function in `run.rs` to use `chunk()` for comparing byte ranges, improving accuracy in overlap detection.
  - **Refactor Run Assignment Logic**: Simplified the logic for assigning items to runs in `run.rs` by removing redundant match statements and using `is_empty()` and `iter().any()` for cleaner checks.
  - **Add Test for Transitivity Break**: Introduced a new test, `test_find_sorted_runs_handles_2d_transitivity_break`, in `run.rs` to ensure correct handling of transitivity breaks in sorted runs.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/compaction-overlapping-check:

  - **Remove PK-disjoint detection logic**: Simplified the compaction logic in `twcs.rs` by removing the `has_time_overlapping_pairs` function and related logic for PK-disjoint detection, including the `append_mode_force_compact` condition and associated tests.
  - **Update compaction trigger settings**: Modified `append_mode_test.rs` to set `compaction.twcs.trigger_file_num` to "2" and adjusted the expected number of files in the scanner assertion from 1 to 2.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: rebase main

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/compaction-overlapping-check: remove unused import in `compactor.rs`

  Removed the unused `compact_request` import from `compactor.rs` to clean up the codebase.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: tighten `mito2` file lifecycle handling

  Refine compaction, flush, and SST/version bookkeeping across `src/mito2/src/compaction/*`, `src/mito2/src/flush.rs`, `src/mito2/src/region/*`, `src/mito2/src/sst/*`, and related tests/utilities.

* fix: reuse primary key range merge in twcs compaction

  Centralize primary key range merging so that TWCS compaction can call the shared helper.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor(mito2): simplify `FileHandle` initialization and internalize primary key range extraction

  - Updated `FileHandle::new` to automatically compute the primary key range directly from `FileMeta`.
  - Restricted `FileHandle::new_with_primary_key_range` to be test-only by adding the `#[cfg(test)]` attribute.
  - Simplified `SstVersion::add_files` by adopting the updated `FileHandle::new` instead of manually providing the primary key range.

  Modified files:
  - `src/mito2/src/sst/file.rs`
  - `src/mito2/src/sst/version.rs`

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/compaction-overlapping-check: improve file management and documentation

  - **`twcs.rs`**: Added a comment to clarify the merging of small files when there are no overlapping files.
  - **`version.rs` (in `region` and `sst`)**: Enhanced documentation for the `add_files` method, explaining its functionality and panic conditions. Simplified the file handle creation logic in `sst/version.rs`.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/compaction-overlapping-check: enhance primary key range handling in `opener.rs`

  Updated logic in `opener.rs` to set the primary key range for file handles when it is not already defined, extracting it with `extract_primary_key_range` when necessary.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* docs: add docs on the necessity of checking both primary keys and timestamps when finding overlapping files.

* chore: address review comments

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
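A minimal, self-contained sketch (not GreptimeDB code; the helper name is hypothetical) of the two-dimensional overlap rule this PR introduces, mirroring `FileGroup::overlap`: strict on time, inclusive on primary keys, and conservative when a key range is unknown:

```rust
/// Hypothetical helper mirroring the PR's overlap rule.
fn ssts_overlap(
    lhs_time: (i64, i64),
    lhs_pk: Option<(&[u8], &[u8])>,
    rhs_time: (i64, i64),
    rhs_pk: Option<(&[u8], &[u8])>,
) -> bool {
    // Strict time check: touching time boundaries do not count as overlap.
    if lhs_time.0.max(rhs_time.0) >= lhs_time.1.min(rhs_time.1) {
        return false;
    }
    match (lhs_pk, rhs_pk) {
        // Disjoint key ranges cannot hold duplicate rows, so the files
        // may safely live in the same sorted run even with time overlap.
        (Some((lmin, lmax)), Some((rmin, rmax))) => lmin.max(rmin) <= lmax.min(rmax),
        // Legacy files without a cached key range: assume overlap.
        _ => true,
    }
}

fn main() {
    // Overlapping times, disjoint keys ("a".."f" vs "x".."z"): no conflict.
    assert!(!ssts_overlap(
        (0, 100),
        Some((&b"a"[..], &b"f"[..])),
        (50, 150),
        Some((&b"x"[..], &b"z"[..])),
    ));
    // Unknown key range on one side: conservatively a conflict.
    assert!(ssts_overlap((0, 100), None, (50, 150), Some((&b"x"[..], &b"z"[..]))));
}
```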
Cargo.lock (generated)
@@ -7989,6 +7989,7 @@ dependencies = [
  "async-channel 1.9.0",
  "async-stream",
  "async-trait",
+ "base64 0.22.1",
  "bytemuck",
  "bytes",
  "chrono",
@@ -190,6 +190,7 @@ impl ObjbenchCommand {
         sequence: None,
         partition_expr: None,
         num_series: 0,
+        ..Default::default()
     };
     let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
@@ -20,6 +20,7 @@ async-channel = "1.9"
 common-stat.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
+base64.workspace = true
 bytemuck.workspace = true
 bytes.workspace = true
 chrono.workspace = true
@@ -55,6 +55,7 @@ use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::sst::location::region_dir_from_table_dir;
 use crate::sst::parquet::WriteOptions;
+use crate::sst::parquet::metadata::extract_primary_key_range;
 use crate::sst::version::{SstVersion, SstVersionRef};

 /// Region version for compaction that does not hold memtables.
@@ -405,22 +406,35 @@ impl SstMerger for DefaultSstMerger {

     let output_files = sst_infos
         .into_iter()
-        .map(|sst_info| FileMeta {
-            region_id,
-            file_id: sst_info.file_id,
-            time_range: sst_info.time_range,
-            level: output.output_level,
-            file_size: sst_info.file_size,
-            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
-            available_indexes: sst_info.index_metadata.build_available_indexes(),
-            indexes: sst_info.index_metadata.build_indexes(),
-            index_file_size: sst_info.index_metadata.file_size,
-            index_version: 0,
-            num_rows: sst_info.num_rows as u64,
-            num_row_groups: sst_info.num_row_groups,
-            sequence: max_sequence,
-            partition_expr: partition_expr.clone(),
-            num_series: sst_info.num_series,
+        .map(|sst_info| {
+            let pk_range = sst_info
+                .file_metadata
+                .as_ref()
+                .and_then(|meta| extract_primary_key_range(meta, &region_metadata));
+            let (primary_key_min, primary_key_max) = match pk_range {
+                Some((min, max)) => (Some(min), Some(max)),
+                None => (None, None),
+            };
+
+            FileMeta {
+                region_id,
+                file_id: sst_info.file_id,
+                time_range: sst_info.time_range,
+                level: output.output_level,
+                file_size: sst_info.file_size,
+                max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
+                available_indexes: sst_info.index_metadata.build_available_indexes(),
+                indexes: sst_info.index_metadata.build_indexes(),
+                index_file_size: sst_info.index_metadata.file_size,
+                index_version: 0,
+                num_rows: sst_info.num_rows as u64,
+                num_row_groups: sst_info.num_row_groups,
+                sequence: max_sequence,
+                partition_expr: partition_expr.clone(),
+                num_series: sst_info.num_series,
+                primary_key_min,
+                primary_key_max,
+            }
         })
         .collect::<Vec<_>>();
     let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
@@ -15,6 +15,7 @@

 //! This file contains code to find sorted runs in a set of ranged items,
 //! along with the best way to merge these items to satisfy the desired run count.

+use bytes::{Buf, Bytes};
 use common_base::BitVec;
 use common_base::readable_size::ReadableSize;
 use common_time::Timestamp;
@@ -32,15 +33,37 @@ pub trait Ranged {
     /// Returns the inclusive range of item.
     fn range(&self) -> (Self::BoundType, Self::BoundType);

-    fn overlap<T>(&self, other: &T) -> bool
-    where
-        T: Ranged<BoundType = Self::BoundType>,
-    {
+    fn overlap(&self, other: &Self) -> bool {
         let (lhs_start, lhs_end) = self.range();
         let (rhs_start, rhs_end) = other.range();

         lhs_start.max(rhs_start) < lhs_end.min(rhs_end)
     }
+
+    /// Like `overlap`, but treats touching boundaries as overlapping (inclusive).
+    /// Used by `find_overlapping_items` where shared boundaries count as overlap.
+    fn overlap_inclusive(&self, other: &Self) -> bool {
+        let (lhs_start, lhs_end) = self.range();
+        let (rhs_start, rhs_end) = other.range();
+
+        lhs_start.max(rhs_start) <= lhs_end.min(rhs_end)
+    }
 }

+pub(crate) fn primary_key_ranges_overlap(lhs: &(Bytes, Bytes), rhs: &(Bytes, Bytes)) -> bool {
+    lhs.0.chunk().max(rhs.0.chunk()) <= lhs.1.chunk().min(rhs.1.chunk())
+}
+
+pub(crate) fn merge_primary_key_ranges(
+    lhs: Option<(Bytes, Bytes)>,
+    rhs: Option<(Bytes, Bytes)>,
+) -> Option<(Bytes, Bytes)> {
+    match (lhs, rhs) {
+        (Some((lhs_min, lhs_max)), Some((rhs_min, rhs_max))) => {
+            Some((lhs_min.min(rhs_min), lhs_max.max(rhs_max)))
+        }
+        _ => None,
+    }
+}
+
 pub fn find_overlapping_items<T: Item + Clone>(
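The trait now carries two boundary semantics: `overlap` is strict (a shared endpoint does not count), while `overlap_inclusive` counts touching boundaries. A standalone sketch of the distinction, using a toy type rather than the real `Ranged` trait:

```rust
#[derive(Clone, Copy)]
struct Span(i64, i64);

impl Span {
    /// Strict: max(start) must be < min(end), so touching at one point is not overlap.
    fn overlap(self, other: Span) -> bool {
        self.0.max(other.0) < self.1.min(other.1)
    }
    /// Inclusive: a shared endpoint counts, matching how candidate files are picked.
    fn overlap_inclusive(self, other: Span) -> bool {
        self.0.max(other.0) <= self.1.min(other.1)
    }
}

fn main() {
    let (a, b) = (Span(0, 100), Span(100, 150));
    assert!(!a.overlap(b)); // they only touch at 100
    assert!(a.overlap_inclusive(b));
}
```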
@@ -85,15 +108,15 @@
     // Check for overlaps with remaining right elements
     let mut j = r_idx;
     while j < r.items.len() {
-        let (rhs_start, rhs_end) = r.items[j].range();
+        let (rhs_start, _rhs_end) = r.items[j].range();

         // If right element starts after left element ends, no more overlaps possible
         if rhs_start > lhs_end {
             break;
         }

-        // We have an overlap
-        if lhs_start.max(rhs_start) <= lhs_end.min(rhs_end) {
+        // We have an overlap (inclusive: touching boundaries count)
+        if lhs.overlap_inclusive(&r.items[j]) {
             if !selected[lhs_idx] {
                 result.push(lhs.clone());
                 selected.set(lhs_idx, true);
@@ -134,6 +157,7 @@ pub struct FileGroup {
     num_rows: usize,
     min_timestamp: Timestamp,
     max_timestamp: Timestamp,
+    primary_key_range: Option<(Bytes, Bytes)>,
 }

 impl FileGroup {
@@ -141,12 +165,14 @@ impl FileGroup {
     let size = file.size() as usize;
     let (min_timestamp, max_timestamp) = file.time_range();
     let num_rows = file.num_rows();
+    let primary_key_range = file.primary_key_range();
     Self {
         files: smallvec![file],
         size,
         num_rows,
         min_timestamp,
         max_timestamp,
+        primary_key_range,
     }
 }
@@ -160,6 +186,8 @@ impl FileGroup {
     let (min_timestamp, max_timestamp) = file.time_range();
     self.min_timestamp = self.min_timestamp.min(min_timestamp);
     self.max_timestamp = self.max_timestamp.max(max_timestamp);
+    self.primary_key_range =
+        merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
     self.files.push(file);
 }
@@ -187,6 +215,32 @@ impl Ranged for FileGroup {
     fn range(&self) -> (Self::BoundType, Self::BoundType) {
         (self.min_timestamp, self.max_timestamp)
     }
+
+    fn overlap(&self, other: &Self) -> bool {
+        let (lhs_start, lhs_end) = self.range();
+        let (rhs_start, rhs_end) = other.range();
+        if lhs_start.max(rhs_start) >= lhs_end.min(rhs_end) {
+            return false;
+        }
+
+        match (&self.primary_key_range, &other.primary_key_range) {
+            (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
+            _ => true,
+        }
+    }
+
+    fn overlap_inclusive(&self, other: &Self) -> bool {
+        let (lhs_start, lhs_end) = self.range();
+        let (rhs_start, rhs_end) = other.range();
+        if lhs_start.max(rhs_start) > lhs_end.min(rhs_end) {
+            return false;
+        }
+
+        match (&self.primary_key_range, &other.primary_key_range) {
+            (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
+            _ => true,
+        }
+    }
 }

 impl Item for FileGroup {
@@ -281,21 +335,20 @@ where
             // item is already assigned.
             continue;
         }
-        match current_run.items.last() {
-            None => {
-                // current run is empty, just add current_item
-                selected.set(true);
-                current_run.push_item(item.clone());
-            }
-            Some(last) => {
-                // the current item does not overlap with the last item in current run,
-                // then it belongs to current run.
-                if !last.overlap(item) {
-                    // does not overlap, push to current run
-                    selected.set(true);
-                    current_run.push_item(item.clone());
-                }
-            }
-        }
+        if current_run.items.is_empty() {
+            // current run is empty, just add current_item
+            selected.set(true);
+            current_run.push_item(item.clone());
+        } else {
+            // the current item does not overlap with any item in current run,
+            // then it belongs to current run. Because now we introduced primary
+            // key range, we cannot simply use timestamps to check overlapping.
+            let overlaps_any = current_run.items.iter().any(|i| i.overlap(item));
+            if !overlaps_any {
+                // does not overlap, push to current run
+                selected.set(true);
+                current_run.push_item(item.clone());
+            }
+        }
     }
     // finished an iteration, we've found a new run.
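With a single sorting dimension, "does not overlap the last item" implies "does not overlap the whole run"; with time plus primary key that transitivity breaks, which is why the new code checks every member of the run. A self-contained sketch (hypothetical helper, values taken from `test_find_sorted_runs_handles_2d_transitivity_break`):

```rust
/// Hypothetical 2D overlap check: strict on time, inclusive on keys (as in this PR).
fn overlap(lhs: ((i64, i64), (&str, &str)), rhs: ((i64, i64), (&str, &str))) -> bool {
    let ((lt0, lt1), (lk0, lk1)) = lhs;
    let ((rt0, rt1), (rk0, rk1)) = rhs;
    lt0.max(rt0) < lt1.min(rt1) && lk0.max(rk0) <= lk1.min(rk1)
}

fn main() {
    let a = ((0, 100), ("a", "f"));
    let b = ((50, 150), ("x", "z"));
    let c = ((50, 150), ("a", "f"));
    assert!(!overlap(a, b)); // key-disjoint
    assert!(!overlap(b, c)); // key-disjoint
    assert!(overlap(a, c)); // but a and c DO overlap
    // A run holding [a, b] must therefore test c against every member,
    // not just the last one (b), before admitting it.
}
```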
@@ -422,7 +475,11 @@ pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -
 mod tests {
     use std::collections::HashSet;

+    use bytes::Bytes;
+    use store_api::storage::FileId;
+
     use super::*;
+    use crate::compaction::test_util::new_file_handle_with_size_sequence_and_primary_key_range;

     #[derive(Clone, Debug, PartialEq)]
     struct MockFile {

@@ -467,6 +524,10 @@ mod tests {
         .collect()
     }

+    fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
+        Some((Bytes::from_static(min), Bytes::from_static(max)))
+    }
+
     fn check_sorted_runs(
         ranges: &[(i64, i64)],
         expected_runs: &[Vec<(i64, i64)>],
@@ -726,6 +787,157 @@ mod tests {
         assert_eq!(result.len(), 4); // Should find both overlaps
     }

+    #[test]
+    fn test_file_group_overlap_time_overlap_pk_disjoint() {
+        let lhs =
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                0,
+                100,
+                0,
+                1,
+                10,
+                pk_range(b"a", b"f"),
+            ));
+        let rhs =
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                50,
+                150,
+                0,
+                2,
+                10,
+                pk_range(b"x", b"z"),
+            ));
+
+        assert!(!lhs.overlap(&rhs));
+    }
+
+    #[test]
+    fn test_find_sorted_runs_collapses_pk_disjoint_files_into_one_run() {
+        let mut files = vec![
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                0,
+                100,
+                0,
+                1,
+                10,
+                pk_range(b"a", b"f"),
+            )),
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                50,
+                150,
+                0,
+                2,
+                10,
+                pk_range(b"x", b"z"),
+            )),
+        ];
+
+        let runs = find_sorted_runs(&mut files);
+
+        assert_eq!(1, runs.len());
+        assert_eq!(2, runs[0].items().len());
+    }
+
+    #[test]
+    fn test_find_sorted_runs_handles_2d_transitivity_break() {
+        let mut files = vec![
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                0,
+                100,
+                0,
+                1,
+                10,
+                pk_range(b"a", b"f"),
+            )),
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                50,
+                150,
+                0,
+                2,
+                10,
+                pk_range(b"x", b"z"),
+            )),
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                50,
+                150,
+                0,
+                3,
+                10,
+                pk_range(b"a", b"f"),
+            )),
+        ];
+
+        let runs = find_sorted_runs(&mut files);
+
+        assert_eq!(2, runs.len());
+        assert_eq!(2, runs[0].items().len());
+        assert_eq!(1, runs[1].items().len());
+    }
+
+    #[test]
+    fn test_find_overlapping_items_skips_pk_disjoint_pairs() {
+        let mut left = SortedRun::from(vec![FileGroup::new_with_file(
+            new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                0,
+                100,
+                0,
+                1,
+                10,
+                pk_range(b"a", b"f"),
+            ),
+        )]);
+        let mut right = SortedRun::from(vec![FileGroup::new_with_file(
+            new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                50,
+                150,
+                0,
+                2,
+                10,
+                pk_range(b"x", b"z"),
+            ),
+        )]);
+        let mut result = Vec::new();
+
+        find_overlapping_items(&mut left, &mut right, &mut result);
+
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_file_group_touching_time_boundary_with_same_pk_is_not_overlap() {
+        let lhs =
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                0,
+                100,
+                0,
+                1,
+                10,
+                pk_range(b"a", b"f"),
+            ));
+        let rhs =
+            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                100,
+                150,
+                0,
+                2,
+                10,
+                pk_range(b"a", b"f"),
+            ));
+
+        assert!(!lhs.overlap(&rhs));
+    }
+
     #[test]
     fn test_merge_seq_files() {
         // Test empty input
@@ -14,6 +14,7 @@

 use std::num::NonZeroU64;

+use bytes::Bytes;
 use common_time::Timestamp;
 use store_api::storage::FileId;

@@ -84,7 +85,46 @@ pub fn new_file_handle_with_size_and_sequence(
             num_series: 0,
             sequence: NonZeroU64::new(sequence),
             partition_expr: None,
+            ..Default::default()
         },
         file_purger,
     )
 }

+/// Test util to create file handles with custom size and primary-key range.
+pub fn new_file_handle_with_size_sequence_and_primary_key_range(
+    file_id: FileId,
+    start_ts_millis: i64,
+    end_ts_millis: i64,
+    level: Level,
+    sequence: u64,
+    file_size: u64,
+    primary_key_range: Option<(Bytes, Bytes)>,
+) -> FileHandle {
+    let file_purger = new_noop_file_purger();
+    FileHandle::new_with_primary_key_range(
+        FileMeta {
+            region_id: 0.into(),
+            file_id,
+            time_range: (
+                Timestamp::new_millisecond(start_ts_millis),
+                Timestamp::new_millisecond(end_ts_millis),
+            ),
+            level,
+            file_size,
+            max_row_group_uncompressed_size: file_size,
+            available_indexes: Default::default(),
+            indexes: Default::default(),
+            index_file_size: 0,
+            index_version: 0,
+            num_rows: 0,
+            num_row_groups: 0,
+            num_series: 0,
+            sequence: NonZeroU64::new(sequence),
+            partition_expr: None,
+            ..Default::default()
+        },
+        file_purger,
+        primary_key_range,
+    )
+}
@@ -28,7 +28,8 @@ use crate::compaction::buckets::infer_time_bucket;
 use crate::compaction::compactor::CompactionRegion;
 use crate::compaction::picker::{Picker, PickerOutput};
 use crate::compaction::run::{
-    FileGroup, Item, Ranged, find_sorted_runs, merge_seq_files, reduce_runs,
+    FileGroup, Item, Ranged, find_sorted_runs, merge_primary_key_ranges, merge_seq_files,
+    primary_key_ranges_overlap, reduce_runs,
 };
 use crate::compaction::{CompactionOutput, get_expired_ssts};
 use crate::sst::file::{FileHandle, Level, overlaps};
@@ -268,12 +269,14 @@ struct Window {
     files: HashMap<Option<NonZeroU64>, FileGroup>,
     time_window: i64,
     overlapping: bool,
+    primary_key_range: Option<(bytes::Bytes, bytes::Bytes)>,
 }

 impl Window {
     /// Creates a new [Window] with given file.
     fn new_with_file(file: FileHandle) -> Self {
         let (start, end) = file.time_range();
+        let primary_key_range = file.primary_key_range();
         let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
         Self {
             start,

@@ -281,6 +284,7 @@ impl Window {
             files,
             time_window: 0,
             overlapping: false,
+            primary_key_range,
         }
     }
@@ -294,6 +298,8 @@ impl Window {
     let (start, end) = file.time_range();
     self.start = self.start.min(start);
     self.end = self.end.max(end);
+    self.primary_key_range =
+        merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());

     match self.files.entry(file.meta_ref().sequence) {
         Entry::Occupied(mut o) => {
@@ -347,18 +353,27 @@ fn assign_to_windows<'a>(
     let mut windows = windows.into_values().collect::<Vec<_>>();
     windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));

-    let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.
-
-    for idx in 1..windows.len() {
-        let next_range = windows[idx].range();
-        if overlaps(&current_range, &next_range) {
-            windows[idx - 1].overlapping = true;
-            windows[idx].overlapping = true;
-        }
-        current_range = (
-            current_range.0.min(next_range.0),
-            current_range.1.max(next_range.1),
-        );
-    }
+    for idx in 0..windows.len() {
+        let lhs_range = windows[idx].range();
+        for next_idx in idx + 1..windows.len() {
+            let rhs_range = windows[next_idx].range();
+            if rhs_range.0 > lhs_range.1 {
+                break;
+            }
+
+            let windows_overlap = overlaps(&lhs_range, &rhs_range)
+                && match (
+                    &windows[idx].primary_key_range,
+                    &windows[next_idx].primary_key_range,
+                ) {
+                    (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
+                    _ => true,
+                };
+            if windows_overlap {
+                windows[idx].overlapping = true;
+                windows[next_idx].overlapping = true;
+            }
+        }
+    }

     windows.into_iter().map(|w| (w.time_window, w)).collect()
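The old pass folded every window into one growing range, so a single early window with an unknown key range could mark all later windows overlapping; the new pass compares windows pairwise and early-exits once a start moves past the current end. A simplified sketch of that shape (toy types, not the real `Window`):

```rust
struct Win {
    start: i64,
    end: i64,
    pk: Option<(u8, u8)>, // None = unknown, treated as overlapping everything
    overlapping: bool,
}

fn mark_overlaps(windows: &mut [Win]) {
    // Assumes windows are sorted by start, as in assign_to_windows.
    for i in 0..windows.len() {
        for j in i + 1..windows.len() {
            if windows[j].start > windows[i].end {
                break; // sorted by start: later windows cannot reach back in time
            }
            let pk_overlap = match (windows[i].pk, windows[j].pk) {
                (Some(l), Some(r)) => l.0.max(r.0) <= l.1.min(r.1),
                _ => true,
            };
            if pk_overlap {
                windows[i].overlapping = true;
                windows[j].overlapping = true;
            }
        }
    }
}

fn main() {
    let mut w = vec![
        Win { start: 0, end: 1999, pk: None, overlapping: false },
        Win { start: 2000, end: 3999, pk: Some((b'a', b'f')), overlapping: false },
        Win { start: 3000, end: 4999, pk: Some((b'x', b'z')), overlapping: false },
    ];
    mark_overlaps(&mut w);
    // The unknown-pk window is time-disjoint from the rest, and the two
    // time-overlapping windows are key-disjoint: nothing gets marked.
    assert!(w.iter().all(|w| !w.overlapping));
}
```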
@@ -390,11 +405,13 @@ fn find_latest_window_in_seconds<'a>(
 mod tests {
     use std::collections::HashSet;

+    use bytes::Bytes;
     use store_api::storage::FileId;

     use super::*;
     use crate::compaction::test_util::{
         new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
+        new_file_handle_with_size_sequence_and_primary_key_range,
     };
     use crate::sst::file::Level;
@@ -566,6 +583,10 @@ mod tests {
     /// (Window value, overlapping, files' time ranges in window)
     type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

+    fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
+        Some((Bytes::from_static(min), Bytes::from_static(max)))
+    }
+
     fn check_assign_to_windows_with_overlapping(
         file_time_ranges: &[(i64, i64)],
         time_window: i64,
@@ -698,6 +719,63 @@ mod tests {
         );
     }

+    #[test]
+    fn test_assign_to_windows_not_overlapping_when_pk_disjoint() {
+        let files = [
+            new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                0,
+                1000,
+                0,
+                1,
+                10,
+                pk_range(b"a", b"f"),
+            ),
+            new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                500,
+                1999,
+                0,
+                2,
+                10,
+                pk_range(b"x", b"z"),
+            ),
+        ];
+
+        let windows = assign_to_windows(files.iter(), 2);
+
+        assert!(!windows.get(&2).unwrap().overlapping);
+    }
+
+    #[test]
+    fn test_assign_to_windows_pk_unknown_in_earlier_window_does_not_poison_later_windows() {
+        let files = [
+            new_file_handle(FileId::random(), 0, 1999, 0),
+            new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                2000,
+                3999,
+                0,
+                1,
+                10,
+                pk_range(b"a", b"f"),
+            ),
+            new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                3000,
+                4999,
+                0,
+                2,
+                10,
+                pk_range(b"x", b"z"),
+            ),
+        ];
+
+        let windows = assign_to_windows(files.iter(), 2);
+
+        assert!(!windows.get(&4).unwrap().overlapping);
+    }
+
     struct CompactionPickerTestCase {
         window_size: i64,
         input_files: Vec<FileHandle>,
@@ -832,6 +910,42 @@ mod tests {
         .check();
     }

+    #[test]
+    fn test_build_output_skips_pk_disjoint_files() {
+        let files = [
+            new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                0,
+                2999,
+                0,
+                1,
+                10,
+                pk_range(b"a", b"f"),
+            ),
+            new_file_handle_with_size_sequence_and_primary_key_range(
+                FileId::random(),
+                50,
+                2998,
+                0,
+                2,
+                10,
+                pk_range(b"x", b"z"),
+            ),
+        ];
+        let mut windows = assign_to_windows(files.iter(), 3);
+        let active_window = find_latest_window_in_seconds(files.iter(), 3);
+        let output = TwcsPicker {
+            trigger_file_num: 4,
+            time_window_seconds: None,
+            max_output_file_size: None,
+            append_mode: false,
+            max_background_tasks: None,
+        }
+        .build_output(RegionId::from_u64(0), &mut windows, active_window);
+
+        assert!(output.is_empty());
+    }
+
     #[test]
     fn test_append_mode_filter_large_files() {
         let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
@@ -131,6 +131,7 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {

     let request = CreateRequestBuilder::new()
         .insert_option("compaction.type", "twcs")
+        .insert_option("compaction.twcs.trigger_file_num", "2")
         .insert_option("append_mode", "true")
         .build();
     let table_dir = request.table_dir.clone();

@@ -199,7 +200,7 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {
         .scanner(region_id, ScanRequest::default())
         .await
         .unwrap();
-    assert_eq!(1, scanner.num_files());
+    assert_eq!(2, scanner.num_files());
     assert_eq!(1, scanner.num_memtables());
     scanner.set_target_partitions(2);
     let stream = scanner.scan().await.unwrap();
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Instant;

+use bytes::Bytes;
 use common_telemetry::{debug, error, info};
 use datatypes::arrow::datatypes::SchemaRef;
 use partition::expr::PartitionExpr;

@@ -57,6 +58,7 @@ use crate::request::{
 };
 use crate::schedule::scheduler::{Job, SchedulerRef};
 use crate::sst::file::FileMeta;
+use crate::sst::parquet::metadata::extract_primary_key_range;
 use crate::sst::parquet::{
     DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
 };
@@ -504,15 +506,20 @@ impl RegionFlushTask {

         flush_metrics = flush_metrics.merge(metrics);

-        file_metas.extend(ssts_written.into_iter().map(|sst_info| {
+        for sst_info in ssts_written {
             flushed_bytes += sst_info.file_size;
-            Self::new_file_meta(
+            let pk_range = sst_info
+                .file_metadata
+                .as_ref()
+                .and_then(|meta| extract_primary_key_range(meta, &version.metadata));
+            file_metas.push(Self::new_file_meta(
                 self.region_id,
                 max_sequence,
                 sst_info,
                 partition_expr.clone(),
-            )
-        }));
+                pk_range,
+            ));
+        }
     }

     common_telemetry::debug!(

@@ -604,7 +611,12 @@ impl RegionFlushTask {
         max_sequence: u64,
         sst_info: SstInfo,
         partition_expr: Option<PartitionExpr>,
+        primary_key_range: Option<(Bytes, Bytes)>,
     ) -> FileMeta {
+        let (primary_key_min, primary_key_max) = match primary_key_range {
+            Some((min, max)) => (Some(min), Some(max)),
+            None => (None, None),
+        };
         FileMeta {
             region_id,
             file_id: sst_info.file_id,

@@ -621,6 +633,8 @@ impl RegionFlushTask {
             sequence: NonZeroU64::new(max_sequence),
             partition_expr,
             num_series: sst_info.num_series,
+            primary_key_min,
+            primary_key_max,
         }
     }
@@ -257,6 +257,7 @@ async fn checkpoint_with_different_compression_types() {
         sequence: None,
         partition_expr: None,
         num_series: 0,
+        ..Default::default()
     };
     let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
         files_to_add: vec![file_meta],

@@ -323,6 +324,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>) {
         sequence: None,
         partition_expr: None,
         num_series: 0,
+        ..Default::default()
     };
     let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
         files_to_add: vec![file_meta],
@@ -70,7 +70,7 @@ use crate::sst::file_ref::FileReferenceManagerRef;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::sst::location::{self, region_dir_from_table_dir};
-use crate::sst::parquet::metadata::MetadataLoader;
+use crate::sst::parquet::metadata::{MetadataLoader, extract_primary_key_range};
 use crate::sst::parquet::reader::MetadataCacheMetrics;
 use crate::time_provider::TimeProviderRef;
 use crate::wal::entry_reader::WalEntryReader;
@@ -991,6 +991,7 @@ fn maybe_load_cache(
 /// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata
 ///   directly from the local store.
 /// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...).
+#[allow(clippy::too_many_arguments)]
 async fn preload_parquet_meta_cache_for_files(
     region_id: RegionId,
     cache_manager: CacheManagerRef,

@@ -998,6 +999,7 @@ async fn preload_parquet_meta_cache_for_files(
     table_dir: String,
     path_type: PathType,
     object_store: object_store::ObjectStore,
+    region_metadata: RegionMetadataRef,
     mut files: Vec<FileHandle>,
 ) -> usize {
     if !cache_manager.sst_meta_cache_enabled()
@@ -1021,11 +1023,16 @@ async fn preload_parquet_meta_cache_for_files(

     let file_id = file_handle.file_id();
     let mut cache_metrics = MetadataCacheMetrics::default();
-    if cache_manager
+    if let Some(metadata) = cache_manager
         .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
         .await
-        .is_some()
     {
+        if file_handle.primary_key_range().is_none()
+            && let Some(primary_key_range) =
+                extract_primary_key_range(&metadata, &region_metadata)
+        {
+            file_handle.set_primary_key_range(primary_key_range);
+        }
         // Metadata is either already in memory or loaded from file cache.
         if cache_metrics.mem_cache_hit == 0 {
             loaded += 1;
@@ -1042,6 +1049,11 @@ async fn preload_parquet_meta_cache_for_files(
     let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
     match loader.load(&mut cache_metrics).await {
         Ok(metadata) => {
+            if let Some(primary_key_range) =
+                extract_primary_key_range(&metadata, &region_metadata)
+            {
+                file_handle.set_primary_key_range(primary_key_range);
+            }
             cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
             loaded += 1;
         }
@@ -1090,6 +1102,7 @@ fn maybe_preload_parquet_meta_cache(
     let table_dir = region.access_layer.table_dir().to_string();
     let path_type = region.access_layer.path_type();
     let object_store = region.access_layer.object_store().clone();
+    let region_metadata = region.version_control.current().version.metadata.clone();

     // Collect SST files. Do not hold the version longer than needed.
     let mut files = Vec::new();

@@ -1109,6 +1122,7 @@ fn maybe_preload_parquet_meta_cache(
         table_dir,
         path_type,
         object_store,
+        region_metadata,
         files,
     )
     .await;
@@ -1147,7 +1161,7 @@ mod tests {
 use common_base::readable_size::ReadableSize;
 use common_test_util::temp_dir::create_temp_dir;
 use common_time::Timestamp;
-use datatypes::arrow::array::{ArrayRef, Int64Array};
+use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array};
 use datatypes::arrow::record_batch::RecordBatch;
 use object_store::ObjectStore;
 use object_store::services::{Fs, Memory};
@@ -1203,7 +1217,15 @@ mod tests {
     let file_id = FileId::random();

     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
-    let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
+    let primary_key = Arc::new(BinaryArray::from_iter_values([b"a", b"b", b"c"])) as ArrayRef;
+    let batch = RecordBatch::try_from_iter([
+        ("col", col),
+        (
+            store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME,
+            primary_key,
+        ),
+    ])
+    .unwrap();
     let parquet_bytes = sst_parquet_bytes(&batch);
     let file_size = parquet_bytes.len() as u64;
@@ -1223,6 +1245,7 @@ mod tests {
     sequence: None,
     partition_expr: None,
     num_series: 0,
+    ..Default::default()
 };
 let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
@@ -1258,7 +1281,8 @@ mod tests {
     table_dir.to_string(),
     path_type,
     source_store.clone(),
-    vec![file_handle],
+    Arc::new(sst_region_metadata()),
+    vec![file_handle.clone()],
 )
 .await;
@@ -1269,6 +1293,7 @@ mod tests {
         .get_parquet_meta_data_from_mem_cache(region_file_id)
         .is_some()
 );
+assert!(file_handle.primary_key_range().is_some());
 }

 #[tokio::test]
@@ -1299,6 +1324,7 @@ mod tests {
     sequence: None,
     partition_expr: None,
     num_series: 0,
+    ..Default::default()
 };
 let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
@@ -1327,6 +1353,7 @@ mod tests {
     table_dir.to_string(),
     path_type,
     object_store,
+    Arc::new(sst_region_metadata()),
     vec![file_handle],
 )
 .await;
@@ -1372,6 +1399,7 @@ mod tests {
     sequence: None,
     partition_expr: None,
     num_series: 0,
+    ..Default::default()
 };
 let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
@@ -1399,6 +1427,7 @@ mod tests {
     table_dir.to_string(),
     path_type,
     object_store,
+    Arc::new(sst_region_metadata()),
     vec![file_handle],
 )
 .await;
@@ -436,6 +436,7 @@ mod tests {
     sequence: NonZeroU64::new(1),
     partition_expr,
     num_series: 1,
+    ..Default::default()
 }
 }
@@ -17,9 +17,11 @@
 use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::num::NonZeroU64;
-use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, RwLock};

+use base64::prelude::{BASE64_STANDARD, Engine};
+use bytes::Bytes;
 use common_base::readable_size::ReadableSize;
 use common_telemetry::{debug, error};
 use common_time::Timestamp;
@@ -36,6 +38,33 @@ use crate::cache::file_cache::{FileType, IndexKey};
 use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::location;

+/// Custom serde functions for Bytes fields serialized as base64 strings.
+fn serialize_bytes_option<S>(bytes: &Option<Bytes>, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: serde::Serializer,
+{
+    match bytes {
+        None => serializer.serialize_none(),
+        Some(b) => serializer.serialize_some(&BASE64_STANDARD.encode(b)),
+    }
+}
+
+fn deserialize_bytes_option<'de, D>(deserializer: D) -> Result<Option<Bytes>, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    let opt: Option<String> = Option::deserialize(deserializer)?;
+    match opt {
+        None => Ok(None),
+        Some(s) => {
+            let decoded = BASE64_STANDARD
+                .decode(&s)
+                .map_err(serde::de::Error::custom)?;
+            Ok(Some(Bytes::from(decoded)))
+        }
+    }
+}
+
 /// Custom serde functions for partition_expr field in FileMeta
 fn serialize_partition_expr<S>(
     partition_expr: &Option<PartitionExpr>,
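These helpers let the manifest keep raw, possibly non-UTF-8 key bytes inside JSON by detouring through base64. A standalone round-trip sketch with the same shape (assumes the `serde`, `serde_json`, `base64`, and `bytes` crates; the `Meta` type here is hypothetical):

```rust
use base64::prelude::{BASE64_STANDARD, Engine};
use bytes::Bytes;
use serde::{Deserialize, Serialize};

fn ser<S: serde::Serializer>(b: &Option<Bytes>, s: S) -> Result<S::Ok, S::Error> {
    match b {
        None => s.serialize_none(),
        Some(b) => s.serialize_some(&BASE64_STANDARD.encode(b)),
    }
}

fn de<'de, D: serde::Deserializer<'de>>(d: D) -> Result<Option<Bytes>, D::Error> {
    let opt: Option<String> = Option::deserialize(d)?;
    opt.map(|s| BASE64_STANDARD.decode(&s).map(Bytes::from))
        .transpose()
        .map_err(serde::de::Error::custom)
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct Meta {
    #[serde(default, serialize_with = "ser", deserialize_with = "de")]
    primary_key_min: Option<Bytes>,
}

fn main() {
    // Arbitrary key bytes (including 0x00 and 0xff) survive the JSON round trip.
    let m = Meta {
        primary_key_min: Some(Bytes::from_static(&[0x00, 0xff, b'k'])),
    };
    let json = serde_json::to_string(&m).unwrap();
    assert_eq!(m, serde_json::from_str::<Meta>(&json).unwrap());
}
```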
@@ -233,6 +262,24 @@ pub struct FileMeta {
     ///
     /// The number is 0 if the series number is not available.
     pub num_series: u64,
+    /// Minimum primary key value in the file, encoded as bytes.
+    /// `None` if the primary key range is not available (e.g., legacy files).
+    #[serde(
+        default,
+        skip_serializing_if = "Option::is_none",
+        serialize_with = "serialize_bytes_option",
+        deserialize_with = "deserialize_bytes_option"
+    )]
+    pub primary_key_min: Option<Bytes>,
+    /// Maximum primary key value in the file, encoded as bytes.
+    /// `None` if the primary key range is not available (e.g., legacy files).
+    #[serde(
+        default,
+        skip_serializing_if = "Option::is_none",
+        serialize_with = "serialize_bytes_option",
+        deserialize_with = "deserialize_bytes_option"
+    )]
+    pub primary_key_max: Option<Bytes>,
 }

 impl Debug for FileMeta {
@@ -273,8 +320,19 @@ impl Debug for FileMeta {
             }
         })
         .field("partition_expr", &self.partition_expr)
-        .field("num_series", &self.num_series)
-        .finish()
+        .field("num_series", &self.num_series);
+        if self.primary_key_min.is_some() || self.primary_key_max.is_some() {
+            debug_struct
+                .field(
+                    "primary_key_min",
+                    &self.primary_key_min.as_ref().map(|b| b.len()),
+                )
+                .field(
+                    "primary_key_max",
+                    &self.primary_key_max.as_ref().map(|b| b.len()),
+                );
+        }
+        debug_struct.finish()
     }
 }
@@ -311,6 +369,14 @@ pub struct ColumnIndexMetadata {
 }

 impl FileMeta {
+    /// Returns the primary key range if both min and max are present.
+    pub fn primary_key_range(&self) -> Option<(Bytes, Bytes)> {
+        match (&self.primary_key_min, &self.primary_key_max) {
+            (Some(min), Some(max)) => Some((min.clone(), max.clone())),
+            _ => None,
+        }
+    }
+
     pub fn exists_index(&self) -> bool {
         !self.available_indexes.is_empty()
     }

@@ -323,7 +389,7 @@ impl FileMeta {
     }
 }

 /// Whether the index file is up-to-date comparing to another file meta.
 pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
     self.exists_index() && other.exists_index() && self.index_version >= other.index_version
 }
@@ -417,8 +483,20 @@ impl fmt::Debug for FileHandle {

 impl FileHandle {
     pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
+        let pk_range = meta.primary_key_range();
         FileHandle {
-            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
+            inner: Arc::new(FileHandleInner::new(meta, file_purger, pk_range)),
         }
     }
+
+    #[cfg(test)]
+    pub fn new_with_primary_key_range(
+        meta: FileMeta,
+        file_purger: FilePurgerRef,
+        primary_key_range: Option<(Bytes, Bytes)>,
+    ) -> FileHandle {
+        FileHandle {
+            inner: Arc::new(FileHandleInner::new(meta, file_purger, primary_key_range)),
+        }
+    }
@@ -498,6 +576,14 @@ impl FileHandle {
     pub fn is_deleted(&self) -> bool {
         self.inner.deleted.load(Ordering::Relaxed)
     }
+
+    pub fn primary_key_range(&self) -> Option<(Bytes, Bytes)> {
+        self.inner.primary_key_range.read().unwrap().clone()
+    }
+
+    pub(crate) fn set_primary_key_range(&self, primary_key_range: (Bytes, Bytes)) {
+        *self.inner.primary_key_range.write().unwrap() = Some(primary_key_range);
+    }
 }

 /// Inner data of [FileHandle].
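The cached range sits behind a lock because `FileHandle`s are cheap `Arc` clones shared across compaction and readers, and `opener.rs` backfills the range after a handle already exists. A toy sketch of that interior-mutability choice (not the real type):

```rust
use std::sync::{Arc, RwLock};

#[derive(Clone)]
struct Handle {
    // Shared, late-initialized state; writers do not need &mut Handle.
    pk_range: Arc<RwLock<Option<(Vec<u8>, Vec<u8>)>>>,
}

fn main() {
    let h = Handle {
        pk_range: Arc::new(RwLock::new(None)),
    };
    let picker_copy = h.clone(); // e.g. held by the compaction picker
    // Later, once parquet metadata is loaded, the range is backfilled...
    *h.pk_range.write().unwrap() = Some((b"a".to_vec(), b"f".to_vec()));
    // ...and becomes visible through every clone of the handle.
    assert!(picker_copy.pk_range.read().unwrap().is_some());
}
```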
@@ -508,6 +594,7 @@ struct FileHandleInner {
     compacting: AtomicBool,
     deleted: AtomicBool,
     index_outdated: AtomicBool,
+    primary_key_range: RwLock<Option<(Bytes, Bytes)>>,
     file_purger: FilePurgerRef,
 }
@@ -523,13 +610,18 @@ impl Drop for FileHandleInner {

 impl FileHandleInner {
     /// There should only be one `FileHandleInner` for each file on a datanode
-    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
+    fn new(
+        meta: FileMeta,
+        file_purger: FilePurgerRef,
+        primary_key_range: Option<(Bytes, Bytes)>,
+    ) -> FileHandleInner {
         file_purger.new_file(&meta);
         FileHandleInner {
             meta,
             compacting: AtomicBool::new(false),
             deleted: AtomicBool::new(false),
             index_outdated: AtomicBool::new(false),
+            primary_key_range: RwLock::new(primary_key_range),
             file_purger,
         }
     }
@@ -734,6 +826,7 @@ mod tests {
     sequence: None,
     partition_expr: None,
     num_series: 0,
+    ..Default::default()
 }
 }

@@ -786,6 +879,7 @@ mod tests {
     sequence: None,
     partition_expr: Some(partition_expr.clone()),
     num_series: 0,
+    ..Default::default()
 };

 // Test serialization/deserialization
@@ -283,6 +283,7 @@ mod tests {
     sequence: None,
     partition_expr: None,
     num_series: 0,
+    ..Default::default()
 },
 file_purger,
 );

@@ -357,6 +358,7 @@ mod tests {
     sequence: NonZeroU64::new(4096),
     partition_expr: None,
     num_series: 0,
+    ..Default::default()
 },
 file_purger,
 );
@@ -285,6 +285,7 @@ mod tests {
     sequence: NonZeroU64::new(4096),
     partition_expr: None,
     num_series: 0,
+    ..Default::default()
 };

 file_ref_mgr.add_file(&file_meta);
@@ -832,6 +832,7 @@ mod tests {
         None => None,
     },
     num_series: 0,
+    ..Default::default()
 },
 Arc::new(NoopFilePurger),
 );

@@ -1285,6 +1286,7 @@ mod tests {
         None => None,
     },
     num_series: 0,
+    ..Default::default()
 },
 Arc::new(NoopFilePurger),
 )
@@ -22,7 +22,10 @@ use object_store::ObjectStore;
 use parquet::arrow::async_reader::MetadataFetch;
 use parquet::errors::{ParquetError, Result as ParquetResult};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
+use parquet::file::statistics::Statistics;
 use snafu::{IntoError as _, ResultExt};
+use store_api::metadata::RegionMetadata;
+use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

 use crate::error::{self, Result};
 use crate::sst::parquet::reader::MetadataCacheMetrics;
@@ -115,6 +118,44 @@ fn unbox_external_error(e: ParquetError) -> StdResult<object_store::Error, ParquetError> {
     }
 }

+pub(crate) fn extract_primary_key_range(
+    parquet_meta: &ParquetMetaData,
+    region_metadata: &RegionMetadata,
+) -> Option<(Bytes, Bytes)> {
+    if region_metadata.primary_key.is_empty() {
+        return None;
+    }
+
+    let pk_column_idx = parquet_meta
+        .file_metadata()
+        .schema_descr()
+        .columns()
+        .iter()
+        .position(|column| column.name() == PRIMARY_KEY_COLUMN_NAME)?;
+
+    let mut min: Option<Bytes> = None;
+    let mut max: Option<Bytes> = None;
+
+    for row_group in parquet_meta.row_groups() {
+        let Statistics::ByteArray(stats) = row_group.column(pk_column_idx).statistics()? else {
+            return None;
+        };
+
+        let row_group_min = Bytes::copy_from_slice(stats.min_bytes_opt()?);
+        let row_group_max = Bytes::copy_from_slice(stats.max_bytes_opt()?);
+        min = Some(match min {
+            Some(current) => current.min(row_group_min),
+            None => row_group_min,
+        });
+        max = Some(match max {
+            Some(current) => current.max(row_group_max),
+            None => row_group_max,
+        });
+    }
+
+    min.zip(max)
+}
+
 struct ObjectStoreFetch<'a> {
     object_store: &'a ObjectStore,
     file_path: &'a str,
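The extraction above is a plain min/max fold over per-row-group byte statistics, where any missing statistic collapses the whole result to `None`. A standalone equivalent of just the fold (hypothetical helper, assumes the `bytes` crate):

```rust
use bytes::Bytes;

fn fold_ranges(row_groups: &[Option<(Bytes, Bytes)>]) -> Option<(Bytes, Bytes)> {
    let mut acc: Option<(Bytes, Bytes)> = None;
    for rg in row_groups {
        // One row group without stats poisons the file-level range, matching
        // the `?` early-returns in extract_primary_key_range.
        let (rg_min, rg_max) = rg.clone()?;
        acc = Some(match acc {
            None => (rg_min, rg_max),
            Some((min, max)) => (min.min(rg_min), max.max(rg_max)),
        });
    }
    acc
}

fn main() {
    let rgs = [
        Some((Bytes::from_static(b"bbb"), Bytes::from_static(b"ccc"))),
        Some((Bytes::from_static(b"aaa"), Bytes::from_static(b"zzz"))),
    ];
    assert_eq!(
        fold_ranges(&rgs),
        Some((Bytes::from_static(b"aaa"), Bytes::from_static(b"zzz")))
    );
    assert_eq!(fold_ranges(&[None]), None);
}
```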
@@ -139,3 +180,120 @@ impl MetadataFetch for ObjectStoreFetch<'_> {
         .boxed()
     }
 }

+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use datatypes::arrow::array::{
+        ArrayRef, BinaryArray, DictionaryArray, Int64Array, UInt32Array,
+    };
+    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+    use datatypes::arrow::record_batch::RecordBatch;
+    use parquet::arrow::ArrowWriter;
+    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::file::metadata::{KeyValue, ParquetMetaData};
+    use parquet::file::properties::{EnabledStatistics, WriterProperties};
+
+    use super::*;
+    use crate::sst::parquet::PARQUET_METADATA_KEY;
+    use crate::test_util::sst_util::sst_region_metadata;
+
+    fn build_test_metadata(
+        include_primary_key: bool,
+        primary_keys: &[&[u8]],
+        row_group_sizes: &[usize],
+        stats_enabled: EnabledStatistics,
+    ) -> ParquetMetaData {
+        let total_rows = row_group_sizes.iter().sum::<usize>();
+        let mut fields = vec![Field::new("field", ArrowDataType::Int64, true)];
+        let mut columns: Vec<ArrayRef> =
+            vec![Arc::new(Int64Array::from_iter_values(0..total_rows as i64))];
+        if include_primary_key {
+            assert_eq!(total_rows, primary_keys.len());
+            fields.push(Field::new(
+                "__primary_key",
+                ArrowDataType::Dictionary(
+                    Box::new(ArrowDataType::UInt32),
+                    Box::new(ArrowDataType::Binary),
+                ),
+                false,
+            ));
+            let values = Arc::new(BinaryArray::from_iter_values(primary_keys.iter().copied()));
+            let keys = UInt32Array::from_iter_values(0..primary_keys.len() as u32);
+            columns.push(Arc::new(DictionaryArray::new(keys, values)));
+        }
+
+        let schema = Arc::new(Schema::new(fields));
+        let region_metadata = Arc::new(sst_region_metadata());
+        let key_value = KeyValue::new(
+            PARQUET_METADATA_KEY.to_string(),
+            region_metadata.to_json().unwrap(),
+        );
+        let props = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![key_value]))
+            .set_statistics_enabled(stats_enabled)
+            .build();
+
+        let mut parquet_bytes = Vec::new();
+        let mut writer =
+            ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), Some(props)).unwrap();
+        let mut offset = 0;
+        for row_group_size in row_group_sizes {
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                columns
+                    .iter()
+                    .map(|column| column.slice(offset, *row_group_size))
+                    .collect(),
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+            offset += row_group_size;
+        }
+        writer.close().unwrap();
+
+        ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes))
+            .unwrap()
+            .metadata()
+            .as_ref()
+            .clone()
+    }
+
+    #[test]
+    fn test_extract_primary_key_range_returns_none_when_column_absent() {
+        let metadata = build_test_metadata(false, &[], &[1], EnabledStatistics::Page);
+        let region_metadata = sst_region_metadata();
+
+        assert_eq!(None, extract_primary_key_range(&metadata, &region_metadata));
+    }
+
+    #[test]
+    fn test_extract_primary_key_range_folds_row_group_stats() {
+        let metadata = build_test_metadata(
+            true,
+            &[b"bbb", b"ccc", b"aaa", b"zzz"],
+            &[2, 2],
+            EnabledStatistics::Page,
+        );
+        let region_metadata = sst_region_metadata();
+
+        assert_eq!(
+            Some((Bytes::from_static(b"aaa"), Bytes::from_static(b"zzz"))),
+            extract_primary_key_range(&metadata, &region_metadata)
+        );
+    }
+
+    #[test]
+    fn test_extract_primary_key_range_returns_none_when_any_rg_stats_missing() {
+        let metadata = build_test_metadata(
+            true,
+            &[b"bbb", b"ccc", b"aaa", b"zzz"],
+            &[2, 2],
+            EnabledStatistics::None,
+        );
+        let region_metadata = sst_region_metadata();
+
+        assert_eq!(None, extract_primary_key_range(&metadata, &region_metadata));
+    }
+}
@@ -78,7 +78,9 @@ impl SstVersion {
             *f = FileHandle::new(file.clone(), file_purger.clone());
         }
     })
-    .or_insert_with(|| FileHandle::new(file.clone(), file_purger.clone()));
+    .or_insert_with(|| {
+        FileHandle::new(file.clone(), file_purger.clone())
+    });
 }
 }
@@ -213,6 +213,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
     num_series: 0,
     sequence: None,
     partition_expr: None,
+    ..Default::default()
 },
 file_purger,
 )
@@ -115,6 +115,7 @@ impl VersionControlBuilder {
             .expect("partition expression should be valid JSON"),
         None => None,
     },
+    ..Default::default()
 },
 );
 self

@@ -207,6 +208,7 @@ pub(crate) fn apply_edit(
             .expect("partition expression should be valid JSON"),
         None => None,
     },
+    ..Default::default()
 }
 })
 .collect();