From 020bdffd61365a140218643c49ba01c5043b2966 Mon Sep 17 00:00:00 2001 From: Eric Ridge Date: Wed, 15 Jan 2025 16:46:36 -0500 Subject: [PATCH] perf: push `FileSlice`s down through most of fast fields (#19) This PR modifies internal API signatures and implementation details so that `FileSlice`s are passed down into the innards of (at least) the `BlockwiseLinearCodec`. This allows tantivy to defer dereferencing large slices of bytes when reading numeric fast fields, and instead dereference only the slice of bytes it needs for any given compressed Block. The motivation here is for external `Directory` implementations where it's not exactly efficient to dereference large slices of bytes. --- .gitignore | 3 + columnar/src/column/serialize.rs | 38 +++++++--- columnar/src/column_index/merge/mod.rs | 6 +- .../src/column_index/multivalued_index.rs | 22 ++++-- .../src/column_index/optional_index/mod.rs | 20 +++-- .../src/column_index/optional_index/tests.rs | 6 +- columnar/src/column_index/serialize.rs | 16 ++-- columnar/src/column_values/mod.rs | 3 +- columnar/src/column_values/stats.rs | 37 +++++++++ .../u128_based/compact_space/mod.rs | 3 +- columnar/src/column_values/u128_based/mod.rs | 9 ++- .../src/column_values/u64_based/bitpacked.rs | 4 +- .../u64_based/blockwise_linear.rs | 76 +++++++++++++++---- .../src/column_values/u64_based/linear.rs | 4 +- columnar/src/column_values/u64_based/mod.rs | 36 +++++---- columnar/src/column_values/u64_based/tests.rs | 21 ++--- columnar/src/dynamic_column.rs | 36 +++++---- common/src/vint.rs | 28 ++++++- runtests.sh | 3 + src/collector/facet_collector.rs | 1 - src/collector/multi_collector.rs | 1 - src/indexer/doc_opstamp_mapping.rs | 1 - src/postings/recorder.rs | 1 - src/query/boolean_query/boolean_weight.rs | 2 +- src/query/set_query.rs | 2 +- src/tokenizer/facet_tokenizer.rs | 1 - src/tokenizer/ngram_tokenizer.rs | 1 - 27 files changed, 266 insertions(+), 115 deletions(-) create mode 100755 runtests.sh diff --git a/.gitignore 
b/.gitignore index 92b264316..3f25aab25 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ trace.dat cargo-timing* control variable + +# for `sample record -p` +profile.json diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index 73fc5e7f5..58d2186f7 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -2,7 +2,7 @@ use std::io; use std::io::Write; use std::sync::Arc; -use common::OwnedBytes; +use common::file_slice::FileSlice; use sstable::Dictionary; use crate::column::{BytesColumn, Column}; @@ -41,12 +41,13 @@ pub fn serialize_column_mappable_to_u64( } pub fn open_column_u64( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? .as_slice() .try_into() .unwrap(), @@ -61,12 +62,13 @@ pub fn open_column_u64( } pub fn open_column_u128( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? .as_slice() .try_into() .unwrap(), @@ -84,12 +86,13 @@ pub fn open_column_u128( /// /// See [`open_u128_as_compact_u64`] for more details. pub fn open_column_u128_as_compact_u64( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? 
.as_slice() .try_into() .unwrap(), @@ -103,10 +106,21 @@ pub fn open_column_u128_as_compact_u64( }) } -pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Result { - let (body, dictionary_len_bytes) = data.rsplit(4); - let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); +pub fn open_column_bytes( + file_slice: FileSlice, + format_version: Version, +) -> io::Result { + let (body, dictionary_len_bytes) = file_slice.split_from_end(4); + let dictionary_len = u32::from_le_bytes( + dictionary_len_bytes + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize); + + let dictionary_bytes = dictionary_bytes.read_bytes()?; let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?); let term_ord_column = crate::column::open_column_u64::(column_bytes, format_version)?; Ok(BytesColumn { @@ -115,7 +129,7 @@ pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Resul }) } -pub fn open_column_str(data: OwnedBytes, format_version: Version) -> io::Result { - let bytes_column = open_column_bytes(data, format_version)?; +pub fn open_column_str(file_slice: FileSlice, format_version: Version) -> io::Result { + let bytes_column = open_column_bytes(file_slice, format_version)?; Ok(StrColumn::wrap(bytes_column)) } diff --git a/columnar/src/column_index/merge/mod.rs b/columnar/src/column_index/merge/mod.rs index ea09bdba1..53d2c259c 100644 --- a/columnar/src/column_index/merge/mod.rs +++ b/columnar/src/column_index/merge/mod.rs @@ -95,7 +95,7 @@ pub fn merge_column_index<'a>( #[cfg(test)] mod tests { - use common::OwnedBytes; + use common::file_slice::FileSlice; use crate::column_index::merge::detect_cardinality; use crate::column_index::multivalued_index::{ @@ -178,7 +178,7 @@ mod tests { let mut output = Vec::new(); serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); let multivalue = - 
open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + open_multivalued_index(FileSlice::from(output), crate::Version::V2).unwrap(); let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5]); } @@ -216,7 +216,7 @@ mod tests { let mut output = Vec::new(); serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); let multivalue = - open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + open_multivalued_index(FileSlice::from(output), crate::Version::V2).unwrap(); let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5, 6]); } diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index cef5a1221..93cf15915 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -3,7 +3,8 @@ use std::io::Write; use std::ops::Range; use std::sync::Arc; -use common::{CountingWriter, OwnedBytes}; +use common::file_slice::FileSlice; +use common::CountingWriter; use super::optional_index::{open_optional_index, serialize_optional_index}; use super::{OptionalIndex, SerializableOptionalIndex, Set}; @@ -44,21 +45,26 @@ pub fn serialize_multivalued_index( } pub fn open_multivalued_index( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result { match format_version { Version::V1 => { let start_index_column: Arc> = - load_u64_based_column_values(bytes)?; + load_u64_based_column_values(file_slice)?; Ok(MultiValueIndex::MultiValueIndexV1(MultiValueIndexV1 { start_index_column, })) } Version::V2 => { - let (body_bytes, optional_index_len) = bytes.rsplit(4); - let optional_index_len = - u32::from_le_bytes(optional_index_len.as_slice().try_into().unwrap()); + let (body_bytes, optional_index_len) = file_slice.split_from_end(4); + let optional_index_len = u32::from_le_bytes( + 
optional_index_len + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); let (optional_index_bytes, start_index_bytes) = body_bytes.split(optional_index_len as usize); let optional_index = open_optional_index(optional_index_bytes)?; @@ -185,8 +191,8 @@ impl MultiValueIndex { }; let mut buffer = Vec::new(); serialize_multivalued_index(&serializable_multivalued_index, &mut buffer).unwrap(); - let bytes = OwnedBytes::new(buffer); - open_multivalued_index(bytes, Version::V2).unwrap() + let file_slice = FileSlice::from(buffer); + open_multivalued_index(file_slice, Version::V2).unwrap() } pub fn get_start_index_column(&self) -> &Arc> { diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs index f3ab523e2..3e6ee3a62 100644 --- a/columnar/src/column_index/optional_index/mod.rs +++ b/columnar/src/column_index/optional_index/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; mod set; mod set_block; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes, VInt}; pub use set::{SelectCursor, Set, SetCodec}; use set_block::{ @@ -266,8 +267,8 @@ impl OptionalIndex { .unwrap_or(true)); let mut buffer = Vec::new(); serialize_optional_index(&row_ids, num_rows, &mut buffer).unwrap(); - let bytes = OwnedBytes::new(buffer); - open_optional_index(bytes).unwrap() + let file_slice = FileSlice::from(buffer); + open_optional_index(file_slice).unwrap() } pub fn num_docs(&self) -> RowId { @@ -515,10 +516,17 @@ fn deserialize_optional_index_block_metadatas( (block_metas.into_boxed_slice(), non_null_rows_before_block) } -pub fn open_optional_index(bytes: OwnedBytes) -> io::Result { - let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2); - let num_non_empty_block_bytes = - u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap()); +pub fn open_optional_index(file_slice: FileSlice) -> io::Result { + let (bytes, num_non_empty_blocks_bytes) = file_slice.split_from_end(2); + let 
num_non_empty_block_bytes = u16::from_le_bytes( + num_non_empty_blocks_bytes + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); + + let mut bytes = bytes.read_bytes()?; let num_rows = VInt::deserialize_u64(&mut bytes)? as u32; let block_metas_num_bytes = num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES; diff --git a/columnar/src/column_index/optional_index/tests.rs b/columnar/src/column_index/optional_index/tests.rs index 2bcb77fd6..09a67d340 100644 --- a/columnar/src/column_index/optional_index/tests.rs +++ b/columnar/src/column_index/optional_index/tests.rs @@ -59,7 +59,7 @@ fn test_with_random_sets_simple() { let vals = 10..ELEMENTS_PER_BLOCK * 2; let mut out: Vec = Vec::new(); serialize_optional_index(&vals, 100, &mut out).unwrap(); - let null_index = open_optional_index(OwnedBytes::new(out)).unwrap(); + let null_index = open_optional_index(FileSlice::from(out)).unwrap(); let ranks: Vec = (65_472u32..65_473u32).collect(); let els: Vec = ranks.iter().copied().map(|rank| rank + 10).collect(); let mut select_cursor = null_index.select_cursor(); @@ -102,7 +102,7 @@ impl<'a> Iterable for &'a [bool] { fn test_null_index(data: &[bool]) { let mut out: Vec = Vec::new(); serialize_optional_index(&data, data.len() as RowId, &mut out).unwrap(); - let null_index = open_optional_index(OwnedBytes::new(out)).unwrap(); + let null_index = open_optional_index(FileSlice::from(out)).unwrap(); let orig_idx_with_value: Vec = data .iter() .enumerate() @@ -241,7 +241,7 @@ mod bench { .collect(); serialize_optional_index(&&vals[..], TOTAL_NUM_VALUES, &mut out).unwrap(); - open_optional_index(OwnedBytes::new(out)).unwrap() + open_optional_index(FileSlice::from(out)).unwrap() } fn random_range_iterator( diff --git a/columnar/src/column_index/serialize.rs b/columnar/src/column_index/serialize.rs index 673c3459b..2688e441b 100644 --- a/columnar/src/column_index/serialize.rs +++ b/columnar/src/column_index/serialize.rs @@ -1,7 +1,8 @@ use std::io; use 
std::io::Write; -use common::{CountingWriter, OwnedBytes}; +use common::file_slice::FileSlice; +use common::{CountingWriter, HasLen}; use super::multivalued_index::SerializableMultivalueIndex; use super::OptionalIndex; @@ -65,27 +66,28 @@ pub fn serialize_column_index( /// Open a serialized column index. pub fn open_column_index( - mut bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result { - if bytes.is_empty() { + if file_slice.len() == 0 { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, "Failed to deserialize column index. Empty buffer.", )); } - let cardinality_code = bytes[0]; + let (header, body) = file_slice.split(1); + let cardinality_code = header.read_bytes()?.as_slice()[0]; let cardinality = Cardinality::try_from_code(cardinality_code)?; - bytes.advance(1); + match cardinality { Cardinality::Full => Ok(ColumnIndex::Full), Cardinality::Optional => { - let optional_index = super::optional_index::open_optional_index(bytes)?; + let optional_index = super::optional_index::open_optional_index(body)?; Ok(ColumnIndex::Optional(optional_index)) } Cardinality::Multivalued => { let multivalue_index = - super::multivalued_index::open_multivalued_index(bytes, format_version)?; + super::multivalued_index::open_multivalued_index(body, format_version)?; Ok(ColumnIndex::Multivalued(multivalue_index)) } } diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index ef5de5154..59840ad05 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -31,8 +31,7 @@ pub use u128_based::{ CompactSpaceU64Accessor, }; pub use u64_based::{ - load_u64_based_column_values, serialize_and_load_u64_based_column_values, - serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, + load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, }; pub use vec_column::VecColumn; diff --git a/columnar/src/column_values/stats.rs 
b/columnar/src/column_values/stats.rs index 40f042525..794619350 100644 --- a/columnar/src/column_values/stats.rs +++ b/columnar/src/column_values/stats.rs @@ -27,6 +27,43 @@ impl ColumnStats { } } +impl ColumnStats { + /// Same as [`BinarySerializable::deserialize`] but also returns the number of bytes + /// consumed from the reader `R` + pub fn deserialize_with_size(reader: &mut R) -> io::Result<(Self, usize)> { + let mut nbytes = 0; + + let (min_value, len) = VInt::deserialize_with_size(reader)?; + let min_value = min_value.0; + nbytes += len; + + let (gcd, len) = VInt::deserialize_with_size(reader)?; + let gcd = gcd.0; + let gcd = NonZeroU64::new(gcd) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "GCD of 0 is forbidden"))?; + nbytes += len; + + let (amplitude, len) = VInt::deserialize_with_size(reader)?; + let amplitude = amplitude.0 * gcd.get(); + let max_value = min_value + amplitude; + nbytes += len; + + let (num_rows, len) = VInt::deserialize_with_size(reader)?; + let num_rows = num_rows.0 as RowId; + nbytes += len; + + Ok(( + ColumnStats { + min_value, + max_value, + num_rows, + gcd, + }, + nbytes, + )) + } +} + impl BinarySerializable for ColumnStats { fn serialize(&self, writer: &mut W) -> io::Result<()> { VInt(self.min_value).serialize(writer)?; diff --git a/columnar/src/column_values/u128_based/compact_space/mod.rs b/columnar/src/column_values/u128_based/compact_space/mod.rs index f246c7b0c..8570a37ed 100644 --- a/columnar/src/column_values/u128_based/compact_space/mod.rs +++ b/columnar/src/column_values/u128_based/compact_space/mod.rs @@ -767,7 +767,7 @@ mod tests { ]; let mut out = Vec::new(); serialize_column_values_u128(&&vals[..], &mut out).unwrap(); - let decomp = open_u128_mapped(OwnedBytes::new(out)).unwrap(); + let decomp = open_u128_mapped(FileSlice::from(out)).unwrap(); let complete_range = 0..vals.len() as u32; assert_eq!( @@ -821,6 +821,7 @@ mod tests { let _data = test_aux_vals(vals); } + use common::file_slice::FileSlice; 
use proptest::prelude::*; fn num_strategy() -> impl Strategy { diff --git a/columnar/src/column_values/u128_based/mod.rs b/columnar/src/column_values/u128_based/mod.rs index 30665630a..fc71ae6a0 100644 --- a/columnar/src/column_values/u128_based/mod.rs +++ b/columnar/src/column_values/u128_based/mod.rs @@ -5,7 +5,8 @@ use std::sync::Arc; mod compact_space; -use common::{BinarySerializable, OwnedBytes, VInt}; +use common::file_slice::FileSlice; +use common::{BinarySerializable, VInt}; pub use compact_space::{ CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor, }; @@ -101,8 +102,9 @@ impl U128FastFieldCodecType { /// Returns the correct codec reader wrapped in the `Arc` for the data. pub fn open_u128_mapped( - mut bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { + let mut bytes = file_slice.read_bytes()?; let header = U128Header::deserialize(&mut bytes)?; assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); let reader = CompactSpaceDecompressor::open(bytes)?; @@ -120,7 +122,8 @@ pub fn open_u128_mapped( /// # Notice /// In case there are new codecs added, check for usages of `CompactSpaceDecompressorU64` and /// also handle the new codecs. 
-pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result>> { +pub fn open_u128_as_compact_u64(file_slice: FileSlice) -> io::Result>> { + let mut bytes = file_slice.read_bytes()?; let header = U128Header::deserialize(&mut bytes)?; assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); let reader = CompactSpaceU64Accessor::open(bytes)?; diff --git a/columnar/src/column_values/u64_based/bitpacked.rs b/columnar/src/column_values/u64_based/bitpacked.rs index 3ed999648..622b02421 100644 --- a/columnar/src/column_values/u64_based/bitpacked.rs +++ b/columnar/src/column_values/u64_based/bitpacked.rs @@ -2,6 +2,7 @@ use std::io::{self, Write}; use std::num::NonZeroU64; use std::ops::{Range, RangeInclusive}; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes}; use fastdivide::DividerU64; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; @@ -137,7 +138,8 @@ impl ColumnCodec for BitpackedCodec { type Estimator = BitpackedCodecEstimator; /// Opens a fast field given a file. 
- fn load(mut data: OwnedBytes) -> io::Result { + fn load(file_slice: FileSlice) -> io::Result { + let mut data = file_slice.read_bytes()?; let stats = ColumnStats::deserialize(&mut data)?; let num_bits = num_bits(&stats); let bit_unpacker = BitUnpacker::new(num_bits); diff --git a/columnar/src/column_values/u64_based/blockwise_linear.rs b/columnar/src/column_values/u64_based/blockwise_linear.rs index eb9191aa8..57b1291f2 100644 --- a/columnar/src/column_values/u64_based/blockwise_linear.rs +++ b/columnar/src/column_values/u64_based/blockwise_linear.rs @@ -1,8 +1,10 @@ use std::io::Write; -use std::sync::Arc; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, OnceLock}; use std::{io, iter}; -use common::{BinarySerializable, CountingWriter, DeserializeFrom, OwnedBytes}; +use common::file_slice::FileSlice; +use common::{BinarySerializable, CountingWriter, DeserializeFrom, HasLen, OwnedBytes}; use fastdivide::DividerU64; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; @@ -172,32 +174,74 @@ impl ColumnCodec for BlockwiseLinearCodec { type Estimator = BlockwiseLinearEstimator; - fn load(mut bytes: OwnedBytes) -> io::Result { - let stats = ColumnStats::deserialize(&mut bytes)?; - let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?; - let footer_offset = bytes.len() - 4 - footer_len as usize; - let (data, mut footer) = bytes.split(footer_offset); + fn load(file_slice: FileSlice) -> io::Result { + // [`ColumnStats::deserialize_with_size`] deserializes 4 variable-width encoded u64s, which + // could end up being, in the worst case, 9 bytes each. 
this is where the 36 comes from + let (stats, _) = file_slice.clone().split(36.min(file_slice.len())); // hope that's enough bytes + let mut stats = stats.read_bytes()?; + let (stats, stats_nbytes) = ColumnStats::deserialize_with_size(&mut stats)?; + + let (_, body) = file_slice.split(stats_nbytes); + + let (_, footer) = body.clone().split_from_end(4); + + let footer_len: u32 = footer.read_bytes()?.as_slice().deserialize()?; + let (data, footer) = body.split_from_end(footer_len as usize + 4); + + let mut footer = footer.read_bytes()?; let num_blocks = compute_num_blocks(stats.num_rows); - let mut blocks: Vec = iter::repeat_with(|| Block::deserialize(&mut footer)) - .take(num_blocks as usize) - .collect::>()?; + let mut blocks: Vec = + iter::repeat_with(|| Block::deserialize(&mut footer)) + .take(num_blocks as usize) + .map(|block| { + block.map(|block| BlockWithLength { + block, + file_slice: FileSlice::from(Vec::new()), + data: OnceLock::default(), + }) + }) + .collect::>()?; + let mut start_offset = 0; for block in &mut blocks { + let len = (block.bit_unpacker.bit_width() as usize) * BLOCK_SIZE as usize / 8; block.data_start_offset = start_offset; - start_offset += (block.bit_unpacker.bit_width() as usize) * BLOCK_SIZE as usize / 8; + block.file_slice = data + .clone() + .slice(start_offset..(start_offset + len).min(data.len())); + start_offset += len; } + Ok(BlockwiseLinearReader { blocks: blocks.into_boxed_slice().into(), - data, stats, }) } } +struct BlockWithLength { + block: Block, + file_slice: FileSlice, + data: OnceLock, +} + +impl Deref for BlockWithLength { + type Target = Block; + + fn deref(&self) -> &Self::Target { + &self.block + } +} + +impl DerefMut for BlockWithLength { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.block + } +} + #[derive(Clone)] pub struct BlockwiseLinearReader { - blocks: Arc<[Block]>, - data: OwnedBytes, + blocks: Arc<[BlockWithLength]>, stats: ColumnStats, } @@ -208,7 +252,9 @@ impl ColumnValues for 
BlockwiseLinearReader { let idx_within_block = idx % BLOCK_SIZE; let block = &self.blocks[block_id]; let interpoled_val: u64 = block.line.eval(idx_within_block); - let block_bytes = &self.data[block.data_start_offset..]; + let block_bytes = block + .data + .get_or_init(|| block.file_slice.read_bytes().unwrap()); let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes); // TODO optimize me! the line parameters could be tweaked to include the multiplication and // remove the dependency. diff --git a/columnar/src/column_values/u64_based/linear.rs b/columnar/src/column_values/u64_based/linear.rs index ba0c9e641..b3d5caf8b 100644 --- a/columnar/src/column_values/u64_based/linear.rs +++ b/columnar/src/column_values/u64_based/linear.rs @@ -1,5 +1,6 @@ use std::io; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes}; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; @@ -190,7 +191,8 @@ impl ColumnCodec for LinearCodec { type Estimator = LinearCodecEstimator; - fn load(mut data: OwnedBytes) -> io::Result { + fn load(file_slice: FileSlice) -> io::Result { + let mut data = file_slice.read_bytes()?; let stats = ColumnStats::deserialize(&mut data)?; let linear_params = LinearParams::deserialize(&mut data)?; Ok(LinearReader { diff --git a/columnar/src/column_values/u64_based/mod.rs b/columnar/src/column_values/u64_based/mod.rs index 7afc71e3f..9ae82ee0c 100644 --- a/columnar/src/column_values/u64_based/mod.rs +++ b/columnar/src/column_values/u64_based/mod.rs @@ -8,7 +8,8 @@ use std::io; use std::io::Write; use std::sync::Arc; -use common::{BinarySerializable, OwnedBytes}; +use common::file_slice::FileSlice; +use common::BinarySerializable; use crate::column_values::monotonic_mapping::{ StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal, @@ -60,7 +61,7 @@ pub trait ColumnCodec { type Estimator: ColumnCodecEstimator + Default; /// Loads a column that has been serialized using this codec. 
- fn load(bytes: OwnedBytes) -> io::Result; + fn load(file_slice: FileSlice) -> io::Result; /// Returns an estimator. fn estimator() -> Self::Estimator { @@ -111,20 +112,22 @@ impl CodecType { fn load( &self, - bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { match self { - CodecType::Bitpacked => load_specific_codec::(bytes), - CodecType::Linear => load_specific_codec::(bytes), - CodecType::BlockwiseLinear => load_specific_codec::(bytes), + CodecType::Bitpacked => load_specific_codec::(file_slice), + CodecType::Linear => load_specific_codec::(file_slice), + CodecType::BlockwiseLinear => { + load_specific_codec::(file_slice) + } } } } fn load_specific_codec( - bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { - let reader = C::load(bytes)?; + let reader = C::load(file_slice)?; let reader_typed = monotonic_map_column( reader, StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::::new()), @@ -189,25 +192,28 @@ pub fn serialize_u64_based_column_values( /// /// This method first identifies the codec off the first byte. pub fn load_u64_based_column_values( - mut bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { - let codec_type: CodecType = bytes - .first() - .copied() + let (header, body) = file_slice.split(1); + let codec_type: CodecType = header + .read_bytes()? 
+ .as_slice() + .get(0) + .cloned() .and_then(CodecType::try_from_code) .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to read codec type"))?; - bytes.advance(1); - codec_type.load(bytes) + codec_type.load(body) } /// Helper function to serialize a column (autodetect from all codecs) and then open it +#[cfg(test)] pub fn serialize_and_load_u64_based_column_values( vals: &dyn Iterable, codec_types: &[CodecType], ) -> Arc> { let mut buffer = Vec::new(); serialize_u64_based_column_values(vals, codec_types, &mut buffer).unwrap(); - load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap() + load_u64_based_column_values::(FileSlice::from(buffer)).unwrap() } #[cfg(test)] diff --git a/columnar/src/column_values/u64_based/tests.rs b/columnar/src/column_values/u64_based/tests.rs index 973ff6d90..983d76914 100644 --- a/columnar/src/column_values/u64_based/tests.rs +++ b/columnar/src/column_values/u64_based/tests.rs @@ -1,3 +1,4 @@ +use common::HasLen; use proptest::prelude::*; use proptest::{prop_oneof, proptest}; @@ -12,7 +13,7 @@ fn test_serialize_and_load_simple() { ) .unwrap(); assert_eq!(buffer.len(), 7); - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 3); assert_eq!(col.get_val(0), 1); assert_eq!(col.get_val(1), 2); @@ -29,7 +30,7 @@ fn test_empty_column_i64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 0); assert_eq!(col.min_value(), i64::MIN); assert_eq!(col.max_value(), i64::MIN); @@ -47,7 +48,7 @@ fn test_empty_column_u64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); 
assert_eq!(col.num_vals(), 0); assert_eq!(col.min_value(), u64::MIN); assert_eq!(col.max_value(), u64::MIN); @@ -65,7 +66,7 @@ fn test_empty_column_f64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 0); // FIXME. f64::MIN would be better! assert!(col.min_value().is_nan()); @@ -96,7 +97,7 @@ pub(crate) fn create_and_validate( let actual_compression = buffer.len() as u64; - let reader = TColumnCodec::load(OwnedBytes::new(buffer)).unwrap(); + let reader = TColumnCodec::load(FileSlice::from(buffer)).unwrap(); assert_eq!(reader.num_vals(), vals.len() as u32); let mut buffer = Vec::new(); for (doc, orig_val) in vals.iter().copied().enumerate() { @@ -325,7 +326,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer, )?; - let buffer = OwnedBytes::new(buffer); + let buffer = FileSlice::from(buffer); let column = crate::column_values::load_u64_based_column_values::(buffer.clone())?; assert_eq!(column.get_val(0), -4000i64); assert_eq!(column.get_val(1), -3000i64); @@ -342,7 +343,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer_without_gcd, )?; - let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd); + let buffer_without_gcd = FileSlice::from(buffer_without_gcd); assert!(buffer_without_gcd.len() > buffer.len()); Ok(()) @@ -368,7 +369,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer, )?; - let buffer = OwnedBytes::new(buffer); + let buffer = FileSlice::from(buffer); let column = crate::column_values::load_u64_based_column_values::(buffer.clone())?; assert_eq!(column.get_val(0), 1000u64); assert_eq!(column.get_val(1), 2000u64); @@ -385,7 +386,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, 
num_vals: usize) -> &[codec_type], &mut buffer_without_gcd, )?; - let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd); + let buffer_without_gcd = FileSlice::from(buffer_without_gcd); assert!(buffer_without_gcd.len() > buffer.len()); Ok(()) } @@ -404,7 +405,7 @@ fn test_fastfield_gcd_u64() -> io::Result<()> { #[test] pub fn test_fastfield2() { - let test_fastfield = crate::column_values::serialize_and_load_u64_based_column_values::( + let test_fastfield = serialize_and_load_u64_based_column_values::( &&[100u64, 200u64, 300u64][..], &ALL_U64_CODEC_TYPES, ); diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index 2b9d69770..9fbfc3436 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::{fmt, io}; use common::file_slice::FileSlice; -use common::{ByteCount, DateTime, HasLen, OwnedBytes}; +use common::{ByteCount, DateTime, HasLen}; use crate::column::{BytesColumn, Column, StrColumn}; use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn}; @@ -238,8 +238,7 @@ pub struct DynamicColumnHandle { impl DynamicColumnHandle { // TODO rename load pub fn open(&self) -> io::Result { - let column_bytes: OwnedBytes = self.file_slice.read_bytes()?; - self.open_internal(column_bytes) + self.open_internal(self.file_slice.clone()) } #[doc(hidden)] @@ -258,16 +257,15 @@ impl DynamicColumnHandle { /// If not, the fastfield reader will returns the u64-value associated with the original /// FastValue. 
pub fn open_u64_lenient(&self) -> io::Result>> { - let column_bytes = self.file_slice.read_bytes()?; match self.column_type { ColumnType::Str | ColumnType::Bytes => { let column: BytesColumn = - crate::column::open_column_bytes(column_bytes, self.format_version)?; + crate::column::open_column_bytes(self.file_slice.clone(), self.format_version)?; Ok(Some(column.term_ord_column)) } ColumnType::IpAddr => { let column = crate::column::open_column_u128_as_compact_u64( - column_bytes, + self.file_slice.clone(), self.format_version, )?; Ok(Some(column)) @@ -277,40 +275,40 @@ impl DynamicColumnHandle { | ColumnType::U64 | ColumnType::F64 | ColumnType::DateTime => { - let column = - crate::column::open_column_u64::(column_bytes, self.format_version)?; + let column = crate::column::open_column_u64::( + self.file_slice.clone(), + self.format_version, + )?; Ok(Some(column)) } } } - fn open_internal(&self, column_bytes: OwnedBytes) -> io::Result { + fn open_internal(&self, file_slice: FileSlice) -> io::Result { let dynamic_column: DynamicColumn = match self.column_type { ColumnType::Bytes => { - crate::column::open_column_bytes(column_bytes, self.format_version)?.into() + crate::column::open_column_bytes(file_slice, self.format_version)?.into() } ColumnType::Str => { - crate::column::open_column_str(column_bytes, self.format_version)?.into() + crate::column::open_column_str(file_slice, self.format_version)?.into() } ColumnType::I64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::U64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::F64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::Bool => { - 
crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::IpAddr => { - crate::column::open_column_u128::(column_bytes, self.format_version)? - .into() + crate::column::open_column_u128::(file_slice, self.format_version)?.into() } ColumnType::DateTime => { - crate::column::open_column_u64::(column_bytes, self.format_version)? - .into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } }; Ok(dynamic_column) diff --git a/common/src/vint.rs b/common/src/vint.rs index b09e73b92..9ffc27c16 100644 --- a/common/src/vint.rs +++ b/common/src/vint.rs @@ -56,6 +56,33 @@ impl BinarySerializable for VIntU128 { #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct VInt(pub u64); +impl VInt { + pub fn deserialize_with_size(reader: &mut R) -> io::Result<(Self, usize)> { + let mut nbytes = 0; + let mut bytes = reader.bytes(); + let mut result = 0u64; + let mut shift = 0u64; + loop { + match bytes.next() { + Some(Ok(b)) => { + nbytes += 1; + result |= u64::from(b % 128u8) << shift; + if b >= STOP_BIT { + return Ok((VInt(result), nbytes)); + } + shift += 7; + } + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Reach end of buffer while reading VInt", + )); + } + } + } + } +} + const STOP_BIT: u8 = 128; #[inline] @@ -221,7 +248,6 @@ impl BinarySerializable for VInt { #[cfg(test)] mod tests { - use super::{serialize_vint_u32, BinarySerializable, VInt}; fn aux_test_vint(val: u64) { diff --git a/runtests.sh b/runtests.sh new file mode 100755 index 000000000..c915be42f --- /dev/null +++ b/runtests.sh @@ -0,0 +1,3 @@ +#! 
/bin/bash + +cargo +stable nextest run --features mmap,stopwords,lz4-compression,zstd-compression,failpoints --verbose --workspace diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index 5123ed9c9..031243077 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -822,7 +822,6 @@ mod tests { #[cfg(all(test, feature = "unstable"))] mod bench { - use rand::seq::SliceRandom; use rand::thread_rng; use test::Bencher; diff --git a/src/collector/multi_collector.rs b/src/collector/multi_collector.rs index 8077577d2..13f9bd589 100644 --- a/src/collector/multi_collector.rs +++ b/src/collector/multi_collector.rs @@ -263,7 +263,6 @@ impl SegmentCollector for MultiCollectorChild { #[cfg(test)] mod tests { - use super::*; use crate::collector::{Count, TopDocs}; use crate::query::TermQuery; diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs index 4d4e08583..65287401d 100644 --- a/src/indexer/doc_opstamp_mapping.rs +++ b/src/indexer/doc_opstamp_mapping.rs @@ -40,7 +40,6 @@ impl DocToOpstampMapping<'_> { #[cfg(test)] mod tests { - use super::DocToOpstampMapping; #[test] diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 58610c139..782b86fc0 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -273,7 +273,6 @@ impl Recorder for TfAndPositionRecorder { #[cfg(test)] mod tests { - use common::write_u32_vint; use super::{BufferLender, VInt32Reader}; diff --git a/src/query/boolean_query/boolean_weight.rs b/src/query/boolean_query/boolean_weight.rs index 7b617866f..288281969 100644 --- a/src/query/boolean_query/boolean_weight.rs +++ b/src/query/boolean_query/boolean_weight.rs @@ -315,7 +315,7 @@ impl Weight for BooleanWeight crate::Result<()> { - let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?; + let scorer = self.complex_scorer(reader, 1.0, DoNothingCombiner::default)?; let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN]; match scorer 
{ diff --git a/src/query/set_query.rs b/src/query/set_query.rs index a8412ab8e..650256bcd 100644 --- a/src/query/set_query.rs +++ b/src/query/set_query.rs @@ -63,7 +63,7 @@ impl TermSetQuery { Ok(BooleanWeight::new( sub_queries, false, - Box::new(|| DoNothingCombiner), + Box::new(DoNothingCombiner::default), )) } } diff --git a/src/tokenizer/facet_tokenizer.rs b/src/tokenizer/facet_tokenizer.rs index 089839367..2c80a6368 100644 --- a/src/tokenizer/facet_tokenizer.rs +++ b/src/tokenizer/facet_tokenizer.rs @@ -84,7 +84,6 @@ impl TokenStream for FacetTokenStream<'_> { #[cfg(test)] mod tests { - use super::FacetTokenizer; use crate::schema::Facet; use crate::tokenizer::{Token, TokenStream, Tokenizer}; diff --git a/src/tokenizer/ngram_tokenizer.rs b/src/tokenizer/ngram_tokenizer.rs index b975efc4f..83a2dd4e5 100644 --- a/src/tokenizer/ngram_tokenizer.rs +++ b/src/tokenizer/ngram_tokenizer.rs @@ -313,7 +313,6 @@ fn utf8_codepoint_width(b: u8) -> usize { #[cfg(test)] mod tests { - use super::{utf8_codepoint_width, CodepointFrontiers, NgramTokenizer, StutteringIterator}; use crate::tokenizer::tests::assert_token; use crate::tokenizer::{Token, TokenStream, Tokenizer};