diff --git a/.gitignore b/.gitignore index 92b264316..3f25aab25 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ trace.dat cargo-timing* control variable + +# for `sample record -p` +profile.json diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index 73fc5e7f5..58d2186f7 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -2,7 +2,7 @@ use std::io; use std::io::Write; use std::sync::Arc; -use common::OwnedBytes; +use common::file_slice::FileSlice; use sstable::Dictionary; use crate::column::{BytesColumn, Column}; @@ -41,12 +41,13 @@ pub fn serialize_column_mappable_to_u64( } pub fn open_column_u64( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? .as_slice() .try_into() .unwrap(), @@ -61,12 +62,13 @@ pub fn open_column_u64( } pub fn open_column_u128( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? .as_slice() .try_into() .unwrap(), @@ -84,12 +86,13 @@ pub fn open_column_u128( /// /// See [`open_u128_as_compact_u64`] for more details. pub fn open_column_u128_as_compact_u64( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result> { - let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload + .read_bytes()? 
.as_slice() .try_into() .unwrap(), @@ -103,10 +106,21 @@ pub fn open_column_u128_as_compact_u64( }) } -pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Result { - let (body, dictionary_len_bytes) = data.rsplit(4); - let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); +pub fn open_column_bytes( + file_slice: FileSlice, + format_version: Version, +) -> io::Result { + let (body, dictionary_len_bytes) = file_slice.split_from_end(4); + let dictionary_len = u32::from_le_bytes( + dictionary_len_bytes + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize); + + let dictionary_bytes = dictionary_bytes.read_bytes()?; let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?); let term_ord_column = crate::column::open_column_u64::(column_bytes, format_version)?; Ok(BytesColumn { @@ -115,7 +129,7 @@ pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Resul }) } -pub fn open_column_str(data: OwnedBytes, format_version: Version) -> io::Result { - let bytes_column = open_column_bytes(data, format_version)?; +pub fn open_column_str(file_slice: FileSlice, format_version: Version) -> io::Result { + let bytes_column = open_column_bytes(file_slice, format_version)?; Ok(StrColumn::wrap(bytes_column)) } diff --git a/columnar/src/column_index/merge/mod.rs b/columnar/src/column_index/merge/mod.rs index ea09bdba1..53d2c259c 100644 --- a/columnar/src/column_index/merge/mod.rs +++ b/columnar/src/column_index/merge/mod.rs @@ -95,7 +95,7 @@ pub fn merge_column_index<'a>( #[cfg(test)] mod tests { - use common::OwnedBytes; + use common::file_slice::FileSlice; use crate::column_index::merge::detect_cardinality; use crate::column_index::multivalued_index::{ @@ -178,7 +178,7 @@ mod tests { let mut output = Vec::new(); serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); let multivalue = - 
open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + open_multivalued_index(FileSlice::from(output), crate::Version::V2).unwrap(); let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5]); } @@ -216,7 +216,7 @@ mod tests { let mut output = Vec::new(); serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); let multivalue = - open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + open_multivalued_index(FileSlice::from(output), crate::Version::V2).unwrap(); let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5, 6]); } diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index cef5a1221..93cf15915 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -3,7 +3,8 @@ use std::io::Write; use std::ops::Range; use std::sync::Arc; -use common::{CountingWriter, OwnedBytes}; +use common::file_slice::FileSlice; +use common::CountingWriter; use super::optional_index::{open_optional_index, serialize_optional_index}; use super::{OptionalIndex, SerializableOptionalIndex, Set}; @@ -44,21 +45,26 @@ pub fn serialize_multivalued_index( } pub fn open_multivalued_index( - bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result { match format_version { Version::V1 => { let start_index_column: Arc> = - load_u64_based_column_values(bytes)?; + load_u64_based_column_values(file_slice)?; Ok(MultiValueIndex::MultiValueIndexV1(MultiValueIndexV1 { start_index_column, })) } Version::V2 => { - let (body_bytes, optional_index_len) = bytes.rsplit(4); - let optional_index_len = - u32::from_le_bytes(optional_index_len.as_slice().try_into().unwrap()); + let (body_bytes, optional_index_len) = file_slice.split_from_end(4); + let optional_index_len = u32::from_le_bytes( + 
optional_index_len + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); let (optional_index_bytes, start_index_bytes) = body_bytes.split(optional_index_len as usize); let optional_index = open_optional_index(optional_index_bytes)?; @@ -185,8 +191,8 @@ impl MultiValueIndex { }; let mut buffer = Vec::new(); serialize_multivalued_index(&serializable_multivalued_index, &mut buffer).unwrap(); - let bytes = OwnedBytes::new(buffer); - open_multivalued_index(bytes, Version::V2).unwrap() + let file_slice = FileSlice::from(buffer); + open_multivalued_index(file_slice, Version::V2).unwrap() } pub fn get_start_index_column(&self) -> &Arc> { diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs index f3ab523e2..3e6ee3a62 100644 --- a/columnar/src/column_index/optional_index/mod.rs +++ b/columnar/src/column_index/optional_index/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; mod set; mod set_block; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes, VInt}; pub use set::{SelectCursor, Set, SetCodec}; use set_block::{ @@ -266,8 +267,8 @@ impl OptionalIndex { .unwrap_or(true)); let mut buffer = Vec::new(); serialize_optional_index(&row_ids, num_rows, &mut buffer).unwrap(); - let bytes = OwnedBytes::new(buffer); - open_optional_index(bytes).unwrap() + let file_slice = FileSlice::from(buffer); + open_optional_index(file_slice).unwrap() } pub fn num_docs(&self) -> RowId { @@ -515,10 +516,17 @@ fn deserialize_optional_index_block_metadatas( (block_metas.into_boxed_slice(), non_null_rows_before_block) } -pub fn open_optional_index(bytes: OwnedBytes) -> io::Result { - let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2); - let num_non_empty_block_bytes = - u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap()); +pub fn open_optional_index(file_slice: FileSlice) -> io::Result { + let (bytes, num_non_empty_blocks_bytes) = file_slice.split_from_end(2); + let 
num_non_empty_block_bytes = u16::from_le_bytes( + num_non_empty_blocks_bytes + .read_bytes()? + .as_slice() + .try_into() + .unwrap(), + ); + + let mut bytes = bytes.read_bytes()?; let num_rows = VInt::deserialize_u64(&mut bytes)? as u32; let block_metas_num_bytes = num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES; diff --git a/columnar/src/column_index/optional_index/tests.rs b/columnar/src/column_index/optional_index/tests.rs index 2bcb77fd6..09a67d340 100644 --- a/columnar/src/column_index/optional_index/tests.rs +++ b/columnar/src/column_index/optional_index/tests.rs @@ -59,7 +59,7 @@ fn test_with_random_sets_simple() { let vals = 10..ELEMENTS_PER_BLOCK * 2; let mut out: Vec = Vec::new(); serialize_optional_index(&vals, 100, &mut out).unwrap(); - let null_index = open_optional_index(OwnedBytes::new(out)).unwrap(); + let null_index = open_optional_index(FileSlice::from(out)).unwrap(); let ranks: Vec = (65_472u32..65_473u32).collect(); let els: Vec = ranks.iter().copied().map(|rank| rank + 10).collect(); let mut select_cursor = null_index.select_cursor(); @@ -102,7 +102,7 @@ impl<'a> Iterable for &'a [bool] { fn test_null_index(data: &[bool]) { let mut out: Vec = Vec::new(); serialize_optional_index(&data, data.len() as RowId, &mut out).unwrap(); - let null_index = open_optional_index(OwnedBytes::new(out)).unwrap(); + let null_index = open_optional_index(FileSlice::from(out)).unwrap(); let orig_idx_with_value: Vec = data .iter() .enumerate() @@ -241,7 +241,7 @@ mod bench { .collect(); serialize_optional_index(&&vals[..], TOTAL_NUM_VALUES, &mut out).unwrap(); - open_optional_index(OwnedBytes::new(out)).unwrap() + open_optional_index(FileSlice::from(out)).unwrap() } fn random_range_iterator( diff --git a/columnar/src/column_index/serialize.rs b/columnar/src/column_index/serialize.rs index 673c3459b..2688e441b 100644 --- a/columnar/src/column_index/serialize.rs +++ b/columnar/src/column_index/serialize.rs @@ -1,7 +1,8 @@ use std::io; use 
std::io::Write; -use common::{CountingWriter, OwnedBytes}; +use common::file_slice::FileSlice; +use common::{CountingWriter, HasLen}; use super::multivalued_index::SerializableMultivalueIndex; use super::OptionalIndex; @@ -65,27 +66,28 @@ pub fn serialize_column_index( /// Open a serialized column index. pub fn open_column_index( - mut bytes: OwnedBytes, + file_slice: FileSlice, format_version: Version, ) -> io::Result { - if bytes.is_empty() { + if file_slice.len() == 0 { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, "Failed to deserialize column index. Empty buffer.", )); } - let cardinality_code = bytes[0]; + let (header, body) = file_slice.split(1); + let cardinality_code = header.read_bytes()?.as_slice()[0]; let cardinality = Cardinality::try_from_code(cardinality_code)?; - bytes.advance(1); + match cardinality { Cardinality::Full => Ok(ColumnIndex::Full), Cardinality::Optional => { - let optional_index = super::optional_index::open_optional_index(bytes)?; + let optional_index = super::optional_index::open_optional_index(body)?; Ok(ColumnIndex::Optional(optional_index)) } Cardinality::Multivalued => { let multivalue_index = - super::multivalued_index::open_multivalued_index(bytes, format_version)?; + super::multivalued_index::open_multivalued_index(body, format_version)?; Ok(ColumnIndex::Multivalued(multivalue_index)) } } diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index ef5de5154..59840ad05 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -31,8 +31,7 @@ pub use u128_based::{ CompactSpaceU64Accessor, }; pub use u64_based::{ - load_u64_based_column_values, serialize_and_load_u64_based_column_values, - serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, + load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, }; pub use vec_column::VecColumn; diff --git a/columnar/src/column_values/stats.rs 
b/columnar/src/column_values/stats.rs index 40f042525..794619350 100644 --- a/columnar/src/column_values/stats.rs +++ b/columnar/src/column_values/stats.rs @@ -27,6 +27,43 @@ impl ColumnStats { } } +impl ColumnStats { + /// Same as [`BinarySerializable::deserialize`] but also returns the number of bytes + /// consumed from the reader `R` + pub fn deserialize_with_size(reader: &mut R) -> io::Result<(Self, usize)> { + let mut nbytes = 0; + + let (min_value, len) = VInt::deserialize_with_size(reader)?; + let min_value = min_value.0; + nbytes += len; + + let (gcd, len) = VInt::deserialize_with_size(reader)?; + let gcd = gcd.0; + let gcd = NonZeroU64::new(gcd) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "GCD of 0 is forbidden"))?; + nbytes += len; + + let (amplitude, len) = VInt::deserialize_with_size(reader)?; + let amplitude = amplitude.0 * gcd.get(); + let max_value = min_value + amplitude; + nbytes += len; + + let (num_rows, len) = VInt::deserialize_with_size(reader)?; + let num_rows = num_rows.0 as RowId; + nbytes += len; + + Ok(( + ColumnStats { + min_value, + max_value, + num_rows, + gcd, + }, + nbytes, + )) + } +} + impl BinarySerializable for ColumnStats { fn serialize(&self, writer: &mut W) -> io::Result<()> { VInt(self.min_value).serialize(writer)?; diff --git a/columnar/src/column_values/u128_based/compact_space/mod.rs b/columnar/src/column_values/u128_based/compact_space/mod.rs index f246c7b0c..8570a37ed 100644 --- a/columnar/src/column_values/u128_based/compact_space/mod.rs +++ b/columnar/src/column_values/u128_based/compact_space/mod.rs @@ -767,7 +767,7 @@ mod tests { ]; let mut out = Vec::new(); serialize_column_values_u128(&&vals[..], &mut out).unwrap(); - let decomp = open_u128_mapped(OwnedBytes::new(out)).unwrap(); + let decomp = open_u128_mapped(FileSlice::from(out)).unwrap(); let complete_range = 0..vals.len() as u32; assert_eq!( @@ -821,6 +821,7 @@ mod tests { let _data = test_aux_vals(vals); } + use common::file_slice::FileSlice;
use proptest::prelude::*; fn num_strategy() -> impl Strategy { diff --git a/columnar/src/column_values/u128_based/mod.rs b/columnar/src/column_values/u128_based/mod.rs index 30665630a..fc71ae6a0 100644 --- a/columnar/src/column_values/u128_based/mod.rs +++ b/columnar/src/column_values/u128_based/mod.rs @@ -5,7 +5,8 @@ use std::sync::Arc; mod compact_space; -use common::{BinarySerializable, OwnedBytes, VInt}; +use common::file_slice::FileSlice; +use common::{BinarySerializable, VInt}; pub use compact_space::{ CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor, }; @@ -101,8 +102,9 @@ impl U128FastFieldCodecType { /// Returns the correct codec reader wrapped in the `Arc` for the data. pub fn open_u128_mapped( - mut bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { + let mut bytes = file_slice.read_bytes()?; let header = U128Header::deserialize(&mut bytes)?; assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); let reader = CompactSpaceDecompressor::open(bytes)?; @@ -120,7 +122,8 @@ pub fn open_u128_mapped( /// # Notice /// In case there are new codecs added, check for usages of `CompactSpaceDecompressorU64` and /// also handle the new codecs. 
-pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result>> { +pub fn open_u128_as_compact_u64(file_slice: FileSlice) -> io::Result>> { + let mut bytes = file_slice.read_bytes()?; let header = U128Header::deserialize(&mut bytes)?; assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); let reader = CompactSpaceU64Accessor::open(bytes)?; diff --git a/columnar/src/column_values/u64_based/bitpacked.rs b/columnar/src/column_values/u64_based/bitpacked.rs index 3ed999648..622b02421 100644 --- a/columnar/src/column_values/u64_based/bitpacked.rs +++ b/columnar/src/column_values/u64_based/bitpacked.rs @@ -2,6 +2,7 @@ use std::io::{self, Write}; use std::num::NonZeroU64; use std::ops::{Range, RangeInclusive}; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes}; use fastdivide::DividerU64; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; @@ -137,7 +138,8 @@ impl ColumnCodec for BitpackedCodec { type Estimator = BitpackedCodecEstimator; /// Opens a fast field given a file. 
- fn load(mut data: OwnedBytes) -> io::Result { + fn load(file_slice: FileSlice) -> io::Result { + let mut data = file_slice.read_bytes()?; let stats = ColumnStats::deserialize(&mut data)?; let num_bits = num_bits(&stats); let bit_unpacker = BitUnpacker::new(num_bits); diff --git a/columnar/src/column_values/u64_based/blockwise_linear.rs b/columnar/src/column_values/u64_based/blockwise_linear.rs index eb9191aa8..57b1291f2 100644 --- a/columnar/src/column_values/u64_based/blockwise_linear.rs +++ b/columnar/src/column_values/u64_based/blockwise_linear.rs @@ -1,8 +1,10 @@ use std::io::Write; -use std::sync::Arc; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, OnceLock}; use std::{io, iter}; -use common::{BinarySerializable, CountingWriter, DeserializeFrom, OwnedBytes}; +use common::file_slice::FileSlice; +use common::{BinarySerializable, CountingWriter, DeserializeFrom, HasLen, OwnedBytes}; use fastdivide::DividerU64; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; @@ -172,32 +174,74 @@ impl ColumnCodec for BlockwiseLinearCodec { type Estimator = BlockwiseLinearEstimator; - fn load(mut bytes: OwnedBytes) -> io::Result { - let stats = ColumnStats::deserialize(&mut bytes)?; - let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?; - let footer_offset = bytes.len() - 4 - footer_len as usize; - let (data, mut footer) = bytes.split(footer_offset); + fn load(file_slice: FileSlice) -> io::Result { + // [`ColumnStats::deserialize_with_size`] deserializes 4 variable-width encoded u64s, which + // could end up being, in the worst case, 9 bytes each. 
this is where the 36 comes from + let (stats, _) = file_slice.clone().split(36.min(file_slice.len())); // hope that's enough bytes + let mut stats = stats.read_bytes()?; + let (stats, stats_nbytes) = ColumnStats::deserialize_with_size(&mut stats)?; + + let (_, body) = file_slice.split(stats_nbytes); + + let (_, footer) = body.clone().split_from_end(4); + + let footer_len: u32 = footer.read_bytes()?.as_slice().deserialize()?; + let (data, footer) = body.split_from_end(footer_len as usize + 4); + + let mut footer = footer.read_bytes()?; let num_blocks = compute_num_blocks(stats.num_rows); - let mut blocks: Vec = iter::repeat_with(|| Block::deserialize(&mut footer)) - .take(num_blocks as usize) - .collect::>()?; + let mut blocks: Vec = + iter::repeat_with(|| Block::deserialize(&mut footer)) + .take(num_blocks as usize) + .map(|block| { + block.map(|block| BlockWithLength { + block, + file_slice: FileSlice::from(Vec::new()), + data: OnceLock::default(), + }) + }) + .collect::>()?; + let mut start_offset = 0; for block in &mut blocks { + let len = (block.bit_unpacker.bit_width() as usize) * BLOCK_SIZE as usize / 8; block.data_start_offset = start_offset; - start_offset += (block.bit_unpacker.bit_width() as usize) * BLOCK_SIZE as usize / 8; + block.file_slice = data + .clone() + .slice(start_offset..(start_offset + len).min(data.len())); + start_offset += len; } + Ok(BlockwiseLinearReader { blocks: blocks.into_boxed_slice().into(), - data, stats, }) } } +struct BlockWithLength { + block: Block, + file_slice: FileSlice, + data: OnceLock, +} + +impl Deref for BlockWithLength { + type Target = Block; + + fn deref(&self) -> &Self::Target { + &self.block + } +} + +impl DerefMut for BlockWithLength { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.block + } +} + #[derive(Clone)] pub struct BlockwiseLinearReader { - blocks: Arc<[Block]>, - data: OwnedBytes, + blocks: Arc<[BlockWithLength]>, stats: ColumnStats, } @@ -208,7 +252,9 @@ impl ColumnValues for 
BlockwiseLinearReader { let idx_within_block = idx % BLOCK_SIZE; let block = &self.blocks[block_id]; let interpoled_val: u64 = block.line.eval(idx_within_block); - let block_bytes = &self.data[block.data_start_offset..]; + let block_bytes = block + .data + .get_or_init(|| block.file_slice.read_bytes().unwrap()); let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes); // TODO optimize me! the line parameters could be tweaked to include the multiplication and // remove the dependency. diff --git a/columnar/src/column_values/u64_based/linear.rs b/columnar/src/column_values/u64_based/linear.rs index ba0c9e641..b3d5caf8b 100644 --- a/columnar/src/column_values/u64_based/linear.rs +++ b/columnar/src/column_values/u64_based/linear.rs @@ -1,5 +1,6 @@ use std::io; +use common::file_slice::FileSlice; use common::{BinarySerializable, OwnedBytes}; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; @@ -190,7 +191,8 @@ impl ColumnCodec for LinearCodec { type Estimator = LinearCodecEstimator; - fn load(mut data: OwnedBytes) -> io::Result { + fn load(file_slice: FileSlice) -> io::Result { + let mut data = file_slice.read_bytes()?; let stats = ColumnStats::deserialize(&mut data)?; let linear_params = LinearParams::deserialize(&mut data)?; Ok(LinearReader { diff --git a/columnar/src/column_values/u64_based/mod.rs b/columnar/src/column_values/u64_based/mod.rs index 7afc71e3f..9ae82ee0c 100644 --- a/columnar/src/column_values/u64_based/mod.rs +++ b/columnar/src/column_values/u64_based/mod.rs @@ -8,7 +8,8 @@ use std::io; use std::io::Write; use std::sync::Arc; -use common::{BinarySerializable, OwnedBytes}; +use common::file_slice::FileSlice; +use common::BinarySerializable; use crate::column_values::monotonic_mapping::{ StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal, @@ -60,7 +61,7 @@ pub trait ColumnCodec { type Estimator: ColumnCodecEstimator + Default; /// Loads a column that has been serialized using this codec. 
- fn load(bytes: OwnedBytes) -> io::Result; + fn load(file_slice: FileSlice) -> io::Result; /// Returns an estimator. fn estimator() -> Self::Estimator { @@ -111,20 +112,22 @@ impl CodecType { fn load( &self, - bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { match self { - CodecType::Bitpacked => load_specific_codec::(bytes), - CodecType::Linear => load_specific_codec::(bytes), - CodecType::BlockwiseLinear => load_specific_codec::(bytes), + CodecType::Bitpacked => load_specific_codec::(file_slice), + CodecType::Linear => load_specific_codec::(file_slice), + CodecType::BlockwiseLinear => { + load_specific_codec::(file_slice) + } } } } fn load_specific_codec( - bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { - let reader = C::load(bytes)?; + let reader = C::load(file_slice)?; let reader_typed = monotonic_map_column( reader, StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::::new()), @@ -189,25 +192,28 @@ pub fn serialize_u64_based_column_values( /// /// This method first identifies the codec off the first byte. pub fn load_u64_based_column_values( - mut bytes: OwnedBytes, + file_slice: FileSlice, ) -> io::Result>> { - let codec_type: CodecType = bytes - .first() - .copied() + let (header, body) = file_slice.split(1); + let codec_type: CodecType = header + .read_bytes()? 
+ .as_slice() + .get(0) + .cloned() .and_then(CodecType::try_from_code) .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to read codec type"))?; - bytes.advance(1); - codec_type.load(bytes) + codec_type.load(body) } /// Helper function to serialize a column (autodetect from all codecs) and then open it +#[cfg(test)] pub fn serialize_and_load_u64_based_column_values( vals: &dyn Iterable, codec_types: &[CodecType], ) -> Arc> { let mut buffer = Vec::new(); serialize_u64_based_column_values(vals, codec_types, &mut buffer).unwrap(); - load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap() + load_u64_based_column_values::(FileSlice::from(buffer)).unwrap() } #[cfg(test)] diff --git a/columnar/src/column_values/u64_based/tests.rs b/columnar/src/column_values/u64_based/tests.rs index 973ff6d90..983d76914 100644 --- a/columnar/src/column_values/u64_based/tests.rs +++ b/columnar/src/column_values/u64_based/tests.rs @@ -1,3 +1,4 @@ +use common::HasLen; use proptest::prelude::*; use proptest::{prop_oneof, proptest}; @@ -12,7 +13,7 @@ fn test_serialize_and_load_simple() { ) .unwrap(); assert_eq!(buffer.len(), 7); - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 3); assert_eq!(col.get_val(0), 1); assert_eq!(col.get_val(1), 2); @@ -29,7 +30,7 @@ fn test_empty_column_i64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 0); assert_eq!(col.min_value(), i64::MIN); assert_eq!(col.max_value(), i64::MIN); @@ -47,7 +48,7 @@ fn test_empty_column_u64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); 
assert_eq!(col.num_vals(), 0); assert_eq!(col.min_value(), u64::MIN); assert_eq!(col.max_value(), u64::MIN); @@ -65,7 +66,7 @@ fn test_empty_column_f64() { continue; } num_acceptable_codecs += 1; - let col = load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap(); + let col = load_u64_based_column_values::(FileSlice::from(buffer)).unwrap(); assert_eq!(col.num_vals(), 0); // FIXME. f64::MIN would be better! assert!(col.min_value().is_nan()); @@ -96,7 +97,7 @@ pub(crate) fn create_and_validate( let actual_compression = buffer.len() as u64; - let reader = TColumnCodec::load(OwnedBytes::new(buffer)).unwrap(); + let reader = TColumnCodec::load(FileSlice::from(buffer)).unwrap(); assert_eq!(reader.num_vals(), vals.len() as u32); let mut buffer = Vec::new(); for (doc, orig_val) in vals.iter().copied().enumerate() { @@ -325,7 +326,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer, )?; - let buffer = OwnedBytes::new(buffer); + let buffer = FileSlice::from(buffer); let column = crate::column_values::load_u64_based_column_values::(buffer.clone())?; assert_eq!(column.get_val(0), -4000i64); assert_eq!(column.get_val(1), -3000i64); @@ -342,7 +343,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer_without_gcd, )?; - let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd); + let buffer_without_gcd = FileSlice::from(buffer_without_gcd); assert!(buffer_without_gcd.len() > buffer.len()); Ok(()) @@ -368,7 +369,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) -> &[codec_type], &mut buffer, )?; - let buffer = OwnedBytes::new(buffer); + let buffer = FileSlice::from(buffer); let column = crate::column_values::load_u64_based_column_values::(buffer.clone())?; assert_eq!(column.get_val(0), 1000u64); assert_eq!(column.get_val(1), 2000u64); @@ -385,7 +386,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, 
num_vals: usize) -> &[codec_type], &mut buffer_without_gcd, )?; - let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd); + let buffer_without_gcd = FileSlice::from(buffer_without_gcd); assert!(buffer_without_gcd.len() > buffer.len()); Ok(()) } @@ -404,7 +405,7 @@ fn test_fastfield_gcd_u64() -> io::Result<()> { #[test] pub fn test_fastfield2() { - let test_fastfield = crate::column_values::serialize_and_load_u64_based_column_values::( + let test_fastfield = serialize_and_load_u64_based_column_values::( &&[100u64, 200u64, 300u64][..], &ALL_U64_CODEC_TYPES, ); diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index 2b9d69770..9fbfc3436 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::{fmt, io}; use common::file_slice::FileSlice; -use common::{ByteCount, DateTime, HasLen, OwnedBytes}; +use common::{ByteCount, DateTime, HasLen}; use crate::column::{BytesColumn, Column, StrColumn}; use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn}; @@ -238,8 +238,7 @@ pub struct DynamicColumnHandle { impl DynamicColumnHandle { // TODO rename load pub fn open(&self) -> io::Result { - let column_bytes: OwnedBytes = self.file_slice.read_bytes()?; - self.open_internal(column_bytes) + self.open_internal(self.file_slice.clone()) } #[doc(hidden)] @@ -258,16 +257,15 @@ impl DynamicColumnHandle { /// If not, the fastfield reader will returns the u64-value associated with the original /// FastValue. 
pub fn open_u64_lenient(&self) -> io::Result>> { - let column_bytes = self.file_slice.read_bytes()?; match self.column_type { ColumnType::Str | ColumnType::Bytes => { let column: BytesColumn = - crate::column::open_column_bytes(column_bytes, self.format_version)?; + crate::column::open_column_bytes(self.file_slice.clone(), self.format_version)?; Ok(Some(column.term_ord_column)) } ColumnType::IpAddr => { let column = crate::column::open_column_u128_as_compact_u64( - column_bytes, + self.file_slice.clone(), self.format_version, )?; Ok(Some(column)) @@ -277,40 +275,40 @@ impl DynamicColumnHandle { | ColumnType::U64 | ColumnType::F64 | ColumnType::DateTime => { - let column = - crate::column::open_column_u64::(column_bytes, self.format_version)?; + let column = crate::column::open_column_u64::( + self.file_slice.clone(), + self.format_version, + )?; Ok(Some(column)) } } } - fn open_internal(&self, column_bytes: OwnedBytes) -> io::Result { + fn open_internal(&self, file_slice: FileSlice) -> io::Result { let dynamic_column: DynamicColumn = match self.column_type { ColumnType::Bytes => { - crate::column::open_column_bytes(column_bytes, self.format_version)?.into() + crate::column::open_column_bytes(file_slice, self.format_version)?.into() } ColumnType::Str => { - crate::column::open_column_str(column_bytes, self.format_version)?.into() + crate::column::open_column_str(file_slice, self.format_version)?.into() } ColumnType::I64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::U64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::F64 => { - crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::Bool => { - 
crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } ColumnType::IpAddr => { - crate::column::open_column_u128::(column_bytes, self.format_version)? - .into() + crate::column::open_column_u128::(file_slice, self.format_version)?.into() } ColumnType::DateTime => { - crate::column::open_column_u64::(column_bytes, self.format_version)? - .into() + crate::column::open_column_u64::(file_slice, self.format_version)?.into() } }; Ok(dynamic_column) diff --git a/common/src/vint.rs b/common/src/vint.rs index b09e73b92..9ffc27c16 100644 --- a/common/src/vint.rs +++ b/common/src/vint.rs @@ -56,6 +56,33 @@ impl BinarySerializable for VIntU128 { #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct VInt(pub u64); +impl VInt { + pub fn deserialize_with_size(reader: &mut R) -> io::Result<(Self, usize)> { + let mut nbytes = 0; + let mut bytes = reader.bytes(); + let mut result = 0u64; + let mut shift = 0u64; + loop { + match bytes.next() { + Some(Ok(b)) => { + nbytes += 1; + result |= u64::from(b % 128u8) << shift; + if b >= STOP_BIT { + return Ok((VInt(result), nbytes)); + } + shift += 7; + } + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Reach end of buffer while reading VInt", + )); + } + } + } + } +} + const STOP_BIT: u8 = 128; #[inline] @@ -221,7 +248,6 @@ impl BinarySerializable for VInt { #[cfg(test)] mod tests { - use super::{serialize_vint_u32, BinarySerializable, VInt}; fn aux_test_vint(val: u64) { diff --git a/runtests.sh b/runtests.sh new file mode 100755 index 000000000..c915be42f --- /dev/null +++ b/runtests.sh @@ -0,0 +1,3 @@ +#! 
/bin/bash + +cargo +stable nextest run --features mmap,stopwords,lz4-compression,zstd-compression,failpoints --verbose --workspace diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index 5123ed9c9..031243077 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -822,7 +822,6 @@ mod tests { #[cfg(all(test, feature = "unstable"))] mod bench { - use rand::seq::SliceRandom; use rand::thread_rng; use test::Bencher; diff --git a/src/collector/multi_collector.rs b/src/collector/multi_collector.rs index 8077577d2..13f9bd589 100644 --- a/src/collector/multi_collector.rs +++ b/src/collector/multi_collector.rs @@ -263,7 +263,6 @@ impl SegmentCollector for MultiCollectorChild { #[cfg(test)] mod tests { - use super::*; use crate::collector::{Count, TopDocs}; use crate::query::TermQuery; diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs index 4d4e08583..65287401d 100644 --- a/src/indexer/doc_opstamp_mapping.rs +++ b/src/indexer/doc_opstamp_mapping.rs @@ -40,7 +40,6 @@ impl DocToOpstampMapping<'_> { #[cfg(test)] mod tests { - use super::DocToOpstampMapping; #[test] diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 58610c139..782b86fc0 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -273,7 +273,6 @@ impl Recorder for TfAndPositionRecorder { #[cfg(test)] mod tests { - use common::write_u32_vint; use super::{BufferLender, VInt32Reader}; diff --git a/src/query/boolean_query/boolean_weight.rs b/src/query/boolean_query/boolean_weight.rs index 7b617866f..288281969 100644 --- a/src/query/boolean_query/boolean_weight.rs +++ b/src/query/boolean_query/boolean_weight.rs @@ -315,7 +315,7 @@ impl Weight for BooleanWeight crate::Result<()> { - let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?; + let scorer = self.complex_scorer(reader, 1.0, DoNothingCombiner::default)?; let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN]; match scorer 
{ diff --git a/src/query/set_query.rs b/src/query/set_query.rs index a8412ab8e..650256bcd 100644 --- a/src/query/set_query.rs +++ b/src/query/set_query.rs @@ -63,7 +63,7 @@ impl TermSetQuery { Ok(BooleanWeight::new( sub_queries, false, - Box::new(|| DoNothingCombiner), + Box::new(DoNothingCombiner::default), )) } } diff --git a/src/tokenizer/facet_tokenizer.rs b/src/tokenizer/facet_tokenizer.rs index 089839367..2c80a6368 100644 --- a/src/tokenizer/facet_tokenizer.rs +++ b/src/tokenizer/facet_tokenizer.rs @@ -84,7 +84,6 @@ impl TokenStream for FacetTokenStream<'_> { #[cfg(test)] mod tests { - use super::FacetTokenizer; use crate::schema::Facet; use crate::tokenizer::{Token, TokenStream, Tokenizer}; diff --git a/src/tokenizer/ngram_tokenizer.rs b/src/tokenizer/ngram_tokenizer.rs index b975efc4f..83a2dd4e5 100644 --- a/src/tokenizer/ngram_tokenizer.rs +++ b/src/tokenizer/ngram_tokenizer.rs @@ -313,7 +313,6 @@ fn utf8_codepoint_width(b: u8) -> usize { #[cfg(test)] mod tests { - use super::{utf8_codepoint_width, CodepointFrontiers, NgramTokenizer, StutteringIterator}; use crate::tokenizer::tests::assert_token; use crate::tokenizer::{Token, TokenStream, Tokenizer};