mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-30 15:10:40 +00:00
perf: push FileSlices down through most of fast fields (#19)
This PR modifies internal API signatures and implementation details so that `FileSlice`s are passed down into the innards of (at least) the `BlockwiseLinearCodec`. This allows tantivy to defer dereferencing large slices of bytes when reading numeric fast fields, and instead dereference only the slice of bytes it needs for any given compressed Block. The motivation here is for external `Directory` implementations where it's not exactly efficient to dereference large slices of bytes.
This commit is contained in:
committed by
Philippe Noël
parent
34c17a1685
commit
020bdffd61
3
.gitignore
vendored
3
.gitignore
vendored
@@ -15,3 +15,6 @@ trace.dat
|
||||
cargo-timing*
|
||||
control
|
||||
variable
|
||||
|
||||
# for `sample record -p`
|
||||
profile.json
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::io;
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::OwnedBytes;
|
||||
use common::file_slice::FileSlice;
|
||||
use sstable::Dictionary;
|
||||
|
||||
use crate::column::{BytesColumn, Column};
|
||||
@@ -41,12 +41,13 @@ pub fn serialize_column_mappable_to_u64<T: MonotonicallyMappableToU64>(
|
||||
}
|
||||
|
||||
pub fn open_column_u64<T: MonotonicallyMappableToU64>(
|
||||
bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
format_version: Version,
|
||||
) -> io::Result<Column<T>> {
|
||||
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
|
||||
let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4);
|
||||
let column_index_num_bytes = u32::from_le_bytes(
|
||||
column_index_num_bytes_payload
|
||||
.read_bytes()?
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
@@ -61,12 +62,13 @@ pub fn open_column_u64<T: MonotonicallyMappableToU64>(
|
||||
}
|
||||
|
||||
pub fn open_column_u128<T: MonotonicallyMappableToU128>(
|
||||
bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
format_version: Version,
|
||||
) -> io::Result<Column<T>> {
|
||||
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
|
||||
let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4);
|
||||
let column_index_num_bytes = u32::from_le_bytes(
|
||||
column_index_num_bytes_payload
|
||||
.read_bytes()?
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
@@ -84,12 +86,13 @@ pub fn open_column_u128<T: MonotonicallyMappableToU128>(
|
||||
///
|
||||
/// See [`open_u128_as_compact_u64`] for more details.
|
||||
pub fn open_column_u128_as_compact_u64(
|
||||
bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
format_version: Version,
|
||||
) -> io::Result<Column<u64>> {
|
||||
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
|
||||
let (body, column_index_num_bytes_payload) = file_slice.split_from_end(4);
|
||||
let column_index_num_bytes = u32::from_le_bytes(
|
||||
column_index_num_bytes_payload
|
||||
.read_bytes()?
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
@@ -103,10 +106,21 @@ pub fn open_column_u128_as_compact_u64(
|
||||
})
|
||||
}
|
||||
|
||||
pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Result<BytesColumn> {
|
||||
let (body, dictionary_len_bytes) = data.rsplit(4);
|
||||
let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap());
|
||||
pub fn open_column_bytes(
|
||||
file_slice: FileSlice,
|
||||
format_version: Version,
|
||||
) -> io::Result<BytesColumn> {
|
||||
let (body, dictionary_len_bytes) = file_slice.split_from_end(4);
|
||||
let dictionary_len = u32::from_le_bytes(
|
||||
dictionary_len_bytes
|
||||
.read_bytes()?
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
);
|
||||
let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize);
|
||||
|
||||
let dictionary_bytes = dictionary_bytes.read_bytes()?;
|
||||
let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?);
|
||||
let term_ord_column = crate::column::open_column_u64::<u64>(column_bytes, format_version)?;
|
||||
Ok(BytesColumn {
|
||||
@@ -115,7 +129,7 @@ pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Resul
|
||||
})
|
||||
}
|
||||
|
||||
pub fn open_column_str(data: OwnedBytes, format_version: Version) -> io::Result<StrColumn> {
|
||||
let bytes_column = open_column_bytes(data, format_version)?;
|
||||
pub fn open_column_str(file_slice: FileSlice, format_version: Version) -> io::Result<StrColumn> {
|
||||
let bytes_column = open_column_bytes(file_slice, format_version)?;
|
||||
Ok(StrColumn::wrap(bytes_column))
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ pub fn merge_column_index<'a>(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common::OwnedBytes;
|
||||
use common::file_slice::FileSlice;
|
||||
|
||||
use crate::column_index::merge::detect_cardinality;
|
||||
use crate::column_index::multivalued_index::{
|
||||
@@ -178,7 +178,7 @@ mod tests {
|
||||
let mut output = Vec::new();
|
||||
serialize_multivalued_index(&start_index_iterable, &mut output).unwrap();
|
||||
let multivalue =
|
||||
open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap();
|
||||
open_multivalued_index(FileSlice::from(output), crate::Version::V2).unwrap();
|
||||
let start_indexes: Vec<RowId> = multivalue.get_start_index_column().iter().collect();
|
||||
assert_eq!(&start_indexes, &[0, 3, 5]);
|
||||
}
|
||||
@@ -216,7 +216,7 @@ mod tests {
|
||||
let mut output = Vec::new();
|
||||
serialize_multivalued_index(&start_index_iterable, &mut output).unwrap();
|
||||
let multivalue =
|
||||
open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap();
|
||||
open_multivalued_index(FileSlice::from(output), crate::Version::V2).unwrap();
|
||||
let start_indexes: Vec<RowId> = multivalue.get_start_index_column().iter().collect();
|
||||
assert_eq!(&start_indexes, &[0, 3, 5, 6]);
|
||||
}
|
||||
|
||||
@@ -3,7 +3,8 @@ use std::io::Write;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{CountingWriter, OwnedBytes};
|
||||
use common::file_slice::FileSlice;
|
||||
use common::CountingWriter;
|
||||
|
||||
use super::optional_index::{open_optional_index, serialize_optional_index};
|
||||
use super::{OptionalIndex, SerializableOptionalIndex, Set};
|
||||
@@ -44,21 +45,26 @@ pub fn serialize_multivalued_index(
|
||||
}
|
||||
|
||||
pub fn open_multivalued_index(
|
||||
bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
format_version: Version,
|
||||
) -> io::Result<MultiValueIndex> {
|
||||
match format_version {
|
||||
Version::V1 => {
|
||||
let start_index_column: Arc<dyn ColumnValues<RowId>> =
|
||||
load_u64_based_column_values(bytes)?;
|
||||
load_u64_based_column_values(file_slice)?;
|
||||
Ok(MultiValueIndex::MultiValueIndexV1(MultiValueIndexV1 {
|
||||
start_index_column,
|
||||
}))
|
||||
}
|
||||
Version::V2 => {
|
||||
let (body_bytes, optional_index_len) = bytes.rsplit(4);
|
||||
let optional_index_len =
|
||||
u32::from_le_bytes(optional_index_len.as_slice().try_into().unwrap());
|
||||
let (body_bytes, optional_index_len) = file_slice.split_from_end(4);
|
||||
let optional_index_len = u32::from_le_bytes(
|
||||
optional_index_len
|
||||
.read_bytes()?
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
);
|
||||
let (optional_index_bytes, start_index_bytes) =
|
||||
body_bytes.split(optional_index_len as usize);
|
||||
let optional_index = open_optional_index(optional_index_bytes)?;
|
||||
@@ -185,8 +191,8 @@ impl MultiValueIndex {
|
||||
};
|
||||
let mut buffer = Vec::new();
|
||||
serialize_multivalued_index(&serializable_multivalued_index, &mut buffer).unwrap();
|
||||
let bytes = OwnedBytes::new(buffer);
|
||||
open_multivalued_index(bytes, Version::V2).unwrap()
|
||||
let file_slice = FileSlice::from(buffer);
|
||||
open_multivalued_index(file_slice, Version::V2).unwrap()
|
||||
}
|
||||
|
||||
pub fn get_start_index_column(&self) -> &Arc<dyn crate::ColumnValues<RowId>> {
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||
mod set;
|
||||
mod set_block;
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{BinarySerializable, OwnedBytes, VInt};
|
||||
pub use set::{SelectCursor, Set, SetCodec};
|
||||
use set_block::{
|
||||
@@ -266,8 +267,8 @@ impl OptionalIndex {
|
||||
.unwrap_or(true));
|
||||
let mut buffer = Vec::new();
|
||||
serialize_optional_index(&row_ids, num_rows, &mut buffer).unwrap();
|
||||
let bytes = OwnedBytes::new(buffer);
|
||||
open_optional_index(bytes).unwrap()
|
||||
let file_slice = FileSlice::from(buffer);
|
||||
open_optional_index(file_slice).unwrap()
|
||||
}
|
||||
|
||||
pub fn num_docs(&self) -> RowId {
|
||||
@@ -515,10 +516,17 @@ fn deserialize_optional_index_block_metadatas(
|
||||
(block_metas.into_boxed_slice(), non_null_rows_before_block)
|
||||
}
|
||||
|
||||
pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
|
||||
let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2);
|
||||
let num_non_empty_block_bytes =
|
||||
u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap());
|
||||
pub fn open_optional_index(file_slice: FileSlice) -> io::Result<OptionalIndex> {
|
||||
let (bytes, num_non_empty_blocks_bytes) = file_slice.split_from_end(2);
|
||||
let num_non_empty_block_bytes = u16::from_le_bytes(
|
||||
num_non_empty_blocks_bytes
|
||||
.read_bytes()?
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let mut bytes = bytes.read_bytes()?;
|
||||
let num_rows = VInt::deserialize_u64(&mut bytes)? as u32;
|
||||
let block_metas_num_bytes =
|
||||
num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
|
||||
|
||||
@@ -59,7 +59,7 @@ fn test_with_random_sets_simple() {
|
||||
let vals = 10..ELEMENTS_PER_BLOCK * 2;
|
||||
let mut out: Vec<u8> = Vec::new();
|
||||
serialize_optional_index(&vals, 100, &mut out).unwrap();
|
||||
let null_index = open_optional_index(OwnedBytes::new(out)).unwrap();
|
||||
let null_index = open_optional_index(FileSlice::from(out)).unwrap();
|
||||
let ranks: Vec<u32> = (65_472u32..65_473u32).collect();
|
||||
let els: Vec<u32> = ranks.iter().copied().map(|rank| rank + 10).collect();
|
||||
let mut select_cursor = null_index.select_cursor();
|
||||
@@ -102,7 +102,7 @@ impl<'a> Iterable<RowId> for &'a [bool] {
|
||||
fn test_null_index(data: &[bool]) {
|
||||
let mut out: Vec<u8> = Vec::new();
|
||||
serialize_optional_index(&data, data.len() as RowId, &mut out).unwrap();
|
||||
let null_index = open_optional_index(OwnedBytes::new(out)).unwrap();
|
||||
let null_index = open_optional_index(FileSlice::from(out)).unwrap();
|
||||
let orig_idx_with_value: Vec<u32> = data
|
||||
.iter()
|
||||
.enumerate()
|
||||
@@ -241,7 +241,7 @@ mod bench {
|
||||
.collect();
|
||||
serialize_optional_index(&&vals[..], TOTAL_NUM_VALUES, &mut out).unwrap();
|
||||
|
||||
open_optional_index(OwnedBytes::new(out)).unwrap()
|
||||
open_optional_index(FileSlice::from(out)).unwrap()
|
||||
}
|
||||
|
||||
fn random_range_iterator(
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
use common::{CountingWriter, OwnedBytes};
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{CountingWriter, HasLen};
|
||||
|
||||
use super::multivalued_index::SerializableMultivalueIndex;
|
||||
use super::OptionalIndex;
|
||||
@@ -65,27 +66,28 @@ pub fn serialize_column_index(
|
||||
|
||||
/// Open a serialized column index.
|
||||
pub fn open_column_index(
|
||||
mut bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
format_version: Version,
|
||||
) -> io::Result<ColumnIndex> {
|
||||
if bytes.is_empty() {
|
||||
if file_slice.len() == 0 {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
"Failed to deserialize column index. Empty buffer.",
|
||||
));
|
||||
}
|
||||
let cardinality_code = bytes[0];
|
||||
let (header, body) = file_slice.split(1);
|
||||
let cardinality_code = header.read_bytes()?.as_slice()[0];
|
||||
let cardinality = Cardinality::try_from_code(cardinality_code)?;
|
||||
bytes.advance(1);
|
||||
|
||||
match cardinality {
|
||||
Cardinality::Full => Ok(ColumnIndex::Full),
|
||||
Cardinality::Optional => {
|
||||
let optional_index = super::optional_index::open_optional_index(bytes)?;
|
||||
let optional_index = super::optional_index::open_optional_index(body)?;
|
||||
Ok(ColumnIndex::Optional(optional_index))
|
||||
}
|
||||
Cardinality::Multivalued => {
|
||||
let multivalue_index =
|
||||
super::multivalued_index::open_multivalued_index(bytes, format_version)?;
|
||||
super::multivalued_index::open_multivalued_index(body, format_version)?;
|
||||
Ok(ColumnIndex::Multivalued(multivalue_index))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,8 +31,7 @@ pub use u128_based::{
|
||||
CompactSpaceU64Accessor,
|
||||
};
|
||||
pub use u64_based::{
|
||||
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
|
||||
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
|
||||
load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
|
||||
};
|
||||
pub use vec_column::VecColumn;
|
||||
|
||||
|
||||
@@ -27,6 +27,43 @@ impl ColumnStats {
|
||||
}
|
||||
}
|
||||
|
||||
impl ColumnStats {
|
||||
/// Same as [`BinarySeerializable::deserialize`] but also returns the number of bytes
|
||||
/// consumed from the reader `R`
|
||||
pub fn deserialize_with_size<R: io::Read>(reader: &mut R) -> io::Result<(Self, usize)> {
|
||||
let mut nbytes = 0;
|
||||
|
||||
let (min_value, len) = VInt::deserialize_with_size(reader)?;
|
||||
let min_value = min_value.0;
|
||||
nbytes += len;
|
||||
|
||||
let (gcd, len) = VInt::deserialize_with_size(reader)?;
|
||||
let gcd = gcd.0;
|
||||
let gcd = NonZeroU64::new(gcd)
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "GCD of 0 is forbidden"))?;
|
||||
nbytes += len;
|
||||
|
||||
let (amplitude, len) = VInt::deserialize_with_size(reader)?;
|
||||
let amplitude = amplitude.0 * gcd.get();
|
||||
let max_value = min_value + amplitude;
|
||||
nbytes += len;
|
||||
|
||||
let (num_rows, len) = VInt::deserialize_with_size(reader)?;
|
||||
let num_rows = num_rows.0 as RowId;
|
||||
nbytes += len;
|
||||
|
||||
Ok((
|
||||
ColumnStats {
|
||||
min_value,
|
||||
max_value,
|
||||
num_rows,
|
||||
gcd,
|
||||
},
|
||||
nbytes,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl BinarySerializable for ColumnStats {
|
||||
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
|
||||
VInt(self.min_value).serialize(writer)?;
|
||||
|
||||
@@ -767,7 +767,7 @@ mod tests {
|
||||
];
|
||||
let mut out = Vec::new();
|
||||
serialize_column_values_u128(&&vals[..], &mut out).unwrap();
|
||||
let decomp = open_u128_mapped(OwnedBytes::new(out)).unwrap();
|
||||
let decomp = open_u128_mapped(FileSlice::from(out)).unwrap();
|
||||
let complete_range = 0..vals.len() as u32;
|
||||
|
||||
assert_eq!(
|
||||
@@ -821,6 +821,7 @@ mod tests {
|
||||
let _data = test_aux_vals(vals);
|
||||
}
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use proptest::prelude::*;
|
||||
|
||||
fn num_strategy() -> impl Strategy<Value = u128> {
|
||||
|
||||
@@ -5,7 +5,8 @@ use std::sync::Arc;
|
||||
|
||||
mod compact_space;
|
||||
|
||||
use common::{BinarySerializable, OwnedBytes, VInt};
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{BinarySerializable, VInt};
|
||||
pub use compact_space::{
|
||||
CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
|
||||
};
|
||||
@@ -101,8 +102,9 @@ impl U128FastFieldCodecType {
|
||||
|
||||
/// Returns the correct codec reader wrapped in the `Arc` for the data.
|
||||
pub fn open_u128_mapped<T: MonotonicallyMappableToU128 + Debug>(
|
||||
mut bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
) -> io::Result<Arc<dyn ColumnValues<T>>> {
|
||||
let mut bytes = file_slice.read_bytes()?;
|
||||
let header = U128Header::deserialize(&mut bytes)?;
|
||||
assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
|
||||
let reader = CompactSpaceDecompressor::open(bytes)?;
|
||||
@@ -120,7 +122,8 @@ pub fn open_u128_mapped<T: MonotonicallyMappableToU128 + Debug>(
|
||||
/// # Notice
|
||||
/// In case there are new codecs added, check for usages of `CompactSpaceDecompressorU64` and
|
||||
/// also handle the new codecs.
|
||||
pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<u64>>> {
|
||||
pub fn open_u128_as_compact_u64(file_slice: FileSlice) -> io::Result<Arc<dyn ColumnValues<u64>>> {
|
||||
let mut bytes = file_slice.read_bytes()?;
|
||||
let header = U128Header::deserialize(&mut bytes)?;
|
||||
assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
|
||||
let reader = CompactSpaceU64Accessor::open(bytes)?;
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::io::{self, Write};
|
||||
use std::num::NonZeroU64;
|
||||
use std::ops::{Range, RangeInclusive};
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use fastdivide::DividerU64;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
@@ -137,7 +138,8 @@ impl ColumnCodec for BitpackedCodec {
|
||||
type Estimator = BitpackedCodecEstimator;
|
||||
|
||||
/// Opens a fast field given a file.
|
||||
fn load(mut data: OwnedBytes) -> io::Result<Self::ColumnValues> {
|
||||
fn load(file_slice: FileSlice) -> io::Result<Self::ColumnValues> {
|
||||
let mut data = file_slice.read_bytes()?;
|
||||
let stats = ColumnStats::deserialize(&mut data)?;
|
||||
let num_bits = num_bits(&stats);
|
||||
let bit_unpacker = BitUnpacker::new(num_bits);
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::{io, iter};
|
||||
|
||||
use common::{BinarySerializable, CountingWriter, DeserializeFrom, OwnedBytes};
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{BinarySerializable, CountingWriter, DeserializeFrom, HasLen, OwnedBytes};
|
||||
use fastdivide::DividerU64;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
|
||||
@@ -172,32 +174,74 @@ impl ColumnCodec<u64> for BlockwiseLinearCodec {
|
||||
|
||||
type Estimator = BlockwiseLinearEstimator;
|
||||
|
||||
fn load(mut bytes: OwnedBytes) -> io::Result<Self::ColumnValues> {
|
||||
let stats = ColumnStats::deserialize(&mut bytes)?;
|
||||
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
|
||||
let footer_offset = bytes.len() - 4 - footer_len as usize;
|
||||
let (data, mut footer) = bytes.split(footer_offset);
|
||||
fn load(file_slice: FileSlice) -> io::Result<Self::ColumnValues> {
|
||||
// [`ColumnStats::deserialize_with_size`] deserializes 4 variable-width encoded u64s, which
|
||||
// could end up being, in the worst case, 9 bytes each. this is where the 36 comes from
|
||||
let (stats, _) = file_slice.clone().split(36.min(file_slice.len())); // hope that's enough bytes
|
||||
let mut stats = stats.read_bytes()?;
|
||||
let (stats, stats_nbytes) = ColumnStats::deserialize_with_size(&mut stats)?;
|
||||
|
||||
let (_, body) = file_slice.split(stats_nbytes);
|
||||
|
||||
let (_, footer) = body.clone().split_from_end(4);
|
||||
|
||||
let footer_len: u32 = footer.read_bytes()?.as_slice().deserialize()?;
|
||||
let (data, footer) = body.split_from_end(footer_len as usize + 4);
|
||||
|
||||
let mut footer = footer.read_bytes()?;
|
||||
let num_blocks = compute_num_blocks(stats.num_rows);
|
||||
let mut blocks: Vec<Block> = iter::repeat_with(|| Block::deserialize(&mut footer))
|
||||
.take(num_blocks as usize)
|
||||
.collect::<io::Result<_>>()?;
|
||||
let mut blocks: Vec<BlockWithLength> =
|
||||
iter::repeat_with(|| Block::deserialize(&mut footer))
|
||||
.take(num_blocks as usize)
|
||||
.map(|block| {
|
||||
block.map(|block| BlockWithLength {
|
||||
block,
|
||||
file_slice: FileSlice::from(Vec::new()),
|
||||
data: OnceLock::default(),
|
||||
})
|
||||
})
|
||||
.collect::<io::Result<_>>()?;
|
||||
|
||||
let mut start_offset = 0;
|
||||
for block in &mut blocks {
|
||||
let len = (block.bit_unpacker.bit_width() as usize) * BLOCK_SIZE as usize / 8;
|
||||
block.data_start_offset = start_offset;
|
||||
start_offset += (block.bit_unpacker.bit_width() as usize) * BLOCK_SIZE as usize / 8;
|
||||
block.file_slice = data
|
||||
.clone()
|
||||
.slice(start_offset..(start_offset + len).min(data.len()));
|
||||
start_offset += len;
|
||||
}
|
||||
|
||||
Ok(BlockwiseLinearReader {
|
||||
blocks: blocks.into_boxed_slice().into(),
|
||||
data,
|
||||
stats,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct BlockWithLength {
|
||||
block: Block,
|
||||
file_slice: FileSlice,
|
||||
data: OnceLock<OwnedBytes>,
|
||||
}
|
||||
|
||||
impl Deref for BlockWithLength {
|
||||
type Target = Block;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.block
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for BlockWithLength {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.block
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BlockwiseLinearReader {
|
||||
blocks: Arc<[Block]>,
|
||||
data: OwnedBytes,
|
||||
blocks: Arc<[BlockWithLength]>,
|
||||
stats: ColumnStats,
|
||||
}
|
||||
|
||||
@@ -208,7 +252,9 @@ impl ColumnValues for BlockwiseLinearReader {
|
||||
let idx_within_block = idx % BLOCK_SIZE;
|
||||
let block = &self.blocks[block_id];
|
||||
let interpoled_val: u64 = block.line.eval(idx_within_block);
|
||||
let block_bytes = &self.data[block.data_start_offset..];
|
||||
let block_bytes = block
|
||||
.data
|
||||
.get_or_init(|| block.file_slice.read_bytes().unwrap());
|
||||
let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes);
|
||||
// TODO optimize me! the line parameters could be tweaked to include the multiplication and
|
||||
// remove the dependency.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::io;
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
|
||||
@@ -190,7 +191,8 @@ impl ColumnCodec for LinearCodec {
|
||||
|
||||
type Estimator = LinearCodecEstimator;
|
||||
|
||||
fn load(mut data: OwnedBytes) -> io::Result<Self::ColumnValues> {
|
||||
fn load(file_slice: FileSlice) -> io::Result<Self::ColumnValues> {
|
||||
let mut data = file_slice.read_bytes()?;
|
||||
let stats = ColumnStats::deserialize(&mut data)?;
|
||||
let linear_params = LinearParams::deserialize(&mut data)?;
|
||||
Ok(LinearReader {
|
||||
|
||||
@@ -8,7 +8,8 @@ use std::io;
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use common::file_slice::FileSlice;
|
||||
use common::BinarySerializable;
|
||||
|
||||
use crate::column_values::monotonic_mapping::{
|
||||
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
|
||||
@@ -60,7 +61,7 @@ pub trait ColumnCodec<T: PartialOrd = u64> {
|
||||
type Estimator: ColumnCodecEstimator + Default;
|
||||
|
||||
/// Loads a column that has been serialized using this codec.
|
||||
fn load(bytes: OwnedBytes) -> io::Result<Self::ColumnValues>;
|
||||
fn load(file_slice: FileSlice) -> io::Result<Self::ColumnValues>;
|
||||
|
||||
/// Returns an estimator.
|
||||
fn estimator() -> Self::Estimator {
|
||||
@@ -111,20 +112,22 @@ impl CodecType {
|
||||
|
||||
fn load<T: MonotonicallyMappableToU64>(
|
||||
&self,
|
||||
bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
) -> io::Result<Arc<dyn ColumnValues<T>>> {
|
||||
match self {
|
||||
CodecType::Bitpacked => load_specific_codec::<BitpackedCodec, T>(bytes),
|
||||
CodecType::Linear => load_specific_codec::<LinearCodec, T>(bytes),
|
||||
CodecType::BlockwiseLinear => load_specific_codec::<BlockwiseLinearCodec, T>(bytes),
|
||||
CodecType::Bitpacked => load_specific_codec::<BitpackedCodec, T>(file_slice),
|
||||
CodecType::Linear => load_specific_codec::<LinearCodec, T>(file_slice),
|
||||
CodecType::BlockwiseLinear => {
|
||||
load_specific_codec::<BlockwiseLinearCodec, T>(file_slice)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn load_specific_codec<C: ColumnCodec, T: MonotonicallyMappableToU64>(
|
||||
bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
) -> io::Result<Arc<dyn ColumnValues<T>>> {
|
||||
let reader = C::load(bytes)?;
|
||||
let reader = C::load(file_slice)?;
|
||||
let reader_typed = monotonic_map_column(
|
||||
reader,
|
||||
StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::<T>::new()),
|
||||
@@ -189,25 +192,28 @@ pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64>(
|
||||
///
|
||||
/// This method first identifies the codec off the first byte.
|
||||
pub fn load_u64_based_column_values<T: MonotonicallyMappableToU64>(
|
||||
mut bytes: OwnedBytes,
|
||||
file_slice: FileSlice,
|
||||
) -> io::Result<Arc<dyn ColumnValues<T>>> {
|
||||
let codec_type: CodecType = bytes
|
||||
.first()
|
||||
.copied()
|
||||
let (header, body) = file_slice.split(1);
|
||||
let codec_type: CodecType = header
|
||||
.read_bytes()?
|
||||
.as_slice()
|
||||
.get(0)
|
||||
.cloned()
|
||||
.and_then(CodecType::try_from_code)
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to read codec type"))?;
|
||||
bytes.advance(1);
|
||||
codec_type.load(bytes)
|
||||
codec_type.load(body)
|
||||
}
|
||||
|
||||
/// Helper function to serialize a column (autodetect from all codecs) and then open it
|
||||
#[cfg(test)]
|
||||
pub fn serialize_and_load_u64_based_column_values<T: MonotonicallyMappableToU64>(
|
||||
vals: &dyn Iterable,
|
||||
codec_types: &[CodecType],
|
||||
) -> Arc<dyn ColumnValues<T>> {
|
||||
let mut buffer = Vec::new();
|
||||
serialize_u64_based_column_values(vals, codec_types, &mut buffer).unwrap();
|
||||
load_u64_based_column_values::<T>(OwnedBytes::new(buffer)).unwrap()
|
||||
load_u64_based_column_values::<T>(FileSlice::from(buffer)).unwrap()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use common::HasLen;
|
||||
use proptest::prelude::*;
|
||||
use proptest::{prop_oneof, proptest};
|
||||
|
||||
@@ -12,7 +13,7 @@ fn test_serialize_and_load_simple() {
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(buffer.len(), 7);
|
||||
let col = load_u64_based_column_values::<u64>(OwnedBytes::new(buffer)).unwrap();
|
||||
let col = load_u64_based_column_values::<u64>(FileSlice::from(buffer)).unwrap();
|
||||
assert_eq!(col.num_vals(), 3);
|
||||
assert_eq!(col.get_val(0), 1);
|
||||
assert_eq!(col.get_val(1), 2);
|
||||
@@ -29,7 +30,7 @@ fn test_empty_column_i64() {
|
||||
continue;
|
||||
}
|
||||
num_acceptable_codecs += 1;
|
||||
let col = load_u64_based_column_values::<i64>(OwnedBytes::new(buffer)).unwrap();
|
||||
let col = load_u64_based_column_values::<i64>(FileSlice::from(buffer)).unwrap();
|
||||
assert_eq!(col.num_vals(), 0);
|
||||
assert_eq!(col.min_value(), i64::MIN);
|
||||
assert_eq!(col.max_value(), i64::MIN);
|
||||
@@ -47,7 +48,7 @@ fn test_empty_column_u64() {
|
||||
continue;
|
||||
}
|
||||
num_acceptable_codecs += 1;
|
||||
let col = load_u64_based_column_values::<u64>(OwnedBytes::new(buffer)).unwrap();
|
||||
let col = load_u64_based_column_values::<u64>(FileSlice::from(buffer)).unwrap();
|
||||
assert_eq!(col.num_vals(), 0);
|
||||
assert_eq!(col.min_value(), u64::MIN);
|
||||
assert_eq!(col.max_value(), u64::MIN);
|
||||
@@ -65,7 +66,7 @@ fn test_empty_column_f64() {
|
||||
continue;
|
||||
}
|
||||
num_acceptable_codecs += 1;
|
||||
let col = load_u64_based_column_values::<f64>(OwnedBytes::new(buffer)).unwrap();
|
||||
let col = load_u64_based_column_values::<f64>(FileSlice::from(buffer)).unwrap();
|
||||
assert_eq!(col.num_vals(), 0);
|
||||
// FIXME. f64::MIN would be better!
|
||||
assert!(col.min_value().is_nan());
|
||||
@@ -96,7 +97,7 @@ pub(crate) fn create_and_validate<TColumnCodec: ColumnCodec>(
|
||||
|
||||
let actual_compression = buffer.len() as u64;
|
||||
|
||||
let reader = TColumnCodec::load(OwnedBytes::new(buffer)).unwrap();
|
||||
let reader = TColumnCodec::load(FileSlice::from(buffer)).unwrap();
|
||||
assert_eq!(reader.num_vals(), vals.len() as u32);
|
||||
let mut buffer = Vec::new();
|
||||
for (doc, orig_val) in vals.iter().copied().enumerate() {
|
||||
@@ -325,7 +326,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) ->
|
||||
&[codec_type],
|
||||
&mut buffer,
|
||||
)?;
|
||||
let buffer = OwnedBytes::new(buffer);
|
||||
let buffer = FileSlice::from(buffer);
|
||||
let column = crate::column_values::load_u64_based_column_values::<i64>(buffer.clone())?;
|
||||
assert_eq!(column.get_val(0), -4000i64);
|
||||
assert_eq!(column.get_val(1), -3000i64);
|
||||
@@ -342,7 +343,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) ->
|
||||
&[codec_type],
|
||||
&mut buffer_without_gcd,
|
||||
)?;
|
||||
let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd);
|
||||
let buffer_without_gcd = FileSlice::from(buffer_without_gcd);
|
||||
assert!(buffer_without_gcd.len() > buffer.len());
|
||||
|
||||
Ok(())
|
||||
@@ -368,7 +369,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) ->
|
||||
&[codec_type],
|
||||
&mut buffer,
|
||||
)?;
|
||||
let buffer = OwnedBytes::new(buffer);
|
||||
let buffer = FileSlice::from(buffer);
|
||||
let column = crate::column_values::load_u64_based_column_values::<u64>(buffer.clone())?;
|
||||
assert_eq!(column.get_val(0), 1000u64);
|
||||
assert_eq!(column.get_val(1), 2000u64);
|
||||
@@ -385,7 +386,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) ->
|
||||
&[codec_type],
|
||||
&mut buffer_without_gcd,
|
||||
)?;
|
||||
let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd);
|
||||
let buffer_without_gcd = FileSlice::from(buffer_without_gcd);
|
||||
assert!(buffer_without_gcd.len() > buffer.len());
|
||||
Ok(())
|
||||
}
|
||||
@@ -404,7 +405,7 @@ fn test_fastfield_gcd_u64() -> io::Result<()> {
|
||||
|
||||
#[test]
|
||||
pub fn test_fastfield2() {
|
||||
let test_fastfield = crate::column_values::serialize_and_load_u64_based_column_values::<u64>(
|
||||
let test_fastfield = serialize_and_load_u64_based_column_values::<u64>(
|
||||
&&[100u64, 200u64, 300u64][..],
|
||||
&ALL_U64_CODEC_TYPES,
|
||||
);
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||
use std::{fmt, io};
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use common::{ByteCount, DateTime, HasLen, OwnedBytes};
|
||||
use common::{ByteCount, DateTime, HasLen};
|
||||
|
||||
use crate::column::{BytesColumn, Column, StrColumn};
|
||||
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
|
||||
@@ -238,8 +238,7 @@ pub struct DynamicColumnHandle {
|
||||
impl DynamicColumnHandle {
|
||||
// TODO rename load
|
||||
pub fn open(&self) -> io::Result<DynamicColumn> {
|
||||
let column_bytes: OwnedBytes = self.file_slice.read_bytes()?;
|
||||
self.open_internal(column_bytes)
|
||||
self.open_internal(self.file_slice.clone())
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
@@ -258,16 +257,15 @@ impl DynamicColumnHandle {
|
||||
/// If not, the fastfield reader will returns the u64-value associated with the original
|
||||
/// FastValue.
|
||||
pub fn open_u64_lenient(&self) -> io::Result<Option<Column<u64>>> {
|
||||
let column_bytes = self.file_slice.read_bytes()?;
|
||||
match self.column_type {
|
||||
ColumnType::Str | ColumnType::Bytes => {
|
||||
let column: BytesColumn =
|
||||
crate::column::open_column_bytes(column_bytes, self.format_version)?;
|
||||
crate::column::open_column_bytes(self.file_slice.clone(), self.format_version)?;
|
||||
Ok(Some(column.term_ord_column))
|
||||
}
|
||||
ColumnType::IpAddr => {
|
||||
let column = crate::column::open_column_u128_as_compact_u64(
|
||||
column_bytes,
|
||||
self.file_slice.clone(),
|
||||
self.format_version,
|
||||
)?;
|
||||
Ok(Some(column))
|
||||
@@ -277,40 +275,40 @@ impl DynamicColumnHandle {
|
||||
| ColumnType::U64
|
||||
| ColumnType::F64
|
||||
| ColumnType::DateTime => {
|
||||
let column =
|
||||
crate::column::open_column_u64::<u64>(column_bytes, self.format_version)?;
|
||||
let column = crate::column::open_column_u64::<u64>(
|
||||
self.file_slice.clone(),
|
||||
self.format_version,
|
||||
)?;
|
||||
Ok(Some(column))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn open_internal(&self, column_bytes: OwnedBytes) -> io::Result<DynamicColumn> {
|
||||
fn open_internal(&self, file_slice: FileSlice) -> io::Result<DynamicColumn> {
|
||||
let dynamic_column: DynamicColumn = match self.column_type {
|
||||
ColumnType::Bytes => {
|
||||
crate::column::open_column_bytes(column_bytes, self.format_version)?.into()
|
||||
crate::column::open_column_bytes(file_slice, self.format_version)?.into()
|
||||
}
|
||||
ColumnType::Str => {
|
||||
crate::column::open_column_str(column_bytes, self.format_version)?.into()
|
||||
crate::column::open_column_str(file_slice, self.format_version)?.into()
|
||||
}
|
||||
ColumnType::I64 => {
|
||||
crate::column::open_column_u64::<i64>(column_bytes, self.format_version)?.into()
|
||||
crate::column::open_column_u64::<i64>(file_slice, self.format_version)?.into()
|
||||
}
|
||||
ColumnType::U64 => {
|
||||
crate::column::open_column_u64::<u64>(column_bytes, self.format_version)?.into()
|
||||
crate::column::open_column_u64::<u64>(file_slice, self.format_version)?.into()
|
||||
}
|
||||
ColumnType::F64 => {
|
||||
crate::column::open_column_u64::<f64>(column_bytes, self.format_version)?.into()
|
||||
crate::column::open_column_u64::<f64>(file_slice, self.format_version)?.into()
|
||||
}
|
||||
ColumnType::Bool => {
|
||||
crate::column::open_column_u64::<bool>(column_bytes, self.format_version)?.into()
|
||||
crate::column::open_column_u64::<bool>(file_slice, self.format_version)?.into()
|
||||
}
|
||||
ColumnType::IpAddr => {
|
||||
crate::column::open_column_u128::<Ipv6Addr>(column_bytes, self.format_version)?
|
||||
.into()
|
||||
crate::column::open_column_u128::<Ipv6Addr>(file_slice, self.format_version)?.into()
|
||||
}
|
||||
ColumnType::DateTime => {
|
||||
crate::column::open_column_u64::<DateTime>(column_bytes, self.format_version)?
|
||||
.into()
|
||||
crate::column::open_column_u64::<DateTime>(file_slice, self.format_version)?.into()
|
||||
}
|
||||
};
|
||||
Ok(dynamic_column)
|
||||
|
||||
@@ -56,6 +56,33 @@ impl BinarySerializable for VIntU128 {
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct VInt(pub u64);
|
||||
|
||||
impl VInt {
|
||||
pub fn deserialize_with_size<R: Read>(reader: &mut R) -> io::Result<(Self, usize)> {
|
||||
let mut nbytes = 0;
|
||||
let mut bytes = reader.bytes();
|
||||
let mut result = 0u64;
|
||||
let mut shift = 0u64;
|
||||
loop {
|
||||
match bytes.next() {
|
||||
Some(Ok(b)) => {
|
||||
nbytes += 1;
|
||||
result |= u64::from(b % 128u8) << shift;
|
||||
if b >= STOP_BIT {
|
||||
return Ok((VInt(result), nbytes));
|
||||
}
|
||||
shift += 7;
|
||||
}
|
||||
_ => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"Reach end of buffer while reading VInt",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const STOP_BIT: u8 = 128;
|
||||
|
||||
#[inline]
|
||||
@@ -221,7 +248,6 @@ impl BinarySerializable for VInt {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::{serialize_vint_u32, BinarySerializable, VInt};
|
||||
|
||||
fn aux_test_vint(val: u64) {
|
||||
|
||||
3
runtests.sh
Executable file
3
runtests.sh
Executable file
@@ -0,0 +1,3 @@
|
||||
#! /bin/bash
|
||||
|
||||
cargo +stable nextest run --features mmap,stopwords,lz4-compression,zstd-compression,failpoints --verbose --workspace
|
||||
@@ -822,7 +822,6 @@ mod tests {
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench {
|
||||
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use test::Bencher;
|
||||
|
||||
@@ -263,7 +263,6 @@ impl SegmentCollector for MultiCollectorChild {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::collector::{Count, TopDocs};
|
||||
use crate::query::TermQuery;
|
||||
|
||||
@@ -40,7 +40,6 @@ impl DocToOpstampMapping<'_> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::DocToOpstampMapping;
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -273,7 +273,6 @@ impl Recorder for TfAndPositionRecorder {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use common::write_u32_vint;
|
||||
|
||||
use super::{BufferLender, VInt32Reader};
|
||||
|
||||
@@ -315,7 +315,7 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
|
||||
reader: &SegmentReader,
|
||||
callback: &mut dyn FnMut(&[DocId]),
|
||||
) -> crate::Result<()> {
|
||||
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
|
||||
let scorer = self.complex_scorer(reader, 1.0, DoNothingCombiner::default)?;
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
|
||||
match scorer {
|
||||
|
||||
@@ -63,7 +63,7 @@ impl TermSetQuery {
|
||||
Ok(BooleanWeight::new(
|
||||
sub_queries,
|
||||
false,
|
||||
Box::new(|| DoNothingCombiner),
|
||||
Box::new(DoNothingCombiner::default),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,6 @@ impl TokenStream for FacetTokenStream<'_> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::FacetTokenizer;
|
||||
use crate::schema::Facet;
|
||||
use crate::tokenizer::{Token, TokenStream, Tokenizer};
|
||||
|
||||
@@ -313,7 +313,6 @@ fn utf8_codepoint_width(b: u8) -> usize {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::{utf8_codepoint_width, CodepointFrontiers, NgramTokenizer, StutteringIterator};
|
||||
use crate::tokenizer::tests::assert_token;
|
||||
use crate::tokenizer::{Token, TokenStream, Tokenizer};
|
||||
|
||||
Reference in New Issue
Block a user