diff --git a/Cargo.lock b/Cargo.lock index 2d3034f1e5..a40d5b7cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7989,6 +7989,7 @@ dependencies = [ "async-channel 1.9.0", "async-stream", "async-trait", + "base64 0.22.1", "bytemuck", "bytes", "chrono", diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index 1f3591635f..d8793bee2f 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -190,6 +190,7 @@ impl ObjbenchCommand { sequence: None, partition_expr: None, num_series: 0, + ..Default::default() }; let src_handle = FileHandle::new(file_meta, new_noop_file_purger()); diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index a78bf079b0..c5192f1360 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -20,6 +20,7 @@ async-channel = "1.9" common-stat.workspace = true async-stream.workspace = true async-trait.workspace = true +base64.workspace = true bytemuck.workspace = true bytes.workspace = true chrono.workspace = true diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index d28f2db189..59a8a10077 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -55,6 +55,7 @@ use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::region_dir_from_table_dir; use crate::sst::parquet::WriteOptions; +use crate::sst::parquet::metadata::extract_primary_key_range; use crate::sst::version::{SstVersion, SstVersionRef}; /// Region version for compaction that does not hold memtables. 
@@ -405,22 +406,35 @@ impl SstMerger for DefaultSstMerger { let output_files = sst_infos .into_iter() - .map(|sst_info| FileMeta { - region_id, - file_id: sst_info.file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, - max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size, - available_indexes: sst_info.index_metadata.build_available_indexes(), - indexes: sst_info.index_metadata.build_indexes(), - index_file_size: sst_info.index_metadata.file_size, - index_version: 0, - num_rows: sst_info.num_rows as u64, - num_row_groups: sst_info.num_row_groups, - sequence: max_sequence, - partition_expr: partition_expr.clone(), - num_series: sst_info.num_series, + .map(|sst_info| { + let pk_range = sst_info + .file_metadata + .as_ref() + .and_then(|meta| extract_primary_key_range(meta, ®ion_metadata)); + let (primary_key_min, primary_key_max) = match pk_range { + Some((min, max)) => (Some(min), Some(max)), + None => (None, None), + }; + + FileMeta { + region_id, + file_id: sst_info.file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, + max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size, + available_indexes: sst_info.index_metadata.build_available_indexes(), + indexes: sst_info.index_metadata.build_indexes(), + index_file_size: sst_info.index_metadata.file_size, + index_version: 0, + num_rows: sst_info.num_rows as u64, + num_row_groups: sst_info.num_row_groups, + sequence: max_sequence, + partition_expr: partition_expr.clone(), + num_series: sst_info.num_series, + primary_key_min, + primary_key_max, + } }) .collect::>(); let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(","); diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs index cf1cedd29a..27d4f5779e 100644 --- a/src/mito2/src/compaction/run.rs +++ b/src/mito2/src/compaction/run.rs @@ -15,6 +15,7 @@ //! 
This file contains code to find sorted runs in a set if ranged items and //! along with the best way to merge these items to satisfy the desired run count. +use bytes::{Buf, Bytes}; use common_base::BitVec; use common_base::readable_size::ReadableSize; use common_time::Timestamp; @@ -32,15 +33,37 @@ pub trait Ranged { /// Returns the inclusive range of item. fn range(&self) -> (Self::BoundType, Self::BoundType); - fn overlap(&self, other: &T) -> bool - where - T: Ranged, - { + fn overlap(&self, other: &Self) -> bool { let (lhs_start, lhs_end) = self.range(); let (rhs_start, rhs_end) = other.range(); lhs_start.max(rhs_start) < lhs_end.min(rhs_end) } + + /// Like `overlap`, but treats touching boundaries as overlapping (inclusive). + /// Used by `find_overlapping_items` where shared boundaries count as overlap. + fn overlap_inclusive(&self, other: &Self) -> bool { + let (lhs_start, lhs_end) = self.range(); + let (rhs_start, rhs_end) = other.range(); + + lhs_start.max(rhs_start) <= lhs_end.min(rhs_end) + } +} + +pub(crate) fn primary_key_ranges_overlap(lhs: &(Bytes, Bytes), rhs: &(Bytes, Bytes)) -> bool { + lhs.0.chunk().max(rhs.0.chunk()) <= lhs.1.chunk().min(rhs.1.chunk()) +} + +pub(crate) fn merge_primary_key_ranges( + lhs: Option<(Bytes, Bytes)>, + rhs: Option<(Bytes, Bytes)>, +) -> Option<(Bytes, Bytes)> { + match (lhs, rhs) { + (Some((lhs_min, lhs_max)), Some((rhs_min, rhs_max))) => { + Some((lhs_min.min(rhs_min), lhs_max.max(rhs_max))) + } + _ => None, + } } pub fn find_overlapping_items( @@ -85,15 +108,15 @@ pub fn find_overlapping_items( // Check for overlaps with remaining right elements let mut j = r_idx; while j < r.items.len() { - let (rhs_start, rhs_end) = r.items[j].range(); + let (rhs_start, _rhs_end) = r.items[j].range(); // If right element starts after left element ends, no more overlaps possible if rhs_start > lhs_end { break; } - // We have an overlap - if lhs_start.max(rhs_start) <= lhs_end.min(rhs_end) { + // We have an overlap (inclusive: 
touching boundaries count) + if lhs.overlap_inclusive(&r.items[j]) { if !selected[lhs_idx] { result.push(lhs.clone()); selected.set(lhs_idx, true); @@ -134,6 +157,7 @@ pub struct FileGroup { num_rows: usize, min_timestamp: Timestamp, max_timestamp: Timestamp, + primary_key_range: Option<(Bytes, Bytes)>, } impl FileGroup { @@ -141,12 +165,14 @@ impl FileGroup { let size = file.size() as usize; let (min_timestamp, max_timestamp) = file.time_range(); let num_rows = file.num_rows(); + let primary_key_range = file.primary_key_range(); Self { files: smallvec![file], size, num_rows, min_timestamp, max_timestamp, + primary_key_range, } } @@ -160,6 +186,8 @@ impl FileGroup { let (min_timestamp, max_timestamp) = file.time_range(); self.min_timestamp = self.min_timestamp.min(min_timestamp); self.max_timestamp = self.max_timestamp.max(max_timestamp); + self.primary_key_range = + merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range()); self.files.push(file); } @@ -187,6 +215,32 @@ impl Ranged for FileGroup { fn range(&self) -> (Self::BoundType, Self::BoundType) { (self.min_timestamp, self.max_timestamp) } + + fn overlap(&self, other: &Self) -> bool { + let (lhs_start, lhs_end) = self.range(); + let (rhs_start, rhs_end) = other.range(); + if lhs_start.max(rhs_start) >= lhs_end.min(rhs_end) { + return false; + } + + match (&self.primary_key_range, &other.primary_key_range) { + (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs), + _ => true, + } + } + + fn overlap_inclusive(&self, other: &Self) -> bool { + let (lhs_start, lhs_end) = self.range(); + let (rhs_start, rhs_end) = other.range(); + if lhs_start.max(rhs_start) > lhs_end.min(rhs_end) { + return false; + } + + match (&self.primary_key_range, &other.primary_key_range) { + (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs), + _ => true, + } + } } impl Item for FileGroup { @@ -281,21 +335,20 @@ where // item is already assigned. 
continue; } - match current_run.items.last() { - None => { - // current run is empty, just add current_item + if current_run.items.is_empty() { + // current run is empty, just add current_item + selected.set(true); + current_run.push_item(item.clone()); + } else { + // If the current item does not overlap with any item in the current + // run, it belongs to the current run. Since files now carry primary + // key ranges, timestamps alone cannot determine overlap. + let overlaps_any = current_run.items.iter().any(|i| i.overlap(item)); + if !overlaps_any { + // does not overlap, push to current run selected.set(true); current_run.push_item(item.clone()); } - Some(last) => { - // the current item does not overlap with the last item in current run, - // then it belongs to current run. - if !last.overlap(item) { - // does not overlap, push to current run - selected.set(true); - current_run.push_item(item.clone()); - } - } } } // finished an iteration, we've found a new run. @@ -422,7 +475,11 @@ pub fn merge_seq_files(input_files: &[T], max_file_size: Option) - mod tests { use std::collections::HashSet; + use bytes::Bytes; + use store_api::storage::FileId; + use super::*; + use crate::compaction::test_util::new_file_handle_with_size_sequence_and_primary_key_range; #[derive(Clone, Debug, PartialEq)] struct MockFile { @@ -467,6 +524,10 @@ mod tests { .collect() } + fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> { + Some((Bytes::from_static(min), Bytes::from_static(max))) + } + fn check_sorted_runs( ranges: &[(i64, i64)], expected_runs: &[Vec<(i64, i64)>], @@ -726,6 +787,157 @@ mod tests { assert_eq!(result.len(), 4); // Should find both overlaps } + #[test] + fn test_file_group_overlap_time_overlap_pk_disjoint() { + let lhs = + FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 0, + 100, + 0, + 1, + 10, + pk_range(b"a", b"f"), + )); + let rhs = 
FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 50, + 150, + 0, + 2, + 10, + pk_range(b"x", b"z"), + )); + + assert!(!lhs.overlap(&rhs)); + } + + #[test] + fn test_find_sorted_runs_collapses_pk_disjoint_files_into_one_run() { + let mut files = vec![ + FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 0, + 100, + 0, + 1, + 10, + pk_range(b"a", b"f"), + )), + FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 50, + 150, + 0, + 2, + 10, + pk_range(b"x", b"z"), + )), + ]; + + let runs = find_sorted_runs(&mut files); + + assert_eq!(1, runs.len()); + assert_eq!(2, runs[0].items().len()); + } + + #[test] + fn test_find_sorted_runs_handles_2d_transitivity_break() { + let mut files = vec![ + FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 0, + 100, + 0, + 1, + 10, + pk_range(b"a", b"f"), + )), + FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 50, + 150, + 0, + 2, + 10, + pk_range(b"x", b"z"), + )), + FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 50, + 150, + 0, + 3, + 10, + pk_range(b"a", b"f"), + )), + ]; + + let runs = find_sorted_runs(&mut files); + + assert_eq!(2, runs.len()); + assert_eq!(2, runs[0].items().len()); + assert_eq!(1, runs[1].items().len()); + } + + #[test] + fn test_find_overlapping_items_skips_pk_disjoint_pairs() { + let mut left = SortedRun::from(vec![FileGroup::new_with_file( + new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 0, + 100, + 0, + 1, + 10, + pk_range(b"a", b"f"), + ), + )]); + let mut right = SortedRun::from(vec![FileGroup::new_with_file( + new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 50, + 150, + 0, + 2, + 10, + pk_range(b"x", b"z"), + ), + 
)]); + let mut result = Vec::new(); + + find_overlapping_items(&mut left, &mut right, &mut result); + + assert!(result.is_empty()); + } + + #[test] + fn test_file_group_touching_time_boundary_with_same_pk_is_not_overlap() { + let lhs = + FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 0, + 100, + 0, + 1, + 10, + pk_range(b"a", b"f"), + )); + let rhs = + FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 100, + 150, + 0, + 2, + 10, + pk_range(b"a", b"f"), + )); + + assert!(!lhs.overlap(&rhs)); + } + #[test] fn test_merge_seq_files() { // Test empty input diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index 6061e294bd..f6b39fdc8c 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -14,6 +14,7 @@ use std::num::NonZeroU64; +use bytes::Bytes; use common_time::Timestamp; use store_api::storage::FileId; @@ -84,7 +85,46 @@ pub fn new_file_handle_with_size_and_sequence( num_series: 0, sequence: NonZeroU64::new(sequence), partition_expr: None, + ..Default::default() }, file_purger, ) } + +/// Test util to create file handles with custom size and primary-key range. 
+pub fn new_file_handle_with_size_sequence_and_primary_key_range( + file_id: FileId, + start_ts_millis: i64, + end_ts_millis: i64, + level: Level, + sequence: u64, + file_size: u64, + primary_key_range: Option<(Bytes, Bytes)>, +) -> FileHandle { + let file_purger = new_noop_file_purger(); + FileHandle::new_with_primary_key_range( + FileMeta { + region_id: 0.into(), + file_id, + time_range: ( + Timestamp::new_millisecond(start_ts_millis), + Timestamp::new_millisecond(end_ts_millis), + ), + level, + file_size, + max_row_group_uncompressed_size: file_size, + available_indexes: Default::default(), + indexes: Default::default(), + index_file_size: 0, + index_version: 0, + num_rows: 0, + num_row_groups: 0, + num_series: 0, + sequence: NonZeroU64::new(sequence), + partition_expr: None, + ..Default::default() + }, + file_purger, + primary_key_range, + ) +} diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 952d8771d8..87dcac7279 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -28,7 +28,8 @@ use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::run::{ - FileGroup, Item, Ranged, find_sorted_runs, merge_seq_files, reduce_runs, + FileGroup, Item, Ranged, find_sorted_runs, merge_primary_key_ranges, merge_seq_files, + primary_key_ranges_overlap, reduce_runs, }; use crate::compaction::{CompactionOutput, get_expired_ssts}; use crate::sst::file::{FileHandle, Level, overlaps}; @@ -268,12 +269,14 @@ struct Window { files: HashMap, FileGroup>, time_window: i64, overlapping: bool, + primary_key_range: Option<(bytes::Bytes, bytes::Bytes)>, } impl Window { /// Creates a new [Window] with given file. 
fn new_with_file(file: FileHandle) -> Self { let (start, end) = file.time_range(); + let primary_key_range = file.primary_key_range(); let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]); Self { start, @@ -281,6 +284,7 @@ impl Window { files, time_window: 0, overlapping: false, + primary_key_range, } } @@ -294,6 +298,8 @@ impl Window { let (start, end) = file.time_range(); self.start = self.start.min(start); self.end = self.end.max(end); + self.primary_key_range = + merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range()); match self.files.entry(file.meta_ref().sequence) { Entry::Occupied(mut o) => { @@ -347,18 +353,27 @@ fn assign_to_windows<'a>( let mut windows = windows.into_values().collect::>(); windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse())); - let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty. + for idx in 0..windows.len() { + let lhs_range = windows[idx].range(); + for next_idx in idx + 1..windows.len() { + let rhs_range = windows[next_idx].range(); + if rhs_range.0 > lhs_range.1 { + break; + } - for idx in 1..windows.len() { - let next_range = windows[idx].range(); - if overlaps(¤t_range, &next_range) { - windows[idx - 1].overlapping = true; - windows[idx].overlapping = true; + let windows_overlap = overlaps(&lhs_range, &rhs_range) + && match ( + &windows[idx].primary_key_range, + &windows[next_idx].primary_key_range, + ) { + (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs), + _ => true, + }; + if windows_overlap { + windows[idx].overlapping = true; + windows[next_idx].overlapping = true; + } } - current_range = ( - current_range.0.min(next_range.0), - current_range.1.max(next_range.1), - ); } windows.into_iter().map(|w| (w.time_window, w)).collect() @@ -390,11 +405,13 @@ fn find_latest_window_in_seconds<'a>( mod tests { use std::collections::HashSet; + use bytes::Bytes; use 
store_api::storage::FileId; use super::*; use crate::compaction::test_util::{ new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence, + new_file_handle_with_size_sequence_and_primary_key_range, }; use crate::sst::file::Level; @@ -566,6 +583,10 @@ mod tests { /// (Window value, overlapping, files' time ranges in window) type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>); + fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> { + Some((Bytes::from_static(min), Bytes::from_static(max))) + } + fn check_assign_to_windows_with_overlapping( file_time_ranges: &[(i64, i64)], time_window: i64, @@ -698,6 +719,63 @@ mod tests { ); } + #[test] + fn test_assign_to_windows_not_overlapping_when_pk_disjoint() { + let files = [ + new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 0, + 1000, + 0, + 1, + 10, + pk_range(b"a", b"f"), + ), + new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 500, + 1999, + 0, + 2, + 10, + pk_range(b"x", b"z"), + ), + ]; + + let windows = assign_to_windows(files.iter(), 2); + + assert!(!windows.get(&2).unwrap().overlapping); + } + + #[test] + fn test_assign_to_windows_pk_unknown_in_earlier_window_does_not_poison_later_windows() { + let files = [ + new_file_handle(FileId::random(), 0, 1999, 0), + new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 2000, + 3999, + 0, + 1, + 10, + pk_range(b"a", b"f"), + ), + new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 3000, + 4999, + 0, + 2, + 10, + pk_range(b"x", b"z"), + ), + ]; + + let windows = assign_to_windows(files.iter(), 2); + + assert!(!windows.get(&4).unwrap().overlapping); + } + struct CompactionPickerTestCase { window_size: i64, input_files: Vec, @@ -832,6 +910,42 @@ mod tests { .check(); } + #[test] + fn test_build_output_skips_pk_disjoint_files() { + let files = [ + new_file_handle_with_size_sequence_and_primary_key_range( + 
FileId::random(), + 0, + 2999, + 0, + 1, + 10, + pk_range(b"a", b"f"), + ), + new_file_handle_with_size_sequence_and_primary_key_range( + FileId::random(), + 50, + 2998, + 0, + 2, + 10, + pk_range(b"x", b"z"), + ), + ]; + let mut windows = assign_to_windows(files.iter(), 3); + let active_window = find_latest_window_in_seconds(files.iter(), 3); + let output = TwcsPicker { + trigger_file_num: 4, + time_window_seconds: None, + max_output_file_size: None, + append_mode: false, + max_background_tasks: None, + } + .build_output(RegionId::from_u64(0), &mut windows, active_window); + + assert!(output.is_empty()); + } + #[test] fn test_append_mode_filter_large_files() { let file_ids = (0..4).map(|_| FileId::random()).collect::>(); diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index fa7db1f573..de58e04e46 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -131,6 +131,7 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) { let request = CreateRequestBuilder::new() .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.trigger_file_num", "2") .insert_option("append_mode", "true") .build(); let table_dir = request.table_dir.clone(); @@ -199,7 +200,7 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) { .scanner(region_id, ScanRequest::default()) .await .unwrap(); - assert_eq!(1, scanner.num_files()); + assert_eq!(2, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); scanner.set_target_partitions(2); let stream = scanner.scan().await.unwrap(); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 7be81dec8d..3af94c08e6 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Instant; +use bytes::Bytes; use common_telemetry::{debug, error, info}; use datatypes::arrow::datatypes::SchemaRef; 
use partition::expr::PartitionExpr; @@ -57,6 +58,7 @@ use crate::request::{ }; use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::FileMeta; +use crate::sst::parquet::metadata::extract_primary_key_range; use crate::sst::parquet::{ DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format, }; @@ -504,15 +506,20 @@ impl RegionFlushTask { flush_metrics = flush_metrics.merge(metrics); - file_metas.extend(ssts_written.into_iter().map(|sst_info| { + for sst_info in ssts_written { flushed_bytes += sst_info.file_size; - Self::new_file_meta( + let pk_range = sst_info + .file_metadata + .as_ref() + .and_then(|meta| extract_primary_key_range(meta, &version.metadata)); + file_metas.push(Self::new_file_meta( self.region_id, max_sequence, sst_info, partition_expr.clone(), - ) - })); + pk_range, + )); + } } common_telemetry::debug!( @@ -604,7 +611,12 @@ impl RegionFlushTask { max_sequence: u64, sst_info: SstInfo, partition_expr: Option, + primary_key_range: Option<(Bytes, Bytes)>, ) -> FileMeta { + let (primary_key_min, primary_key_max) = match primary_key_range { + Some((min, max)) => (Some(min), Some(max)), + None => (None, None), + }; FileMeta { region_id, file_id: sst_info.file_id, @@ -621,6 +633,8 @@ impl RegionFlushTask { sequence: NonZeroU64::new(max_sequence), partition_expr, num_series: sst_info.num_series, + primary_key_min, + primary_key_max, } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 64547f45aa..e50650ccee 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -257,6 +257,7 @@ async fn checkpoint_with_different_compression_types() { sequence: None, partition_expr: None, num_series: 0, + ..Default::default() }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], @@ -323,6 +324,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec) sequence: 
None, partition_expr: None, num_series: 0, + ..Default::default() }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index c1240c3829..6e485e42ac 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -70,7 +70,7 @@ use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::{self, region_dir_from_table_dir}; -use crate::sst::parquet::metadata::MetadataLoader; +use crate::sst::parquet::metadata::{MetadataLoader, extract_primary_key_range}; use crate::sst::parquet::reader::MetadataCacheMetrics; use crate::time_provider::TimeProviderRef; use crate::wal::entry_reader::WalEntryReader; @@ -991,6 +991,7 @@ fn maybe_load_cache( /// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata /// directly from the local store. /// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...). 
+#[allow(clippy::too_many_arguments)] async fn preload_parquet_meta_cache_for_files( region_id: RegionId, cache_manager: CacheManagerRef, @@ -998,6 +999,7 @@ async fn preload_parquet_meta_cache_for_files( table_dir: String, path_type: PathType, object_store: object_store::ObjectStore, + region_metadata: RegionMetadataRef, mut files: Vec, ) -> usize { if !cache_manager.sst_meta_cache_enabled() @@ -1021,11 +1023,16 @@ async fn preload_parquet_meta_cache_for_files( let file_id = file_handle.file_id(); let mut cache_metrics = MetadataCacheMetrics::default(); - if cache_manager + if let Some(metadata) = cache_manager .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default()) .await - .is_some() { + if file_handle.primary_key_range().is_none() + && let Some(primary_key_range) = + extract_primary_key_range(&metadata, ®ion_metadata) + { + file_handle.set_primary_key_range(primary_key_range); + } // Metadata is either already in memory or loaded from file cache. if cache_metrics.mem_cache_hit == 0 { loaded += 1; @@ -1042,6 +1049,11 @@ async fn preload_parquet_meta_cache_for_files( let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); match loader.load(&mut cache_metrics).await { Ok(metadata) => { + if let Some(primary_key_range) = + extract_primary_key_range(&metadata, ®ion_metadata) + { + file_handle.set_primary_key_range(primary_key_range); + } cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None); loaded += 1; } @@ -1090,6 +1102,7 @@ fn maybe_preload_parquet_meta_cache( let table_dir = region.access_layer.table_dir().to_string(); let path_type = region.access_layer.path_type(); let object_store = region.access_layer.object_store().clone(); + let region_metadata = region.version_control.current().version.metadata.clone(); // Collect SST files. Do not hold the version longer than needed. 
let mut files = Vec::new(); @@ -1109,6 +1122,7 @@ fn maybe_preload_parquet_meta_cache( table_dir, path_type, object_store, + region_metadata, files, ) .await; @@ -1147,7 +1161,7 @@ mod tests { use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_temp_dir; use common_time::Timestamp; - use datatypes::arrow::array::{ArrayRef, Int64Array}; + use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array}; use datatypes::arrow::record_batch::RecordBatch; use object_store::ObjectStore; use object_store::services::{Fs, Memory}; @@ -1203,7 +1217,15 @@ mod tests { let file_id = FileId::random(); let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; - let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let primary_key = Arc::new(BinaryArray::from_iter_values([b"a", b"b", b"c"])) as ArrayRef; + let batch = RecordBatch::try_from_iter([ + ("col", col), + ( + store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME, + primary_key, + ), + ]) + .unwrap(); let parquet_bytes = sst_parquet_bytes(&batch); let file_size = parquet_bytes.len() as u64; @@ -1223,6 +1245,7 @@ mod tests { sequence: None, partition_expr: None, num_series: 0, + ..Default::default() }; let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); @@ -1258,7 +1281,8 @@ mod tests { table_dir.to_string(), path_type, source_store.clone(), - vec![file_handle], + Arc::new(sst_region_metadata()), + vec![file_handle.clone()], ) .await; @@ -1269,6 +1293,7 @@ mod tests { .get_parquet_meta_data_from_mem_cache(region_file_id) .is_some() ); + assert!(file_handle.primary_key_range().is_some()); } #[tokio::test] @@ -1299,6 +1324,7 @@ mod tests { sequence: None, partition_expr: None, num_series: 0, + ..Default::default() }; let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); @@ -1327,6 +1353,7 @@ mod tests { table_dir.to_string(), path_type, object_store, + Arc::new(sst_region_metadata()), vec![file_handle], ) .await; @@ -1372,6 
+1399,7 @@ mod tests { sequence: None, partition_expr: None, num_series: 0, + ..Default::default() }; let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); @@ -1399,6 +1427,7 @@ mod tests { table_dir.to_string(), path_type, object_store, + Arc::new(sst_region_metadata()), vec![file_handle], ) .await; diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs index 49676be8be..596e60c6dd 100644 --- a/src/mito2/src/remap_manifest.rs +++ b/src/mito2/src/remap_manifest.rs @@ -436,6 +436,7 @@ mod tests { sequence: NonZeroU64::new(1), partition_expr, num_series: 1, + ..Default::default() } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 74176831b6..213cd4f714 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -17,9 +17,11 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::num::NonZeroU64; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use base64::prelude::{BASE64_STANDARD, Engine}; +use bytes::Bytes; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, error}; use common_time::Timestamp; @@ -36,6 +38,33 @@ use crate::cache::file_cache::{FileType, IndexKey}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::location; +/// Custom serde functions for Bytes fields serialized as base64 strings. 
+fn serialize_bytes_option(bytes: &Option, serializer: S) -> Result +where + S: serde::Serializer, +{ + match bytes { + None => serializer.serialize_none(), + Some(b) => serializer.serialize_some(&BASE64_STANDARD.encode(b)), + } +} + +fn deserialize_bytes_option<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let opt: Option = Option::deserialize(deserializer)?; + match opt { + None => Ok(None), + Some(s) => { + let decoded = BASE64_STANDARD + .decode(&s) + .map_err(serde::de::Error::custom)?; + Ok(Some(Bytes::from(decoded))) + } + } +} + /// Custom serde functions for partition_expr field in FileMeta fn serialize_partition_expr( partition_expr: &Option, @@ -233,6 +262,24 @@ pub struct FileMeta { /// /// The number is 0 if the series number is not available. pub num_series: u64, + /// Minimum primary key value in the file, encoded as bytes. + /// `None` if the primary key range is not available (e.g., legacy files). + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "serialize_bytes_option", + deserialize_with = "deserialize_bytes_option" + )] + pub primary_key_min: Option, + /// Maximum primary key value in the file, encoded as bytes. + /// `None` if the primary key range is not available (e.g., legacy files). 
+ #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "serialize_bytes_option", + deserialize_with = "deserialize_bytes_option" + )] + pub primary_key_max: Option, } impl Debug for FileMeta { @@ -273,8 +320,19 @@ impl Debug for FileMeta { } }) .field("partition_expr", &self.partition_expr) - .field("num_series", &self.num_series) - .finish() + .field("num_series", &self.num_series); + if self.primary_key_min.is_some() || self.primary_key_max.is_some() { + debug_struct + .field( + "primary_key_min", + &self.primary_key_min.as_ref().map(|b| b.len()), + ) + .field( + "primary_key_max", + &self.primary_key_max.as_ref().map(|b| b.len()), + ); + } + debug_struct.finish() } } @@ -311,6 +369,14 @@ pub struct ColumnIndexMetadata { } impl FileMeta { + /// Returns the primary key range if both min and max are present. + pub fn primary_key_range(&self) -> Option<(Bytes, Bytes)> { + match (&self.primary_key_min, &self.primary_key_max) { + (Some(min), Some(max)) => Some((min.clone(), max.clone())), + _ => None, + } + } + pub fn exists_index(&self) -> bool { !self.available_indexes.is_empty() } @@ -323,7 +389,7 @@ impl FileMeta { } } - /// Whether the index file is up-to-date comparing to another file meta. + /// Whether the index file is up-to-date comparing to another file meta. 
pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool { self.exists_index() && other.exists_index() && self.index_version >= other.index_version } @@ -417,8 +483,20 @@ impl fmt::Debug for FileHandle { impl FileHandle { pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle { + let pk_range = meta.primary_key_range(); FileHandle { - inner: Arc::new(FileHandleInner::new(meta, file_purger)), + inner: Arc::new(FileHandleInner::new(meta, file_purger, pk_range)), + } + } + + #[cfg(test)] + pub fn new_with_primary_key_range( + meta: FileMeta, + file_purger: FilePurgerRef, + primary_key_range: Option<(Bytes, Bytes)>, + ) -> FileHandle { + FileHandle { + inner: Arc::new(FileHandleInner::new(meta, file_purger, primary_key_range)), } } @@ -498,6 +576,14 @@ impl FileHandle { pub fn is_deleted(&self) -> bool { self.inner.deleted.load(Ordering::Relaxed) } + + pub fn primary_key_range(&self) -> Option<(Bytes, Bytes)> { + self.inner.primary_key_range.read().unwrap().clone() + } + + pub(crate) fn set_primary_key_range(&self, primary_key_range: (Bytes, Bytes)) { + *self.inner.primary_key_range.write().unwrap() = Some(primary_key_range); + } } /// Inner data of [FileHandle]. 
@@ -508,6 +594,7 @@ struct FileHandleInner { compacting: AtomicBool, deleted: AtomicBool, index_outdated: AtomicBool, + primary_key_range: RwLock>, file_purger: FilePurgerRef, } @@ -523,13 +610,18 @@ impl Drop for FileHandleInner { impl FileHandleInner { /// There should only be one `FileHandleInner` for each file on a datanode - fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner { + fn new( + meta: FileMeta, + file_purger: FilePurgerRef, + primary_key_range: Option<(Bytes, Bytes)>, + ) -> FileHandleInner { file_purger.new_file(&meta); FileHandleInner { meta, compacting: AtomicBool::new(false), deleted: AtomicBool::new(false), index_outdated: AtomicBool::new(false), + primary_key_range: RwLock::new(primary_key_range), file_purger, } } @@ -734,6 +826,7 @@ mod tests { sequence: None, partition_expr: None, num_series: 0, + ..Default::default() } } @@ -786,6 +879,7 @@ mod tests { sequence: None, partition_expr: Some(partition_expr.clone()), num_series: 0, + ..Default::default() }; // Test serialization/deserialization diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 7c1598787e..af1c81b491 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -283,6 +283,7 @@ mod tests { sequence: None, partition_expr: None, num_series: 0, + ..Default::default() }, file_purger, ); @@ -357,6 +358,7 @@ mod tests { sequence: NonZeroU64::new(4096), partition_expr: None, num_series: 0, + ..Default::default() }, file_purger, ); diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index 6490d2d1dc..2f067cf3c1 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -285,6 +285,7 @@ mod tests { sequence: NonZeroU64::new(4096), partition_expr: None, num_series: 0, + ..Default::default() }; file_ref_mgr.add_file(&file_meta); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 90395642b6..d8d1f91e3d 100644 --- 
a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -832,6 +832,7 @@ mod tests { None => None, }, num_series: 0, + ..Default::default() }, Arc::new(NoopFilePurger), ); @@ -1285,6 +1286,7 @@ mod tests { None => None, }, num_series: 0, + ..Default::default() }, Arc::new(NoopFilePurger), ) diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index d13f318fb5..442ecab70c 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -22,7 +22,10 @@ use object_store::ObjectStore; use parquet::arrow::async_reader::MetadataFetch; use parquet::errors::{ParquetError, Result as ParquetResult}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; +use parquet::file::statistics::Statistics; use snafu::{IntoError as _, ResultExt}; +use store_api::metadata::RegionMetadata; +use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; use crate::error::{self, Result}; use crate::sst::parquet::reader::MetadataCacheMetrics; @@ -115,6 +118,44 @@ fn unbox_external_error(e: ParquetError) -> StdResult Option<(Bytes, Bytes)> { + if region_metadata.primary_key.is_empty() { + return None; + } + + let pk_column_idx = parquet_meta + .file_metadata() + .schema_descr() + .columns() + .iter() + .position(|column| column.name() == PRIMARY_KEY_COLUMN_NAME)?; + + let mut min: Option = None; + let mut max: Option = None; + + for row_group in parquet_meta.row_groups() { + let Statistics::ByteArray(stats) = row_group.column(pk_column_idx).statistics()? 
else { + return None; + }; + + let row_group_min = Bytes::copy_from_slice(stats.min_bytes_opt()?); + let row_group_max = Bytes::copy_from_slice(stats.max_bytes_opt()?); + min = Some(match min { + Some(current) => current.min(row_group_min), + None => row_group_min, + }); + max = Some(match max { + Some(current) => current.max(row_group_max), + None => row_group_max, + }); + } + + min.zip(max) +} + struct ObjectStoreFetch<'a> { object_store: &'a ObjectStore, file_path: &'a str, @@ -139,3 +180,120 @@ impl MetadataFetch for ObjectStoreFetch<'_> { .boxed() } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datatypes::arrow::array::{ + ArrayRef, BinaryArray, DictionaryArray, Int64Array, UInt32Array, + }; + use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; + use datatypes::arrow::record_batch::RecordBatch; + use parquet::arrow::ArrowWriter; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::metadata::{KeyValue, ParquetMetaData}; + use parquet::file::properties::{EnabledStatistics, WriterProperties}; + + use super::*; + use crate::sst::parquet::PARQUET_METADATA_KEY; + use crate::test_util::sst_util::sst_region_metadata; + + fn build_test_metadata( + include_primary_key: bool, + primary_keys: &[&[u8]], + row_group_sizes: &[usize], + stats_enabled: EnabledStatistics, + ) -> ParquetMetaData { + let total_rows = row_group_sizes.iter().sum::(); + let mut fields = vec![Field::new("field", ArrowDataType::Int64, true)]; + let mut columns: Vec = + vec![Arc::new(Int64Array::from_iter_values(0..total_rows as i64))]; + if include_primary_key { + assert_eq!(total_rows, primary_keys.len()); + fields.push(Field::new( + "__primary_key", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt32), + Box::new(ArrowDataType::Binary), + ), + false, + )); + let values = Arc::new(BinaryArray::from_iter_values(primary_keys.iter().copied())); + let keys = UInt32Array::from_iter_values(0..primary_keys.len() as u32); 
+ columns.push(Arc::new(DictionaryArray::new(keys, values))); + } + + let schema = Arc::new(Schema::new(fields)); + let region_metadata = Arc::new(sst_region_metadata()); + let key_value = KeyValue::new( + PARQUET_METADATA_KEY.to_string(), + region_metadata.to_json().unwrap(), + ); + let props = WriterProperties::builder() + .set_key_value_metadata(Some(vec![key_value])) + .set_statistics_enabled(stats_enabled) + .build(); + + let mut parquet_bytes = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), Some(props)).unwrap(); + let mut offset = 0; + for row_group_size in row_group_sizes { + let batch = RecordBatch::try_new( + schema.clone(), + columns + .iter() + .map(|column| column.slice(offset, *row_group_size)) + .collect(), + ) + .unwrap(); + writer.write(&batch).unwrap(); + offset += row_group_size; + } + writer.close().unwrap(); + + ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes)) + .unwrap() + .metadata() + .as_ref() + .clone() + } + + #[test] + fn test_extract_primary_key_range_returns_none_when_column_absent() { + let metadata = build_test_metadata(false, &[], &[1], EnabledStatistics::Page); + let region_metadata = sst_region_metadata(); + + assert_eq!(None, extract_primary_key_range(&metadata, ®ion_metadata)); + } + + #[test] + fn test_extract_primary_key_range_folds_row_group_stats() { + let metadata = build_test_metadata( + true, + &[b"bbb", b"ccc", b"aaa", b"zzz"], + &[2, 2], + EnabledStatistics::Page, + ); + let region_metadata = sst_region_metadata(); + + assert_eq!( + Some((Bytes::from_static(b"aaa"), Bytes::from_static(b"zzz"))), + extract_primary_key_range(&metadata, ®ion_metadata) + ); + } + + #[test] + fn test_extract_primary_key_range_returns_none_when_any_rg_stats_missing() { + let metadata = build_test_metadata( + true, + &[b"bbb", b"ccc", b"aaa", b"zzz"], + &[2, 2], + EnabledStatistics::None, + ); + let region_metadata = sst_region_metadata(); + + assert_eq!(None, 
extract_primary_key_range(&metadata, ®ion_metadata)); + } +} diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index a9e71eb5d9..5958cf7513 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -78,7 +78,9 @@ impl SstVersion { *f = FileHandle::new(file.clone(), file_purger.clone()); } }) - .or_insert_with(|| FileHandle::new(file.clone(), file_purger.clone())); + .or_insert_with(|| { + FileHandle::new(file.clone(), file_purger.clone()) + }); } } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 84f15ad837..4e759f50cd 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -213,6 +213,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) num_series: 0, sequence: None, partition_expr: None, + ..Default::default() }, file_purger, ) diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 7a407ece4f..b14eea0958 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -115,6 +115,7 @@ impl VersionControlBuilder { .expect("partition expression should be valid JSON"), None => None, }, + ..Default::default() }, ); self @@ -207,6 +208,7 @@ pub(crate) fn apply_edit( .expect("partition expression should be valid JSON"), None => None, }, + ..Default::default() } }) .collect();