From bb4c4b8522139affe489216a3663400388448572 Mon Sep 17 00:00:00 2001 From: Eric Ridge Date: Wed, 15 Jan 2025 16:46:36 -0500 Subject: [PATCH] perf: push `FileSlice`s down through most of fast fields (#19) This PR modifies internal API signatures and implementation details so that `FileSlice`s are passed down into the innards of (at least) the `BlockwiseLinearCodec`. This allows tantivy to defer dereferencing large slices of bytes when reading numeric fast fields, and instead dereference only the slice of bytes it needs for any given compressed Block. The motivation here is for external `Directory` implementations where it's not exactly efficient to dereference large slices of bytes. --- .gitignore | 3 + columnar/src/column/serialize.rs | 38 ++-- columnar/src/column_index/merge/mod.rs | 6 +- .../src/column_index/multivalued_index.rs | 22 ++- .../src/column_index/optional_index/mod.rs | 20 +- .../src/column_index/optional_index/tests.rs | 177 +++++++++++++++++- columnar/src/column_index/serialize.rs | 16 +- columnar/src/column_values/mod.rs | 3 +- columnar/src/column_values/stats.rs | 37 ++++ .../u128_based/compact_space/mod.rs | 3 +- columnar/src/column_values/u128_based/mod.rs | 9 +- .../src/column_values/u64_based/bitpacked.rs | 4 +- .../u64_based/blockwise_linear.rs | 76 ++++++-- .../src/column_values/u64_based/linear.rs | 4 +- columnar/src/column_values/u64_based/mod.rs | 36 ++-- columnar/src/column_values/u64_based/tests.rs | 21 ++- columnar/src/dynamic_column.rs | 36 ++-- common/src/vint.rs | 28 ++- runtests.sh | 3 + src/collector/facet_collector.rs | 1 - src/collector/multi_collector.rs | 1 - src/indexer/doc_opstamp_mapping.rs | 1 - src/postings/recorder.rs | 1 - src/query/boolean_query/boolean_weight.rs | 2 +- src/query/set_query.rs | 2 +- src/tokenizer/facet_tokenizer.rs | 1 - src/tokenizer/ngram_tokenizer.rs | 1 - 27 files changed, 433 insertions(+), 119 deletions(-) create mode 100755 runtests.sh diff --git a/.gitignore b/.gitignore index 92b264316..3f25aab25 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ trace.dat cargo-timing* control variable + +# for `sample record -p` +profile.json diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index e2127933b..d59119168 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -2,7 +2,7 @@ use std::io; use std::io::Write; use std::sync::Arc; -use common::OwnedBytes; +use common::file_slice::FileSlice; use sstable::Dictionary; use crate::column::{BytesColumn, Column}; @@ -41,12 +41,13 @@ pub fn serialize_column_mappable_to_u64( } pub fn open_column_u64( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? .as_slice() .try_into() .unwrap(), @@ -61,12 +62,13 @@ pub fn open_column_u64( } pub fn open_column_u128( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? .as_slice() .try_into() .unwrap(), @@ -84,12 +86,13 @@ pub fn open_column_u128( /// /// See [`open_u128_as_compact_u64`] for more details. pub fn open_column_u128_as_compact_u64( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? .as_slice() .try_into() .unwrap(), @@ -103,10 +106,21 @@ pub fn open_column_u128_as_compact_u64( }) } -pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Result { - let (body, dictionary_len_bytes) = data.rsplit(4); - let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); +pub fn open_column_bytes( + file_slice: FileSlice, + format_version: Version, +) -> io::Result { + let (body, dictionary_len_bytes) = file_slice.split_from_end(4); + let dictionary_len = u32::from_le_bytes( + dictionary_len_bytes + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize); + + let dictionary_bytes = dictionary_bytes.read_bytes()?; let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?); let term_ord_column = crate::column::open_column_u64::(column_bytes, format_version)?; Ok(BytesColumn { @@ -115,7 +129,7 @@ pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Resul }) } -pub fn open_column_str(data: OwnedBytes, format_version: Version) -> io::Result { - let bytes_column = open_column_bytes(data, format_version)?; +pub fn open_column_str(file_slice: FileSlice, format_version: Version) -> io::Result { + let bytes_column = open_column_bytes(file_slice, format_version)?; Ok(StrColumn::wrap(bytes_column)) } diff --git a/columnar/src/column_index/merge/mod.rs b/columnar/src/column_index/merge/mod.rs index 816d45d74..1fd88d877 100644 --- a/columnar/src/column_index/merge/mod.rs +++ b/columnar/src/column_index/merge/mod.rs @@ -95,7 +95,7 @@ pub fn merge_column_index<'a>( #[cfg(test)] mod tests { - use common::OwnedBytes; + use common::file_slice::FileSlice; use crate::column_index::merge::detect_cardinality; use crate::column_index::multivalued_index::{ @@ -178,7 +178,7 @@ mod tests { let mut output = Vec::new(); serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); let multivalue = - open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + open_multivalued_index(FileSlice::from(output), crate::Version::V2).unwrap(); let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5]); } @@ -216,7 +216,7 @@ mod tests { let mut output = Vec::new(); serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); let multivalue = - open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + open_multivalued_index(FileSlice::from(output), crate::Version::V2).unwrap(); let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5, 6]); } diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index 883475a1b..c26d21e0e 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -3,7 +3,8 @@ use std::io::Write; use std::ops::Range; use std::sync::Arc; -use common::{CountingWriter, OwnedBytes}; +use common::CountingWriter; +use common::file_slice::FileSlice; use super::optional_index::{open_optional_index, serialize_optional_index}; use super::{OptionalIndex, SerializableOptionalIndex, Set}; @@ -44,21 +45,26 @@ pub fn serialize_multivalued_index( } pub fn open_multivalued_index( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result { match format_version { Version::V1 => { let start_index_column: Arc> = - load_u64_based_column_values(bytes)?; + load_u64_based_column_values(file_slice)?; Ok(MultiValueIndex::MultiValueIndexV1(MultiValueIndexV1 { start_index_column, })) } Version::V2 => { - let (body_bytes, optional_index_len) = bytes.rsplit(4); - let optional_index_len = - u32::from_le_bytes(optional_index_len.as_slice().try_into().unwrap()); + let (body_bytes, optional_index_len) = file_slice.split_from_end(4); + let optional_index_len = u32::from_le_bytes( + optional_index_len + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); let (optional_index_bytes, start_index_bytes) = body_bytes.split(optional_index_len as usize); let optional_index = open_optional_index(optional_index_bytes)?; @@ -185,8 +191,8 @@ impl MultiValueIndex { }; let mut buffer = Vec::new(); serialize_multivalued_index(&serializable_multivalued_index, &mut buffer).unwrap(); - let bytes = OwnedBytes::new(buffer); - open_multivalued_index(bytes, Version::V2).unwrap() + let file_slice = FileSlice::from(buffer); + open_multivalued_index(file_slice, Version::V2).unwrap() } pub fn get_start_index_column(&self) -> &Arc> { diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs index e238dce9b..97a2b1f6b 100644 --- a/columnar/src/column_index/optional_index/mod.rs +++ b/columnar/src/column_index/optional_index/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; mod set; mod set_block; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes, VInt}; pub use set::{SelectCursor, Set, SetCodec}; use set_block::{ @@ -268,8 +269,8 @@ impl OptionalIndex { ); let mut buffer = Vec::new(); serialize_optional_index(&row_ids, num_rows, &mut buffer).unwrap(); - let bytes = OwnedBytes::new(buffer); - open_optional_index(bytes).unwrap() + let file_slice = FileSlice::from(buffer); + open_optional_index(file_slice).unwrap() } pub fn num_docs(&self) -> RowId { @@ -486,10 +487,17 @@ fn deserialize_optional_index_block_metadatas( (block_metas.into_boxed_slice(), non_null_rows_before_block) } -pub fn open_optional_index(bytes: OwnedBytes) -> io::Result { - let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2); - let num_non_empty_block_bytes = - u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap()); +pub fn open_optional_index(file_slice: FileSlice) -> io::Result { + let (bytes, num_non_empty_blocks_bytes) = file_slice.split_from_end(2); + let num_non_empty_block_bytes = u16::from_le_bytes( + num_non_empty_blocks_bytes + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); + + let mut bytes = bytes.read_bytes()?; let num_docs = VInt::deserialize_u64(&mut bytes)? as u32; let block_metas_num_bytes = num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES; diff --git a/columnar/src/column_index/optional_index/tests.rs b/columnar/src/column_index/optional_index/tests.rs index 5f54f3362..e27537239 100644 --- a/columnar/src/column_index/optional_index/tests.rs +++ b/columnar/src/column_index/optional_index/tests.rs @@ -59,7 +59,7 @@ fn test_with_random_sets_simple() { let vals = 10..ELEMENTS_PER_BLOCK * 2; let mut out: Vec = Vec::new(); serialize_optional_index(&vals, 100, &mut out).unwrap(); - let null_index = open_optional_index(OwnedBytes::new(out)).unwrap(); + let null_index = open_optional_index(FileSlice::from(out)).unwrap(); let ranks: Vec = (65_472u32..65_473u32).collect(); let els: Vec = ranks.iter().copied().map(|rank| rank + 10).collect(); let mut select_cursor = null_index.select_cursor(); @@ -102,7 +102,7 @@ impl<'a> Iterable for &'a [bool] { fn test_null_index(data: &[bool]) { let mut out: Vec = Vec::new(); serialize_optional_index(&data, data.len() as RowId, &mut out).unwrap(); - let null_index = open_optional_index(OwnedBytes::new(out)).unwrap(); + let null_index = open_optional_index(FileSlice::from(out)).unwrap(); let orig_idx_with_value: Vec = data .iter() .enumerate() @@ -164,11 +164,7 @@ fn test_optional_index_large() { fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) { let optional_index = OptionalIndex::for_test(num_rows, row_ids); assert_eq!(optional_index.num_docs(), num_rows); - assert!( - optional_index - .iter_non_null_docs() - .eq(row_ids.iter().copied()) - ); + assert!(optional_index.iter_rows().eq(row_ids.iter().copied())); } #[test] @@ -223,3 +219,170 @@ fn test_optional_index_for_tests() { assert!(!optional_index.contains(3)); assert_eq!(optional_index.num_docs(), 4); } + +#[cfg(all(test, feature = "unstable"))] +mod bench { + + use rand::rngs::StdRng; + use rand::{Rng, SeedableRng}; + use test::Bencher; + + use super::*; + + const TOTAL_NUM_VALUES: u32 = 1_000_000; + fn gen_bools(fill_ratio: f64) -> OptionalIndex { + let mut out = Vec::new(); + let mut rng: StdRng = StdRng::from_seed([1u8; 32]); + let vals: Vec = (0..TOTAL_NUM_VALUES) + .map(|_| rng.gen_bool(fill_ratio)) + .enumerate() + .filter(|(_pos, val)| *val) + .map(|(pos, _)| pos as RowId) + .collect(); + serialize_optional_index(&&vals[..], TOTAL_NUM_VALUES, &mut out).unwrap(); + + open_optional_index(FileSlice::from(out)).unwrap() + } + + fn random_range_iterator( + start: u32, + end: u32, + avg_step_size: u32, + avg_deviation: u32, + ) -> impl Iterator { + let mut rng: StdRng = StdRng::from_seed([1u8; 32]); + let mut current = start; + std::iter::from_fn(move || { + current += rng.gen_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation); + if current >= end { None } else { Some(current) } + }) + } + + fn n_percent_step_iterator(percent: f32, num_values: u32) -> impl Iterator { + let ratio = percent / 100.0; + let step_size = (1f32 / ratio) as u32; + let deviation = step_size - 1; + random_range_iterator(0, num_values, step_size, deviation) + } + + fn walk_over_data(codec: &OptionalIndex, avg_step_size: u32) -> Option { + walk_over_data_from_positions( + codec, + random_range_iterator(0, TOTAL_NUM_VALUES, avg_step_size, 0), + ) + } + + fn walk_over_data_from_positions( + codec: &OptionalIndex, + positions: impl Iterator, + ) -> Option { + let mut dense_idx: Option = None; + for idx in positions { + dense_idx = dense_idx.or(codec.rank_if_exists(idx)); + } + dense_idx + } + + #[bench] + fn bench_translate_orig_to_codec_1percent_filled_10percent_hit(bench: &mut Bencher) { + let codec = gen_bools(0.01f64); + bench.iter(|| walk_over_data(&codec, 100)); + } + + #[bench] + fn bench_translate_orig_to_codec_5percent_filled_10percent_hit(bench: &mut Bencher) { + let codec = gen_bools(0.05f64); + bench.iter(|| walk_over_data(&codec, 100)); + } + + #[bench] + fn bench_translate_orig_to_codec_5percent_filled_1percent_hit(bench: &mut Bencher) { + let codec = gen_bools(0.05f64); + bench.iter(|| walk_over_data(&codec, 1000)); + } + + #[bench] + fn bench_translate_orig_to_codec_full_scan_1percent_filled(bench: &mut Bencher) { + let codec = gen_bools(0.01f64); + bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES)); + } + + #[bench] + fn bench_translate_orig_to_codec_full_scan_10percent_filled(bench: &mut Bencher) { + let codec = gen_bools(0.1f64); + bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES)); + } + + #[bench] + fn bench_translate_orig_to_codec_full_scan_90percent_filled(bench: &mut Bencher) { + let codec = gen_bools(0.9f64); + bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES)); + } + + #[bench] + fn bench_translate_orig_to_codec_10percent_filled_1percent_hit(bench: &mut Bencher) { + let codec = gen_bools(0.1f64); + bench.iter(|| walk_over_data(&codec, 100)); + } + + #[bench] + fn bench_translate_orig_to_codec_50percent_filled_1percent_hit(bench: &mut Bencher) { + let codec = gen_bools(0.5f64); + bench.iter(|| walk_over_data(&codec, 100)); + } + + #[bench] + fn bench_translate_orig_to_codec_90percent_filled_1percent_hit(bench: &mut Bencher) { + let codec = gen_bools(0.9f64); + bench.iter(|| walk_over_data(&codec, 100)); + } + + #[bench] + fn bench_translate_codec_to_orig_1percent_filled_0comma005percent_hit(bench: &mut Bencher) { + bench_translate_codec_to_orig_util(0.01f64, 0.005f32, bench); + } + + #[bench] + fn bench_translate_codec_to_orig_10percent_filled_0comma005percent_hit(bench: &mut Bencher) { + bench_translate_codec_to_orig_util(0.1f64, 0.005f32, bench); + } + + #[bench] + fn bench_translate_codec_to_orig_1percent_filled_10percent_hit(bench: &mut Bencher) { + bench_translate_codec_to_orig_util(0.01f64, 10f32, bench); + } + + #[bench] + fn bench_translate_codec_to_orig_1percent_filled_full_scan(bench: &mut Bencher) { + bench_translate_codec_to_orig_util(0.01f64, 100f32, bench); + } + + fn bench_translate_codec_to_orig_util( + percent_filled: f64, + percent_hit: f32, + bench: &mut Bencher, + ) { + let codec = gen_bools(percent_filled); + let num_non_nulls = codec.num_non_nulls(); + let idxs: Vec = if percent_hit == 100.0f32 { + (0..num_non_nulls).collect() + } else { + n_percent_step_iterator(percent_hit, num_non_nulls).collect() + }; + let mut output = vec![0u32; idxs.len()]; + bench.iter(|| { + output.copy_from_slice(&idxs[..]); + codec.select_batch(&mut output); + }); + } + + #[bench] + fn bench_translate_codec_to_orig_90percent_filled_0comma005percent_hit(bench: &mut Bencher) { + bench_translate_codec_to_orig_util(0.9f64, 0.005, bench); + } + + #[bench] + fn bench_translate_codec_to_orig_90percent_filled_full_scan(bench: &mut Bencher) { + bench_translate_codec_to_orig_util(0.9f64, 100.0f32, bench); + } +} diff --git a/columnar/src/column_index/serialize.rs b/columnar/src/column_index/serialize.rs index aa3001a2c..db4f19d62 100644 --- a/columnar/src/column_index/serialize.rs +++ b/columnar/src/column_index/serialize.rs @@ -1,7 +1,8 @@ use std::io; use std::io::Write; -use common::{CountingWriter, OwnedBytes}; +use common::file_slice::FileSlice; +use common::{CountingWriter, HasLen}; use super::OptionalIndex; use super::multivalued_index::SerializableMultivalueIndex; @@ -65,27 +66,28 @@ pub fn serialize_column_index( /// Open a serialized column index. pub fn open_column_index( - mut bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result { - if bytes.is_empty() { + if file_slice.len() == 0 { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, "Failed to deserialize column index. Empty buffer.", )); } - let cardinality_code = bytes[0]; + let (header, body) = file_slice.split(1); + let cardinality_code = header.read_bytes()?.as_slice()[0]; let cardinality = Cardinality::try_from_code(cardinality_code)?; - bytes.advance(1); + match cardinality { Cardinality::Full => Ok(ColumnIndex::Full), Cardinality::Optional => { - let optional_index = super::optional_index::open_optional_index(bytes)?; + let optional_index = super::optional_index::open_optional_index(body)?; Ok(ColumnIndex::Optional(optional_index)) } Cardinality::Multivalued => { let multivalue_index = - super::multivalued_index::open_multivalued_index(bytes, format_version)?; + super::multivalued_index::open_multivalued_index(body, format_version)?; Ok(ColumnIndex::Multivalued(multivalue_index)) } } diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index f26bf6d33..69af909cf 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -27,8 +27,7 @@ mod monotonic_column; pub(crate) use merge::MergedColumnValues; pub use stats::ColumnStats; pub use u64_based::{ - ALL_U64_CODEC_TYPES, CodecType, load_u64_based_column_values, - serialize_and_load_u64_based_column_values, serialize_u64_based_column_values, + ALL_U64_CODEC_TYPES, CodecType, load_u64_based_column_values, serialize_u64_based_column_values, }; pub use u128_based::{ CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped, diff --git a/columnar/src/column_values/stats.rs b/columnar/src/column_values/stats.rs index 40f042525..794619350 100644 --- a/columnar/src/column_values/stats.rs +++ b/columnar/src/column_values/stats.rs @@ -27,6 +27,43 @@ impl ColumnStats { } } +impl ColumnStats { + /// Same as [`BinarySeerializable::deserialize`] but also returns the number of bytes + /// consumed from the reader `R` + pub fn deserialize_with_size(reader: &mut R) -> io::Result<(Self, usize)> { + let mut nbytes = 0; + + let (min_value, len) = VInt::deserialize_with_size(reader)?; + let min_value = min_value.0; + nbytes += len; + + let (gcd, len) = VInt::deserialize_with_size(reader)?; + let gcd = gcd.0; + let gcd = NonZeroU64::new(gcd) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "GCD of 0 is forbidden"))?; + nbytes += len; + + let (amplitude, len) = VInt::deserialize_with_size(reader)?; + let amplitude = amplitude.0 * gcd.get(); + let max_value = min_value + amplitude; + nbytes += len; + + let (num_rows, len) = VInt::deserialize_with_size(reader)?; + let num_rows = num_rows.0 as RowId; + nbytes += len; + + Ok(( + ColumnStats { + min_value, + max_value, + num_rows, + gcd, + }, + nbytes, + )) + } +} + impl BinarySerializable for ColumnStats { fn serialize(&self, writer: &mut W) -> io::Result<()> { VInt(self.min_value).serialize(writer)?; diff --git a/columnar/src/column_values/u128_based/compact_space/mod.rs b/columnar/src/column_values/u128_based/compact_space/mod.rs index 2c815bdce..5d928bc76 100644 --- a/columnar/src/column_values/u128_based/compact_space/mod.rs +++ b/columnar/src/column_values/u128_based/compact_space/mod.rs @@ -769,7 +769,7 @@ mod tests { ]; let mut out = Vec::new(); serialize_column_values_u128(&&vals[..], &mut out).unwrap(); - let decomp = open_u128_mapped(OwnedBytes::new(out)).unwrap(); + let decomp = open_u128_mapped(FileSlice::from(out)).unwrap(); let complete_range = 0..vals.len() as u32; assert_eq!( @@ -823,6 +823,7 @@ mod tests { let _data = test_aux_vals(vals); } + use common::file_slice::FileSlice; use proptest::prelude::*; fn num_strategy() -> impl Strategy { diff --git a/columnar/src/column_values/u128_based/mod.rs b/columnar/src/column_values/u128_based/mod.rs index 62e9a1f92..67074cb6d 100644 --- a/columnar/src/column_values/u128_based/mod.rs +++ b/columnar/src/column_values/u128_based/mod.rs @@ -5,7 +5,8 @@ use std::sync::Arc; mod compact_space; -use common::{BinarySerializable, OwnedBytes, VInt}; +use common::file_slice::FileSlice; +use common::{BinarySerializable, VInt}; pub use compact_space::{ CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor, }; @@ -101,8 +102,9 @@ impl U128FastFieldCodecType { /// Returns the correct codec reader wrapped in the `Arc` for the data. pub fn open_u128_mapped( - mut bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { + let mut bytes = file_slice.read_bytes()?; let header = U128Header::deserialize(&mut bytes)?; assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); let reader = CompactSpaceDecompressor::open(bytes)?; @@ -120,7 +122,8 @@ pub fn open_u128_mapped( /// # Notice /// In case there are new codecs added, check for usages of `CompactSpaceDecompressorU64` and /// also handle the new codecs. -pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result>> { +pub fn open_u128_as_compact_u64(file_slice: FileSlice) -> io::Result>> { + let mut bytes = file_slice.read_bytes()?; let header = U128Header::deserialize(&mut bytes)?; assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); let reader = CompactSpaceU64Accessor::open(bytes)?; diff --git a/columnar/src/column_values/u64_based/bitpacked.rs b/columnar/src/column_values/u64_based/bitpacked.rs index fde012937..7e49582ed 100644 --- a/columnar/src/column_values/u64_based/bitpacked.rs +++ b/columnar/src/column_values/u64_based/bitpacked.rs @@ -2,6 +2,7 @@ use std::io::{self, Write}; use std::num::NonZeroU64; use std::ops::{Range, RangeInclusive}; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes}; use fastdivide::DividerU64; use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits}; @@ -133,7 +134,8 @@ impl ColumnCodec for BitpackedCodec { type Estimator = BitpackedCodecEstimator; /// Opens a fast field given a file. - fn load(mut data: OwnedBytes) -> io::Result { + fn load(file_slice: FileSlice) -> io::Result { + let mut data = file_slice.read_bytes()?; let stats = ColumnStats::deserialize(&mut data)?; let num_bits = num_bits(&stats); let bit_unpacker = BitUnpacker::new(num_bits); diff --git a/columnar/src/column_values/u64_based/blockwise_linear.rs b/columnar/src/column_values/u64_based/blockwise_linear.rs index e37f9098c..811b68ff0 100644 --- a/columnar/src/column_values/u64_based/blockwise_linear.rs +++ b/columnar/src/column_values/u64_based/blockwise_linear.rs @@ -1,8 +1,10 @@ use std::io::Write; -use std::sync::Arc; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, OnceLock}; use std::{io, iter}; -use common::{BinarySerializable, CountingWriter, DeserializeFrom, OwnedBytes}; +use common::file_slice::FileSlice; +use common::{BinarySerializable, CountingWriter, DeserializeFrom, HasLen, OwnedBytes}; use fastdivide::DividerU64; use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits}; @@ -172,32 +174,74 @@ impl ColumnCodec for BlockwiseLinearCodec { type Estimator = BlockwiseLinearEstimator; - fn load(mut bytes: OwnedBytes) -> io::Result { - let stats = ColumnStats::deserialize(&mut bytes)?; - let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?; - let footer_offset = bytes.len() - 4 - footer_len as usize; - let (data, mut footer) = bytes.split(footer_offset); + fn load(file_slice: FileSlice) -> io::Result { + // [`ColumnStats::deserialize_with_size`] deserializes 4 variable-width encoded u64s, which + // could end up being, in the worst case, 9 bytes each. this is where the 36 comes from + let (stats, _) = file_slice.clone().split(36.min(file_slice.len())); // hope that's enough bytes + let mut stats = stats.read_bytes()?; + let (stats, stats_nbytes) = ColumnStats::deserialize_with_size(&mut stats)?; + + let (_, body) = file_slice.split(stats_nbytes); + + let (_, footer) = body.clone().split_from_end(4); + + let footer_len: u32 = footer.read_bytes()?.as_slice().deserialize()?; + let (data, footer) = body.split_from_end(footer_len as usize + 4); + + let mut footer = footer.read_bytes()?; let num_blocks = compute_num_blocks(stats.num_rows); - let mut blocks: Vec = iter::repeat_with(|| Block::deserialize(&mut footer)) - .take(num_blocks as usize) - .collect::>()?; + let mut blocks: Vec = + iter::repeat_with(|| Block::deserialize(&mut footer)) + .take(num_blocks as usize) + .map(|block| { + block.map(|block| BlockWithLength { + block, + file_slice: FileSlice::from(Vec::new()), + data: OnceLock::default(), + }) + }) + .collect::>()?; + let mut start_offset = 0; for block in &mut blocks { + let len = (block.bit_unpacker.bit_width() as usize) * BLOCK_SIZE as usize / 8; block.data_start_offset = start_offset; - start_offset += (block.bit_unpacker.bit_width() as usize) * BLOCK_SIZE as usize / 8; + block.file_slice = data + .clone() + .slice(start_offset..(start_offset + len).min(data.len())); + start_offset += len; } + Ok(BlockwiseLinearReader { blocks: blocks.into_boxed_slice().into(), - data, stats, }) } } +struct BlockWithLength { + block: Block, + file_slice: FileSlice, + data: OnceLock, +} + +impl Deref for BlockWithLength { + type Target = Block; + + fn deref(&self) -> &Self::Target { + &self.block + } +} + +impl DerefMut for BlockWithLength { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.block + } +} + #[derive(Clone)] pub struct BlockwiseLinearReader { - blocks: Arc<[Block]>, - data: OwnedBytes, + blocks: Arc<[BlockWithLength]>, stats: ColumnStats, } @@ -208,7 +252,9 @@ impl ColumnValues for BlockwiseLinearReader { let idx_within_block = idx % BLOCK_SIZE; let block = &self.blocks[block_id]; let interpoled_val: u64 = block.line.eval(idx_within_block); - let block_bytes = &self.data[block.data_start_offset..]; + let block_bytes = block + .data + .get_or_init(|| block.file_slice.read_bytes().unwrap()); let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes); // TODO optimize me! the line parameters could be tweaked to include the multiplication and // remove the dependency. diff --git a/columnar/src/column_values/u64_based/linear.rs b/columnar/src/column_values/u64_based/linear.rs index dbfa13a4c..cac5f8584 100644 --- a/columnar/src/column_values/u64_based/linear.rs +++ b/columnar/src/column_values/u64_based/linear.rs @@ -1,5 +1,6 @@ use std::io; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes}; use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits}; @@ -190,7 +191,8 @@ impl ColumnCodec for LinearCodec { type Estimator = LinearCodecEstimator; - fn load(mut data: OwnedBytes) -> io::Result { + fn load(file_slice: FileSlice) -> io::Result { + let mut data = file_slice.read_bytes()?; let stats = ColumnStats::deserialize(&mut data)?; let linear_params = LinearParams::deserialize(&mut data)?; Ok(LinearReader { diff --git a/columnar/src/column_values/u64_based/mod.rs b/columnar/src/column_values/u64_based/mod.rs index aa2d9818b..0801c7baf 100644 --- a/columnar/src/column_values/u64_based/mod.rs +++ b/columnar/src/column_values/u64_based/mod.rs @@ -8,7 +8,8 @@ use std::io; use std::io::Write; use std::sync::Arc; -use common::{BinarySerializable, OwnedBytes}; +use common::BinarySerializable; +use common::file_slice::FileSlice; use crate::column_values::monotonic_mapping::{ StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal, @@ -60,7 +61,7 @@ pub trait ColumnCodec { type Estimator: ColumnCodecEstimator + Default; /// Loads a column that has been serialized using this codec. - fn load(bytes: OwnedBytes) -> io::Result; + fn load(file_slice: FileSlice) -> io::Result; /// Returns an estimator. fn estimator() -> Self::Estimator { @@ -111,20 +112,22 @@ impl CodecType { fn load( &self, - bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { match self { - CodecType::Bitpacked => load_specific_codec::(bytes), - CodecType::Linear => load_specific_codec::(bytes), - CodecType::BlockwiseLinear => load_specific_codec::(bytes), + CodecType::Bitpacked => load_specific_codec::(file_slice), + CodecType::Linear => load_specific_codec::(file_slice), + CodecType::BlockwiseLinear => { + load_specific_codec::(file_slice) + } } } } fn load_specific_codec( - bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { - let reader = C::load(bytes)?; + let reader = C::load(file_slice)?; let reader_typed = monotonic_map_column( reader, StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::::new()), @@ -189,25 +192,28 @@ pub fn serialize_u64_based_column_values( /// /// This method first identifies the codec off the first byte. pub fn load_u64_based_column_values( - mut bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { - let codec_type: CodecType = bytes - .first() - .copied() + let (header, body) = file_slice.split(1); + let codec_type: CodecType = header + .read_bytes()? + .as_slice() + .get(0) + .cloned() .and_then(CodecType::try_from_code) .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to read codec type"))?; - bytes.advance(1); - codec_type.load(bytes) + codec_type.load(body) } /// Helper function to serialize a column (autodetect from all codecs) and then open it +#[cfg(test)] pub fn serialize_and_load_u64_based_column_values( vals: &dyn Iterable, codec_types: &[CodecType], ) -> Arc> { let mut buffer = Vec::new(); serialize_u64_based_column_values(vals, codec_types, &mut buffer).unwrap(); - load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap() + load_u64_based_column_values::(FileSlice::from(buffer)).unwrap() } #[cfg(test)] diff --git a/columnar/src/column_values/u64_based/tests.rs b/columnar/src/column_values/u64_based/tests.rs index 6b2697263..5f707ecfd 100644 --- a/columnar/src/column_values/u64_based/tests.rs +++ b/columnar/src/column_values/u64_based/tests.rs @@ -1,3 +1,4 @@ +use common::HasLen; use proptest::prelude::*; use proptest::{prop_oneof, proptest}; use rand::Rng; @@ -13,7 +14,7 @@ fn test_serialize_and_load_simple() { ) .unwrap(); assert_eq!(buffer.len(), 7); - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 3); assert_eq!(col.get_val(0), 1); assert_eq!(col.get_val(1), 2); @@ -30,7 +31,7 @@ fn test_empty_column_i64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 0); assert_eq!(col.min_value(), i64::MIN); assert_eq!(col.max_value(), i64::MIN); @@ -48,7 +49,7 @@ fn test_empty_column_u64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 0); assert_eq!(col.min_value(), u64::MIN); assert_eq!(col.max_value(), u64::MIN); @@ -66,7 +67,7 @@ fn test_empty_column_f64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 0); // FIXME. f64::MIN would be better! assert!(col.min_value().is_nan()); @@ -97,7 +98,7 @@ pub(crate) fn create_and_validate( let actual_compression = buffer.len() as u64; - let reader = TColumnCodec::load(OwnedBytes::new(buffer)).unwrap(); + let reader = TColumnCodec::load(FileSlice::from(buffer)).unwrap(); assert_eq!(reader.num_vals(), vals.len() as u32); let mut buffer = Vec::new(); for (doc, orig_val) in vals.iter().copied().enumerate() { @@ -326,7 +327,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer, )?; - let buffer = OwnedBytes::new(buffer); + let buffer = FileSlice::from(buffer); let column = crate::column_values::load_u64_based_column_values::(buffer.clone())?; assert_eq!(column.get_val(0), -4000i64); assert_eq!(column.get_val(1), -3000i64); @@ -343,7 +344,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer_without_gcd, )?; - let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd); + let buffer_without_gcd = FileSlice::from(buffer_without_gcd); assert!(buffer_without_gcd.len() > buffer.len()); Ok(()) @@ -369,7 +370,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer, )?; - let buffer = OwnedBytes::new(buffer); + let buffer = FileSlice::from(buffer); let column = crate::column_values::load_u64_based_column_values::(buffer.clone())?; assert_eq!(column.get_val(0), 1000u64); assert_eq!(column.get_val(1), 2000u64); @@ -386,7 +387,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer_without_gcd, )?; - let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd); + let buffer_without_gcd = FileSlice::from(buffer_without_gcd); assert!(buffer_without_gcd.len() > buffer.len()); Ok(()) } @@ -405,7 +406,7 @@ fn test_fastfield_gcd_u64() -> io::Result<()> { #[test] pub fn test_fastfield2() { - let test_fastfield = crate::column_values::serialize_and_load_u64_based_column_values::( + let test_fastfield = serialize_and_load_u64_based_column_values::( &&[100u64, 200u64, 300u64][..], &ALL_U64_CODEC_TYPES, ); diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index f87779608..d54dfccb1 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::{fmt, io}; use common::file_slice::FileSlice; -use common::{ByteCount, DateTime, OwnedBytes}; +use common::{ByteCount, DateTime}; use serde::{Deserialize, Serialize}; use crate::column::{BytesColumn, Column, StrColumn}; @@ -239,8 +239,7 @@ pub struct DynamicColumnHandle { impl DynamicColumnHandle { // TODO rename load pub fn open(&self) -> io::Result { - let column_bytes: OwnedBytes = self.file_slice.read_bytes()?; - self.open_internal(column_bytes) + self.open_internal(self.file_slice.clone()) } #[doc(hidden)] @@ -259,16 +258,15 @@ impl DynamicColumnHandle { /// If not, the fastfield reader will returns the u64-value associated with the original /// FastValue. pub fn open_u64_lenient(&self) -> io::Result>> { - let column_bytes = self.file_slice.read_bytes()?; match self.column_type { ColumnType::Str | ColumnType::Bytes => { let column: BytesColumn = - crate::column::open_column_bytes(column_bytes, self.format_version)?; + crate::column::open_column_bytes(self.file_slice.clone(), self.format_version)?; Ok(Some(column.term_ord_column)) } ColumnType::IpAddr => { let column = crate::column::open_column_u128_as_compact_u64( - column_bytes, + self.file_slice.clone(), self.format_version, )?; Ok(Some(column)) @@ -278,40 +276,40 @@ impl DynamicColumnHandle { | ColumnType::U64 | ColumnType::F64 | ColumnType::DateTime => { - let column = - crate::column::open_column_u64::(column_bytes, self.format_version)?; + let column = crate::column::open_column_u64::( + self.file_slice.clone(), + self.format_version, + )?; Ok(Some(column)) } } } - fn open_internal(&self, column_bytes: OwnedBytes) -> io::Result { + fn open_internal(&self, file_slice: FileSlice) -> io::Result { let dynamic_column: DynamicColumn = match self.column_type { ColumnType::Bytes => { - crate::column::open_column_bytes(column_bytes, self.format_version)?.into() + crate::column::open_column_bytes(file_slice, self.format_version)?.into() } ColumnType::Str => { - crate::column::open_column_str(column_bytes, self.format_version)?.into() + crate::column::open_column_str(file_slice, self.format_version)?.into() } ColumnType::I64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::U64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::F64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::Bool => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::IpAddr => { - crate::column::open_column_u128::(column_bytes, self.format_version)? - .into() + crate::column::open_column_u128::(file_slice, self.format_version)?.into() } ColumnType::DateTime => { - crate::column::open_column_u64::(column_bytes, self.format_version)? - .into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } }; Ok(dynamic_column) diff --git a/common/src/vint.rs b/common/src/vint.rs index 22eaa267c..7e7d38513 100644 --- a/common/src/vint.rs +++ b/common/src/vint.rs @@ -58,6 +58,33 @@ impl BinarySerializable for VIntU128 { #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct VInt(pub u64); +impl VInt { + pub fn deserialize_with_size(reader: &mut R) -> io::Result<(Self, usize)> { + let mut nbytes = 0; + let mut bytes = reader.bytes(); + let mut result = 0u64; + let mut shift = 0u64; + loop { + match bytes.next() { + Some(Ok(b)) => { + nbytes += 1; + result |= u64::from(b % 128u8) << shift; + if b >= STOP_BIT { + return Ok((VInt(result), nbytes)); + } + shift += 7; + } + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Reach end of buffer while reading VInt", + )); + } + } + } + } +} + const STOP_BIT: u8 = 128; #[inline] @@ -225,7 +252,6 @@ impl BinarySerializable for VInt { #[cfg(test)] mod tests { - use super::{BinarySerializable, VInt, serialize_vint_u32}; fn aux_test_vint(val: u64) { diff --git a/runtests.sh b/runtests.sh new file mode 100755 index 000000000..c915be42f --- /dev/null +++ b/runtests.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +cargo +stable nextest run --features mmap,stopwords,lz4-compression,zstd-compression,failpoints --verbose --workspace diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index a94ec03e8..9416186b4 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -821,7 +821,6 @@ mod tests { #[cfg(all(test, feature = "unstable"))] mod bench { - use rand::seq::SliceRandom; use rand::thread_rng; use test::Bencher; diff --git a/src/collector/multi_collector.rs b/src/collector/multi_collector.rs index 14779c4a4..dc4697096 100644 --- a/src/collector/multi_collector.rs +++ b/src/collector/multi_collector.rs @@ -281,7 +281,6 @@ impl SegmentCollector for MultiCollectorChild { #[cfg(test)] mod tests { - use super::*; use crate::collector::{Count, TopDocs}; use crate::query::TermQuery; diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs index 4d4e08583..65287401d 100644 --- a/src/indexer/doc_opstamp_mapping.rs +++ b/src/indexer/doc_opstamp_mapping.rs @@ -40,7 +40,6 @@ impl DocToOpstampMapping<'_> { #[cfg(test)] mod tests { - use super::DocToOpstampMapping; #[test] diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 58610c139..782b86fc0 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -273,7 +273,6 @@ impl Recorder for TfAndPositionRecorder { #[cfg(test)] mod tests { - use common::write_u32_vint; use super::{BufferLender, VInt32Reader}; diff --git a/src/query/boolean_query/boolean_weight.rs b/src/query/boolean_query/boolean_weight.rs index 9e8cedf2e..be7ce1b93 100644 --- a/src/query/boolean_query/boolean_weight.rs +++ b/src/query/boolean_query/boolean_weight.rs @@ -405,7 +405,7 @@ impl Weight for BooleanWeight crate::Result<()> { - let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?; + let scorer = self.complex_scorer(reader, 1.0, DoNothingCombiner::default)?; let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN]; match scorer { diff --git a/src/query/set_query.rs b/src/query/set_query.rs index 12e1b2818..93d497028 100644 --- a/src/query/set_query.rs +++ b/src/query/set_query.rs @@ -63,7 +63,7 @@ impl TermSetQuery { Ok(BooleanWeight::new( sub_queries, false, - Box::new(|| DoNothingCombiner), + Box::new(DoNothingCombiner::default), )) } } diff --git a/src/tokenizer/facet_tokenizer.rs b/src/tokenizer/facet_tokenizer.rs index 089839367..2c80a6368 100644 --- a/src/tokenizer/facet_tokenizer.rs +++ b/src/tokenizer/facet_tokenizer.rs @@ -84,7 +84,6 @@ impl TokenStream for FacetTokenStream<'_> { #[cfg(test)] mod tests { - use super::FacetTokenizer; use crate::schema::Facet; use crate::tokenizer::{Token, TokenStream, Tokenizer}; diff --git a/src/tokenizer/ngram_tokenizer.rs b/src/tokenizer/ngram_tokenizer.rs index 9b4584814..e1a6e2746 100644 --- a/src/tokenizer/ngram_tokenizer.rs +++ b/src/tokenizer/ngram_tokenizer.rs @@ -312,7 +312,6 @@ fn utf8_codepoint_width(b: u8) -> usize { #[cfg(test)] mod tests { - use super::{utf8_codepoint_width, CodepointFrontiers, NgramTokenizer, StutteringIterator}; use crate::tokenizer::tests::assert_token; use crate::tokenizer::{Token, TokenStream, Tokenizer};