Refactoring fast field codecs.

This removes GCD as a standalone codec and
makes all fastfield codecs share the same
normalization step (shift by the minimum
value, then divide by the GCD).
Paul Masurel
2022-09-04 22:57:24 +09:00
parent ea72cf34d6
commit c632fc014e
18 changed files with 678 additions and 540 deletions
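
The normalization mentioned above is just a monotonic mapping applied before any codec sees the data, and undone when reading. A minimal sketch of that round trip, assuming a column minimum `min` and a common divisor `gcd` of all `(val - min)` (the names mirror `Header::normalize_column` and `open_specific_codec` below; this is an illustration, not code from the commit):

fn normalize(val: u64, min: u64, gcd: u64) -> u64 {
    // What the codec actually bitpacks: a small, zero-based value.
    (val - min) / gcd
}

fn denormalize(stored: u64, min: u64, gcd: u64) -> u64 {
    // Inverse mapping applied by the reader.
    min + stored * gcd
}

fn main() {
    let (min, gcd) = (1_000u64, 500u64);
    for val in [1_000u64, 1_500, 4_000] {
        assert_eq!(denormalize(normalize(val, min, gcd), min, gcd), val);
    }
}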

View File

@@ -4,14 +4,17 @@ extern crate test;
#[cfg(test)]
mod tests {
use std::sync::Arc;
use fastfield_codecs::bitpacked::BitpackedCodec;
use fastfield_codecs::blockwise_linear::BlockwiseLinearCodec;
use fastfield_codecs::linear::LinearCodec;
use fastfield_codecs::*;
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
let mut data: Vec<_> = (100..55000_u64)
.map(|num| num + rand::random::<u8>() as u64)
.map(|num| num + rng.gen::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);
@@ -22,32 +25,59 @@ mod tests {
data
}
#[inline(never)]
fn value_iter() -> impl Iterator<Item = u64> {
0..20_000
}
fn get_reader_for_bench<Codec: FastFieldCodec>(data: &[u64]) -> Codec::Reader {
let mut bytes = Vec::new();
let col = VecColumn::from(&data);
let normalized_header = fastfield_codecs::NormalizedHeader {
num_vals: col.num_vals(),
max_value: col.max_value(),
};
Codec::serialize(&VecColumn::from(data), &mut bytes).unwrap();
Codec::open_from_bytes(OwnedBytes::new(bytes), normalized_header).unwrap()
}
fn bench_get<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
let mut bytes = vec![];
Codec::serialize(&mut bytes, &VecColumn::from(data)).unwrap();
let reader = Codec::open_from_bytes(OwnedBytes::new(bytes)).unwrap();
let col = get_reader_for_bench::<Codec>(data);
b.iter(|| {
let mut sum = 0u64;
for pos in value_iter() {
let val = reader.get_val(pos as u64);
debug_assert_eq!(data[pos as usize], val);
let val = col.get_val(pos as u64);
sum = sum.wrapping_add(val);
}
sum
});
}
#[inline(never)]
fn bench_get_dynamic_helper(b: &mut Bencher, col: Arc<dyn Column>) {
b.iter(|| {
let mut sum = 0u64;
for pos in value_iter() {
let val = col.get_val(pos as u64);
sum = sum.wrapping_add(val);
}
sum
});
}
fn bench_get_dynamic<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
let col = Arc::new(get_reader_for_bench::<Codec>(data));
bench_get_dynamic_helper(b, col);
}
fn bench_create<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
let mut bytes = Vec::new();
b.iter(|| {
bytes.clear();
Codec::serialize(&mut bytes, &VecColumn::from(data)).unwrap();
Codec::serialize(&VecColumn::from(data), &mut bytes).unwrap();
});
}
use ownedbytes::OwnedBytes;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use test::Bencher;
#[bench]
fn bench_fastfield_bitpack_create(b: &mut Bencher) {
@@ -70,22 +100,28 @@ mod tests {
bench_get::<BitpackedCodec>(b, &data);
}
#[bench]
fn bench_fastfield_bitpack_get_dynamic(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get_dynamic::<BitpackedCodec>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<LinearCodec>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_get_dynamic(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get_dynamic::<LinearCodec>(b, &data);
}
#[bench]
fn bench_fastfield_multilinearinterpol_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<BlockwiseLinearCodec>(b, &data);
}
pub fn stats_from_vec(data: &[u64]) -> FastFieldStats {
let min_value = data.iter().cloned().min().unwrap_or(0);
let max_value = data.iter().cloned().max().unwrap_or(0);
FastFieldStats {
min_value,
max_value,
num_vals: data.len() as u64,
}
#[bench]
fn bench_fastfield_multilinearinterpol_get_dynamic(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get_dynamic::<BlockwiseLinearCodec>(b, &data);
}
}

View File

@@ -1,9 +1,9 @@
use std::io::{self, Write};
use common::BinarySerializable;
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType};
/// Depending on the field type, a different
@@ -12,80 +12,25 @@ use crate::{Column, FastFieldCodec, FastFieldCodecType};
pub struct BitpackedReader {
data: OwnedBytes,
bit_unpacker: BitUnpacker,
min_value_u64: u64,
max_value_u64: u64,
num_vals: u64,
normalized_header: NormalizedHeader,
}
impl Column for BitpackedReader {
#[inline]
fn get_val(&self, doc: u64) -> u64 {
self.min_value_u64 + self.bit_unpacker.get(doc, &self.data)
self.bit_unpacker.get(doc, &self.data)
}
#[inline]
fn min_value(&self) -> u64 {
self.min_value_u64
0
}
#[inline]
fn max_value(&self) -> u64 {
self.max_value_u64
self.normalized_header.max_value
}
#[inline]
fn num_vals(&self) -> u64 {
self.num_vals
}
}
pub struct BitpackedSerializerLegacy<'a, W: 'a + Write> {
bit_packer: BitPacker,
write: &'a mut W,
min_value: u64,
num_vals: u64,
amplitude: u64,
num_bits: u8,
}
impl<'a, W: Write> BitpackedSerializerLegacy<'a, W> {
/// Creates a new fast field serializer.
///
/// The serializer in fact encodes the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// the minimum number of bits required to encode values.
pub fn open(
write: &'a mut W,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedSerializerLegacy<'a, W>> {
assert!(min_value <= max_value);
let amplitude = max_value - min_value;
let num_bits = compute_num_bits(amplitude);
let bit_packer = BitPacker::new();
Ok(BitpackedSerializerLegacy {
bit_packer,
write,
min_value,
num_vals: 0,
amplitude,
num_bits,
})
}
/// Pushes a new value to the currently open u64 fast field.
#[inline]
pub fn add_val(&mut self, val: u64) -> io::Result<()> {
let val_to_write: u64 = val - self.min_value;
self.bit_packer
.write(val_to_write, self.num_bits, &mut self.write)?;
self.num_vals += 1;
Ok(())
}
pub fn close_field(mut self) -> io::Result<()> {
self.bit_packer.close(&mut self.write)?;
self.min_value.serialize(&mut self.write)?;
self.amplitude.serialize(&mut self.write)?;
self.num_vals.serialize(&mut self.write)?;
Ok(())
self.normalized_header.num_vals
}
}
@@ -98,50 +43,34 @@ impl FastFieldCodec for BitpackedCodec {
type Reader = BitpackedReader;
/// Opens a fast field given a file.
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
let footer_offset = bytes.len() - 24;
let (data, mut footer) = bytes.split(footer_offset);
let min_value = u64::deserialize(&mut footer)?;
let amplitude = u64::deserialize(&mut footer)?;
let num_vals = u64::deserialize(&mut footer)?;
let max_value = min_value + amplitude;
let num_bits = compute_num_bits(amplitude);
fn open_from_bytes(
data: OwnedBytes,
normalized_header: NormalizedHeader,
) -> io::Result<Self::Reader> {
let num_bits = compute_num_bits(normalized_header.max_value);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(BitpackedReader {
data,
bit_unpacker,
min_value_u64: min_value,
max_value_u64: max_value,
num_vals,
normalized_header,
})
}
/// Serializes data with the BitpackedFastFieldSerializer.
///
/// The serializer in fact encodes the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// the minimum number of bits required to encode values.
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
let mut serializer = BitpackedSerializerLegacy::open(
write,
fastfield_accessor.min_value(),
fastfield_accessor.max_value(),
)?;
for val in fastfield_accessor.iter() {
serializer.add_val(val)?;
/// Ideally, a shift has already been applied upstream so that `col.min_value() == 0`.
fn serialize(col: &dyn Column, write: &mut impl Write) -> io::Result<()> {
let num_bits = compute_num_bits(col.max_value());
let mut bit_packer = BitPacker::new();
for val in col.iter() {
bit_packer.write(val, num_bits, write)?;
}
serializer.close_field()?;
bit_packer.close(write)?;
Ok(())
}
fn estimate(fastfield_accessor: &impl Column) -> Option<f32> {
let amplitude = fastfield_accessor.max_value() - fastfield_accessor.min_value();
let num_bits = compute_num_bits(amplitude);
fn estimate(col: &impl Column) -> Option<f32> {
let num_bits = compute_num_bits(col.max_value());
let num_bits_uncompressed = 64;
Some(num_bits as f32 / num_bits_uncompressed as f32)
}

View File

@@ -1,11 +1,12 @@
use std::io;
use std::sync::Arc;
use std::{io, iter};
use common::{BinarySerializable, CountingWriter, DeserializeFrom};
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::line::Line;
use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn};
const CHUNK_SIZE: usize = 512;
@@ -35,45 +36,6 @@ impl BinarySerializable for Block {
}
}
#[derive(Debug)]
struct BlockwiseLinearParams {
num_vals: u64,
min_value: u64,
max_value: u64,
blocks: Vec<Block>,
}
impl BinarySerializable for BlockwiseLinearParams {
fn serialize<W: io::Write>(&self, wrt: &mut W) -> io::Result<()> {
self.num_vals.serialize(wrt)?;
self.min_value.serialize(wrt)?;
self.max_value.serialize(wrt)?;
let expected_num_blocks = compute_num_blocks(self.num_vals);
assert_eq!(expected_num_blocks, self.blocks.len());
for block in &self.blocks {
block.serialize(wrt)?;
}
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<BlockwiseLinearParams> {
let num_vals = u64::deserialize(reader)?;
let min_value = u64::deserialize(reader)?;
let max_value = u64::deserialize(reader)?;
let num_blocks = compute_num_blocks(num_vals);
let mut blocks = Vec::with_capacity(num_blocks);
for _ in 0..num_blocks {
blocks.push(Block::deserialize(reader)?);
}
Ok(BlockwiseLinearParams {
num_vals,
min_value,
max_value,
blocks,
})
}
}
fn compute_num_blocks(num_vals: u64) -> usize {
(num_vals as usize + CHUNK_SIZE - 1) / CHUNK_SIZE
}
@@ -84,19 +46,27 @@ impl FastFieldCodec for BlockwiseLinearCodec {
const CODEC_TYPE: crate::FastFieldCodecType = FastFieldCodecType::BlockwiseLinear;
type Reader = BlockwiseLinearReader;
fn open_from_bytes(bytes: ownedbytes::OwnedBytes) -> io::Result<Self::Reader> {
fn open_from_bytes(
bytes: ownedbytes::OwnedBytes,
normalized_header: NormalizedHeader,
) -> io::Result<Self::Reader> {
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
let footer_offset = bytes.len() - 4 - footer_len as usize;
let (data, mut footer) = bytes.split(footer_offset);
let mut params = BlockwiseLinearParams::deserialize(&mut footer)?;
let num_blocks = compute_num_blocks(normalized_header.num_vals);
let mut blocks: Vec<Block> = iter::repeat_with(|| Block::deserialize(&mut footer))
.take(num_blocks)
.collect::<io::Result<_>>()?;
let mut start_offset = 0;
for block in params.blocks.iter_mut() {
for block in &mut blocks {
block.data_start_offset = start_offset;
start_offset += (block.bit_unpacker.bit_width() as usize) * CHUNK_SIZE / 8;
}
Ok(BlockwiseLinearReader {
params: Arc::new(params),
blocks: Arc::new(blocks),
data,
normalized_header,
})
}
@@ -134,8 +104,8 @@ impl FastFieldCodec for BlockwiseLinearCodec {
}
fn serialize(
wrt: &mut impl io::Write,
fastfield_accessor: &dyn crate::Column,
wrt: &mut impl io::Write,
) -> io::Result<()> {
let mut buffer = Vec::with_capacity(CHUNK_SIZE);
let num_vals = fastfield_accessor.num_vals();
@@ -171,20 +141,15 @@ impl FastFieldCodec for BlockwiseLinearCodec {
});
}
let params = BlockwiseLinearParams {
num_vals,
min_value: fastfield_accessor.min_value(),
max_value: fastfield_accessor.max_value(),
blocks,
};
bit_packer.close(wrt)?;
assert_eq!(blocks.len(), compute_num_blocks(num_vals));
let mut counting_wrt = CountingWriter::wrap(wrt);
params.serialize(&mut counting_wrt)?;
for block in &blocks {
block.serialize(&mut counting_wrt)?;
}
let footer_len = counting_wrt.written_bytes();
(footer_len as u32).serialize(&mut counting_wrt)?;
Ok(())
@@ -193,7 +158,8 @@ impl FastFieldCodec for BlockwiseLinearCodec {
#[derive(Clone)]
pub struct BlockwiseLinearReader {
params: Arc<BlockwiseLinearParams>,
blocks: Arc<Vec<Block>>,
normalized_header: NormalizedHeader,
data: OwnedBytes,
}
@@ -202,7 +168,7 @@ impl Column for BlockwiseLinearReader {
fn get_val(&self, idx: u64) -> u64 {
let block_id = (idx / CHUNK_SIZE as u64) as usize;
let idx_within_block = idx % (CHUNK_SIZE as u64);
let block = &self.params.blocks[block_id];
let block = &self.blocks[block_id];
let interpoled_val: u64 = block.line.eval(idx_within_block);
let block_bytes = &self.data[block.data_start_offset..];
let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes);
@@ -210,14 +176,14 @@ impl Column for BlockwiseLinearReader {
}
fn min_value(&self) -> u64 {
self.params.min_value
0u64
}
fn max_value(&self) -> u64 {
self.params.max_value
self.normalized_header.max_value
}
fn num_vals(&self) -> u64 {
self.params.num_vals
self.normalized_header.num_vals
}
}
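
A worked example of the block addressing in `get_val` above, with CHUNK_SIZE = 512 as in this file (hypothetical index, illustration only):

fn main() {
    const CHUNK_SIZE: u64 = 512;
    let idx = 1_300u64;
    let block_id = idx / CHUNK_SIZE; // 2: third block
    let idx_within_block = idx % CHUNK_SIZE; // 276
    assert_eq!((block_id, idx_within_block), (2, 276));
    // The value is then line.eval(276) plus the bitpacked residual stored
    // for slot 276 of block 2 (wrapping_add, since residuals may wrap).
}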

View File

@@ -1,4 +1,5 @@
use std::marker::PhantomData;
use std::sync::Mutex;
use tantivy_bitpacker::minmax;
@@ -59,6 +60,24 @@ pub struct VecColumn<'a, T = u64> {
max_value: T,
}
impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
fn get_val(&self, idx: u64) -> T {
(*self).get_val(idx)
}
fn min_value(&self) -> T {
(*self).min_value()
}
fn max_value(&self) -> T {
(*self).max_value()
}
fn num_vals(&self) -> u64 {
(*self).num_vals()
}
}
impl<'a, T: Copy + PartialOrd> Column<T> for VecColumn<'a, T> {
fn get_val(&self, position: u64) -> T {
self.values[position as usize]
@@ -142,6 +161,87 @@ where
}
}
pub struct RemappedColumn<T, M, C> {
column: C,
new_to_old_id_mapping: M,
min_max_cache: Mutex<Option<(T, T)>>,
}
impl<T, M, C> RemappedColumn<T, M, C>
where
C: Column<T>,
M: Column<u32>,
T: Copy + Ord + Default,
{
fn min_max(&self) -> (T, T) {
if let Some((min, max)) = self.min_max_cache.lock().unwrap().clone() {
return (min, max);
}
let (min, max) =
tantivy_bitpacker::minmax(self.iter()).unwrap_or((T::default(), T::default()));
*self.min_max_cache.lock().unwrap() = Some((min, max));
(min, max)
}
}
pub struct IterColumn<T>(T);
impl<T> From<T> for IterColumn<T>
where T: Iterator + Clone + ExactSizeIterator
{
fn from(iter: T) -> Self {
IterColumn(iter)
}
}
impl<T> Column<T::Item> for IterColumn<T>
where T: Iterator + Clone + ExactSizeIterator
{
fn get_val(&self, idx: u64) -> T::Item {
self.0.clone().nth(idx as usize).unwrap()
}
fn min_value(&self) -> T::Item {
self.0.clone().next().unwrap()
}
fn max_value(&self) -> T::Item {
self.0.clone().last().unwrap()
}
fn num_vals(&self) -> u64 {
self.0.len() as u64
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T::Item> + 'a> {
Box::new(self.0.clone())
}
}
impl<T, M, C> Column<T> for RemappedColumn<T, M, C>
where
C: Column<T>,
M: Column<u32>,
T: Copy + Ord + Default,
{
fn get_val(&self, idx: u64) -> T {
let old_id = self.new_to_old_id_mapping.get_val(idx);
self.column.get_val(old_id as u64)
}
fn min_value(&self) -> T {
self.min_max().0
}
fn max_value(&self) -> T {
self.min_max().1
}
fn num_vals(&self) -> u64 {
self.new_to_old_id_mapping.num_vals() as u64
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -158,4 +258,11 @@ mod tests {
assert_eq!(mapped.get_val(0), 5);
assert_eq!(mapped.get_val(1), 7);
}
#[test]
fn test_range_as_col() {
let col = IterColumn::from(10..100);
assert_eq!(col.num_vals(), 90);
assert_eq!(col.max_value(), 99);
}
}
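
`RemappedColumn::get_val` above reads a value through a new-id to old-id indirection; the same access pattern expressed on plain slices (a sketch only, the real type also caches min/max behind a `Mutex`):

// `new_to_old[new_id]` holds the old id whose value we want.
fn remapped_get(values: &[u64], new_to_old: &[u32], new_id: usize) -> u64 {
    values[new_to_old[new_id] as usize]
}

fn main() {
    let values = [10u64, 11, 12, 13];
    let new_to_old = [2u32, 0, 3];
    assert_eq!(remapped_get(&values, &new_to_old, 0), 12);
}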

View File

@@ -1,36 +1,7 @@
use std::io::{self, Write};
use std::num::NonZeroU64;
use common::BinarySerializable;
use fastdivide::DividerU64;
#[derive(Debug, Clone, Copy)]
pub struct GCDParams {
pub gcd: u64,
pub min_value: u64,
pub num_vals: u64,
}
impl BinarySerializable for GCDParams {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.gcd.serialize(writer)?;
self.min_value.serialize(writer)?;
self.num_vals.serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let gcd: u64 = u64::deserialize(reader)?;
let min_value: u64 = u64::deserialize(reader)?;
let num_vals: u64 = u64::deserialize(reader)?;
Ok(Self {
gcd,
min_value,
num_vals,
})
}
}
/// Computes the gcd of two nonzero numbers.
///
/// It is recommended, but not required, to feed values such that `large >= small`.
@@ -85,11 +56,7 @@ mod tests {
) -> io::Result<()> {
let mut vals: Vec<i64> = (-4..=(num_vals as i64) - 5).map(|val| val * 1000).collect();
let mut buffer: Vec<u8> = Vec::new();
crate::serialize(
VecColumn::from(&vals),
&mut buffer,
&[codec_type, FastFieldCodecType::Gcd],
)?;
crate::serialize(VecColumn::from(&vals), &mut buffer, &[codec_type])?;
let buffer = OwnedBytes::new(buffer);
let column = crate::open::<i64>(buffer.clone())?;
assert_eq!(column.get_val(0), -4000i64);
@@ -131,11 +98,7 @@ mod tests {
) -> io::Result<()> {
let mut vals: Vec<u64> = (1..=num_vals).map(|i| i as u64 * 1000u64).collect();
let mut buffer: Vec<u8> = Vec::new();
crate::serialize(
VecColumn::from(&vals),
&mut buffer,
&[codec_type, FastFieldCodecType::Gcd],
)?;
crate::serialize(VecColumn::from(&vals), &mut buffer, &[codec_type])?;
let buffer = OwnedBytes::new(buffer);
let column = crate::open::<u64>(buffer.clone())?;
assert_eq!(column.get_val(0), 1000u64);
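
These tests rely on a GCD being detected over the shifted values `(val - min)`. A minimal sketch of that detection with a plain Euclid fold (the crate's `find_gcd` is assumed to behave equivalently):

fn gcd(mut a: u64, mut b: u64) -> u64 {
    while b != 0 {
        let r = a % b;
        a = b;
        b = r;
    }
    a
}

fn main() {
    let vals = [1_000u64, 2_000, 3_000, 5_000];
    let min = *vals.iter().min().unwrap();
    // gcd(0, x) == x, so folding from 0 handles the first element.
    let g = vals.iter().map(|v| v - min).fold(0, gcd);
    assert_eq!(g, 1_000); // values are then stored as 0, 1, 2, 4
}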

View File

@@ -23,7 +23,7 @@ mod gcd;
mod serialize;
pub use self::column::{monotonic_map_column, Column, VecColumn};
pub use self::serialize::{open, serialize, serialize_and_load};
pub use self::serialize::{open, serialize, serialize_and_load, NormalizedHeader};
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
#[repr(u8)]
@@ -31,7 +31,6 @@ pub enum FastFieldCodecType {
Bitpacked = 1,
Linear = 2,
BlockwiseLinear = 3,
Gcd = 4,
}
impl BinarySerializable for FastFieldCodecType {
@@ -57,7 +56,6 @@ impl FastFieldCodecType {
1 => Some(Self::Bitpacked),
2 => Some(Self::Linear),
3 => Some(Self::BlockwiseLinear),
4 => Some(Self::Gcd),
_ => None,
}
}
@@ -134,13 +132,13 @@ pub trait FastFieldCodec: 'static {
type Reader: Column<u64> + 'static;
/// Reads the metadata and returns the CodecReader
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader>;
fn open_from_bytes(bytes: OwnedBytes, header: NormalizedHeader) -> io::Result<Self::Reader>;
/// Serializes the data using the serializer into write.
///
/// The fastfield_accessor iterator should be preferred over using fastfield_accessor for
/// The column iterator should be preferred over the column's `get_val` method for
/// performance reasons.
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column<u64>) -> io::Result<()>;
fn serialize(column: &dyn Column<u64>, write: &mut impl Write) -> io::Result<()>;
/// Returns an estimate of the compression ratio.
/// If the codec is not applicable, returns `None`.
@@ -149,13 +147,12 @@ pub trait FastFieldCodec: 'static {
///
/// It could make sense to also return a value representing
/// computational complexity.
fn estimate(fastfield_accessor: &impl Column) -> Option<f32>;
fn estimate(column: &impl Column) -> Option<f32>;
}
pub const ALL_CODEC_TYPES: [FastFieldCodecType; 4] = [
pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [
FastFieldCodecType::Bitpacked,
FastFieldCodecType::BlockwiseLinear,
FastFieldCodecType::Gcd,
FastFieldCodecType::Linear,
];
@@ -176,19 +173,24 @@ mod tests {
use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::linear::LinearCodec;
use crate::serialize::Header;
pub fn create_and_validate<Codec: FastFieldCodec>(
pub(crate) fn create_and_validate<Codec: FastFieldCodec>(
data: &[u64],
name: &str,
) -> Option<(f32, f32)> {
let estimation = Codec::estimate(&VecColumn::from(data))?;
let col = &VecColumn::from(data);
let header = Header::compute_header(&col, &[Codec::CODEC_TYPE])?;
let normalized_col = header.normalize_column(col);
let estimation = Codec::estimate(&normalized_col)?;
let mut out: Vec<u8> = Vec::new();
Codec::serialize(&mut out, &VecColumn::from(data)).unwrap();
let mut out = Vec::new();
let col = VecColumn::from(data);
serialize(col, &mut out, &[Codec::CODEC_TYPE]).unwrap();
let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0);
let reader = Codec::open_from_bytes(OwnedBytes::new(out)).unwrap();
let reader = crate::open::<u64>(OwnedBytes::new(out)).unwrap();
assert_eq!(reader.num_vals(), data.len() as u64);
for (doc, orig_val) in data.iter().copied().enumerate() {
let val = reader.get_val(doc as u64);
@@ -203,24 +205,42 @@ mod tests {
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_small(data in proptest::collection::vec(num_strategy(), 1..10)) {
create_and_validate::<LinearCodec>(&data, "proptest linearinterpol");
create_and_validate::<BlockwiseLinearCodec>(&data, "proptest multilinearinterpol");
fn test_proptest_small_bitpacked(data in proptest::collection::vec(num_strategy(), 1..10)) {
create_and_validate::<BitpackedCodec>(&data, "proptest bitpacked");
}
#[test]
fn test_proptest_small_linear(data in proptest::collection::vec(num_strategy(), 1..10)) {
create_and_validate::<LinearCodec>(&data, "proptest linearinterpol");
}
#[test]
fn test_proptest_small_blockwise_linear(data in proptest::collection::vec(num_strategy(), 1..10)) {
create_and_validate::<BlockwiseLinearCodec>(&data, "proptest multilinearinterpol");
}
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(10))]
#[test]
fn test_proptest_large(data in proptest::collection::vec(num_strategy(), 1..6000)) {
create_and_validate::<LinearCodec>(&data, "proptest linearinterpol");
create_and_validate::<BlockwiseLinearCodec>(&data, "proptest multilinearinterpol");
fn test_proptest_large_bitpacked(data in proptest::collection::vec(num_strategy(), 1..6000)) {
create_and_validate::<BitpackedCodec>(&data, "proptest bitpacked");
}
#[test]
fn test_proptest_large_linear(data in proptest::collection::vec(num_strategy(), 1..6000)) {
create_and_validate::<LinearCodec>(&data, "proptest linearinterpol");
}
#[test]
fn test_proptest_large_blockwise_linear(data in proptest::collection::vec(num_strategy(), 1..6000)) {
create_and_validate::<BlockwiseLinearCodec>(&data, "proptest multilinearinterpol");
}
}
fn num_strategy() -> impl Strategy<Value = u64> {
prop_oneof![
1 => prop::num::u64::ANY.prop_map(|num| u64::MAX - (num % 10) ),
@@ -307,11 +327,8 @@ mod tests {
#[test]
fn estimation_prefer_bitpacked() {
let data: &[u64] = &[10, 10, 10, 10];
let data: VecColumn = data.into();
let data = VecColumn::from(&[10, 10, 10, 10]);
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
assert_lt!(bitpacked_estimation, linear_interpol_estimation);
}
@@ -341,7 +358,7 @@ mod tests {
count_codec += 1;
}
}
assert_eq!(count_codec, 4);
assert_eq!(count_codec, 3);
}
}
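
Putting the new trait shape together, a round trip through the top-level entry points (`serialize` detects the best codec and writes the header; `open` reads it back), lightly adapted from the tests in this commit:

use fastfield_codecs::{open, serialize, Column, VecColumn, ALL_CODEC_TYPES};
use ownedbytes::OwnedBytes;

fn main() -> std::io::Result<()> {
    let data: Vec<u64> = vec![1_000, 2_000, 3_000];
    let mut buffer: Vec<u8> = Vec::new();
    serialize(VecColumn::from(&data[..]), &mut buffer, &ALL_CODEC_TYPES)?;
    let column = open::<u64>(OwnedBytes::new(buffer))?;
    assert_eq!(column.get_val(1), 2_000);
    Ok(())
}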

View File

@@ -1,10 +1,11 @@
use std::io::{self, Write};
use common::{BinarySerializable, CountingWriter, DeserializeFrom};
use common::BinarySerializable;
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::line::Line;
use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType};
/// Depending on the field type, a different
@@ -12,28 +13,31 @@ use crate::{Column, FastFieldCodec, FastFieldCodecType};
#[derive(Clone)]
pub struct LinearReader {
data: OwnedBytes,
footer: LinearParams,
linear_params: LinearParams,
header: NormalizedHeader,
}
impl Column for LinearReader {
#[inline]
fn get_val(&self, doc: u64) -> u64 {
let interpoled_val: u64 = self.footer.line.eval(doc);
let bitpacked_diff = self.footer.bit_unpacker.get(doc, &self.data);
let interpoled_val: u64 = self.linear_params.line.eval(doc);
let bitpacked_diff = self.linear_params.bit_unpacker.get(doc, &self.data);
interpoled_val.wrapping_add(bitpacked_diff)
}
#[inline]
fn min_value(&self) -> u64 {
self.footer.min_value
0u64
}
#[inline]
fn max_value(&self) -> u64 {
self.footer.max_value
self.header.max_value
}
#[inline]
fn num_vals(&self) -> u64 {
self.footer.num_vals
self.header.num_vals
}
}
@@ -43,33 +47,21 @@ pub struct LinearCodec;
#[derive(Debug, Clone)]
struct LinearParams {
num_vals: u64,
min_value: u64,
max_value: u64,
line: Line,
bit_unpacker: BitUnpacker,
}
impl BinarySerializable for LinearParams {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
self.num_vals.serialize(writer)?;
self.min_value.serialize(writer)?;
self.max_value.serialize(writer)?;
self.line.serialize(writer)?;
self.bit_unpacker.bit_width().serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let num_vals = u64::deserialize(reader)?;
let min_value = u64::deserialize(reader)?;
let max_value = u64::deserialize(reader)?;
let line = Line::deserialize(reader)?;
let bit_width = u8::deserialize(reader)?;
Ok(Self {
num_vals,
min_value,
max_value,
line,
bit_unpacker: BitUnpacker::new(bit_width),
})
@@ -82,16 +74,17 @@ impl FastFieldCodec for LinearCodec {
type Reader = LinearReader;
/// Opens a fast field given a file.
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
let footer_offset = bytes.len() - 4 - footer_len as usize;
let (data, mut footer) = bytes.split(footer_offset);
let footer = LinearParams::deserialize(&mut footer)?;
Ok(LinearReader { data, footer })
fn open_from_bytes(mut data: OwnedBytes, header: NormalizedHeader) -> io::Result<Self::Reader> {
let linear_params = LinearParams::deserialize(&mut data)?;
Ok(LinearReader {
data,
linear_params,
header,
})
}
/// Serializes the column with the linear codec.
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
fn serialize(fastfield_accessor: &dyn Column, write: &mut impl Write) -> io::Result<()> {
assert!(fastfield_accessor.min_value() <= fastfield_accessor.max_value());
let line = Line::train(fastfield_accessor);
@@ -106,6 +99,12 @@ impl FastFieldCodec for LinearCodec {
.unwrap();
let num_bits = compute_num_bits(max_offset_from_line);
let linear_params = LinearParams {
line,
bit_unpacker: BitUnpacker::new(num_bits),
};
linear_params.serialize(write)?;
let mut bit_packer = BitPacker::new();
for (pos, actual_value) in fastfield_accessor.iter().enumerate() {
let calculated_value = line.eval(pos as u64);
@@ -114,19 +113,6 @@ impl FastFieldCodec for LinearCodec {
}
bit_packer.close(write)?;
let footer = LinearParams {
num_vals: fastfield_accessor.num_vals(),
min_value: fastfield_accessor.min_value(),
max_value: fastfield_accessor.max_value(),
line,
bit_unpacker: BitUnpacker::new(num_bits),
};
let mut counting_wrt = CountingWriter::wrap(write);
footer.serialize(&mut counting_wrt)?;
let footer_len = counting_wrt.written_bytes();
(footer_len as u32).serialize(&mut counting_wrt)?;
Ok(())
}
@@ -225,7 +211,6 @@ mod tests {
#[test]
fn linear_interpol_fast_field_test_simple() {
let data = (10..=20_u64).collect::<Vec<_>>();
create_and_validate(&data, "simple monotonically");
}
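
The linear codec stores, for each position, the wrapping difference between the actual value and a fitted line. A tiny round-trip check of that identity with a hypothetical line (the real one is trained by `Line::train`):

fn main() {
    let line_eval = |pos: u64| 3 * pos + 10; // hypothetical fitted line
    let (pos, actual) = (9u64, 40u64);
    let residual = actual.wrapping_sub(line_eval(pos)); // this is what gets bitpacked
    assert_eq!(line_eval(pos).wrapping_add(residual), actual);
}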

View File

@@ -113,7 +113,7 @@ pub fn serialize_with_codec<C: FastFieldCodec>(
let data = Data(data);
let estimation = C::estimate(&data)?;
let mut out = Vec::new();
C::serialize(&mut out, &data).unwrap();
C::serialize(&data, &mut out).unwrap();
let actual_compression = out.len() as f32 / (data.num_vals() * 8) as f32;
Some((estimation, actual_compression, C::CODEC_TYPE))
}

View File

@@ -21,14 +21,13 @@ use std::io;
use std::num::NonZeroU64;
use std::sync::Arc;
use common::BinarySerializable;
use common::{BinarySerializable, VInt};
use fastdivide::DividerU64;
use log::warn;
use ownedbytes::OwnedBytes;
use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::gcd::{find_gcd, GCDParams};
use crate::linear::LinearCodec;
use crate::{
monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
@@ -46,125 +45,149 @@ fn codec_estimation<C: FastFieldCodec, D: Column>(
}
}
fn write_header<W: io::Write>(codec_type: FastFieldCodecType, output: &mut W) -> io::Result<()> {
codec_type.to_code().serialize(output)?;
Ok(())
#[derive(Debug, Copy, Clone)]
pub struct NormalizedHeader {
pub num_vals: u64,
pub max_value: u64,
}
fn gcd_params(column: &impl Column<u64>) -> Option<GCDParams> {
let min_value = column.min_value();
let gcd = find_gcd(column.iter().map(|val| val - min_value)).map(NonZeroU64::get)?;
if gcd == 1 {
return None;
#[derive(Debug, Copy, Clone)]
pub(crate) struct Header {
num_vals: u64,
min_value: u64,
max_value: u64,
gcd: Option<NonZeroU64>,
codec_type: FastFieldCodecType,
}
impl Header {
pub fn normalized(self) -> NormalizedHeader {
let max_value =
(self.max_value - self.min_value) / self.gcd.map(|gcd| gcd.get()).unwrap_or(1);
NormalizedHeader {
num_vals: self.num_vals,
max_value,
}
}
pub fn normalize_column<C: Column>(&self, from_column: C) -> impl Column {
let min_value = self.min_value;
let gcd = self.gcd.map(|gcd| gcd.get()).unwrap_or(1);
let divider = DividerU64::divide_by(gcd);
monotonic_map_column(from_column, move |val| divider.divide(val - min_value))
}
pub fn compute_header(
column: impl Column<u64>,
codecs: &[FastFieldCodecType],
) -> Option<Header> {
let num_vals = column.num_vals();
let min_value = column.min_value();
let max_value = column.max_value();
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
let codec_type = detect_codec(shifted_column, codecs)?;
Some(Header {
num_vals,
min_value,
max_value,
gcd,
codec_type,
})
}
}
impl BinarySerializable for Header {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
VInt(self.num_vals).serialize(writer)?;
VInt(self.min_value).serialize(writer)?;
VInt(self.max_value - self.min_value).serialize(writer)?;
if let Some(gcd) = self.gcd {
VInt(gcd.get()).serialize(writer)?;
} else {
VInt(0u64).serialize(writer)?;
}
self.codec_type.serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let num_vals = VInt::deserialize(reader)?.0;
let min_value = VInt::deserialize(reader)?.0;
let amplitude = VInt::deserialize(reader)?.0;
let max_value = min_value + amplitude;
let gcd_u64 = VInt::deserialize(reader)?.0;
let codec_type = FastFieldCodecType::deserialize(reader)?;
Ok(Header {
num_vals,
min_value,
max_value,
gcd: NonZeroU64::new(gcd_u64),
codec_type,
})
}
Some(GCDParams {
gcd,
min_value,
num_vals: column.num_vals(),
})
}
/// Returns the correct reader wrapped in the `DynamicFastFieldReader` enum for the data.
pub fn open<T: MonotonicallyMappableToU64>(
mut bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<T>>> {
let codec_type = FastFieldCodecType::deserialize(&mut bytes)?;
open_from_id(bytes, codec_type)
}
fn open_codec_from_bytes<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<Item>>> {
let reader = C::open_from_bytes(bytes)?;
Ok(Arc::new(monotonic_map_column(reader, Item::from_u64)))
}
pub fn open_gcd_from_bytes<WrappedCodec: FastFieldCodec>(
bytes: OwnedBytes,
) -> io::Result<impl Column> {
let footer_offset = bytes.len() - 24;
let (body, mut footer) = bytes.split(footer_offset);
let gcd_params = GCDParams::deserialize(&mut footer)?;
let gcd_remap = move |val: u64| gcd_params.min_value + gcd_params.gcd * val;
let reader: WrappedCodec::Reader = WrappedCodec::open_from_bytes(body)?;
Ok(monotonic_map_column(reader, gcd_remap))
}
fn open_codec_with_gcd<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<Item>>> {
let reader = open_gcd_from_bytes::<C>(bytes)?;
Ok(Arc::new(monotonic_map_column(reader, Item::from_u64)))
}
/// Returns the correct reader wrapped in the `DynamicFastFieldReader` enum for the data.
fn open_from_id<T: MonotonicallyMappableToU64>(
mut bytes: OwnedBytes,
codec_type: FastFieldCodecType,
) -> io::Result<Arc<dyn Column<T>>> {
match codec_type {
FastFieldCodecType::Bitpacked => open_codec_from_bytes::<BitpackedCodec, _>(bytes),
FastFieldCodecType::Linear => open_codec_from_bytes::<LinearCodec, _>(bytes),
let header = Header::deserialize(&mut bytes)?;
match header.codec_type {
FastFieldCodecType::Bitpacked => open_specific_codec::<BitpackedCodec, _>(bytes, &header),
FastFieldCodecType::Linear => open_specific_codec::<LinearCodec, _>(bytes, &header),
FastFieldCodecType::BlockwiseLinear => {
open_codec_from_bytes::<BlockwiseLinearCodec, _>(bytes)
}
FastFieldCodecType::Gcd => {
let codec_type = FastFieldCodecType::deserialize(&mut bytes)?;
match codec_type {
FastFieldCodecType::Bitpacked => open_codec_with_gcd::<BitpackedCodec, _>(bytes),
FastFieldCodecType::Linear => open_codec_with_gcd::<LinearCodec, _>(bytes),
FastFieldCodecType::BlockwiseLinear => {
open_codec_with_gcd::<BlockwiseLinearCodec, _>(bytes)
}
FastFieldCodecType::Gcd => Err(io::Error::new(
io::ErrorKind::InvalidData,
"Gcd codec wrapped into another gcd codec. This combination is not allowed.",
)),
}
open_specific_codec::<BlockwiseLinearCodec, _>(bytes, &header)
}
}
}
fn open_specific_codec<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
bytes: OwnedBytes,
header: &Header,
) -> io::Result<Arc<dyn Column<Item>>> {
let normalized_header = header.normalized();
let reader = C::open_from_bytes(bytes, normalized_header)?;
let min_value = header.min_value;
if let Some(gcd) = header.gcd {
let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val * gcd.get());
Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
} else {
let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val);
Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
}
}
pub fn serialize<T: MonotonicallyMappableToU64>(
typed_column: impl Column<T>,
output: &mut impl io::Write,
codecs: &[FastFieldCodecType],
) -> io::Result<()> {
let column = monotonic_map_column(typed_column, T::to_u64);
let gcd_params_opt = if codecs.contains(&FastFieldCodecType::Gcd) {
gcd_params(&column)
} else {
None
};
let gcd_params = if let Some(gcd_params) = gcd_params_opt {
gcd_params
} else {
return serialize_without_gcd(column, output, codecs);
};
write_header(FastFieldCodecType::Gcd, output)?;
let base_value = column.min_value();
let gcd_divider = DividerU64::divide_by(gcd_params.gcd);
let divided_fastfield_accessor =
monotonic_map_column(column, |val: u64| gcd_divider.divide(val - base_value));
serialize_without_gcd(divided_fastfield_accessor, output, codecs)?;
gcd_params.serialize(output)?;
let header = Header::compute_header(&column, codecs).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!(
"Data cannot be serialized with this list of codec. {:?}",
codecs
),
)
})?;
header.serialize(output)?;
let normalized_column = header.normalize_column(column);
assert_eq!(normalized_column.min_value(), 0u64);
serialize_given_codec(normalized_column, header.codec_type, output)?;
Ok(())
}
fn serialize_without_gcd(
fn detect_codec(
column: impl Column<u64>,
output: &mut impl io::Write,
codecs: &[FastFieldCodecType],
) -> io::Result<()> {
) -> Option<FastFieldCodecType> {
let mut estimations = Vec::new();
for &codec in codecs {
if codec == FastFieldCodecType::Gcd {
continue;
}
match codec {
FastFieldCodecType::Bitpacked => {
codec_estimation::<BitpackedCodec, _>(&column, &mut estimations);
@@ -175,7 +198,6 @@ fn serialize_without_gcd(
FastFieldCodecType::BlockwiseLinear => {
codec_estimation::<BlockwiseLinearCodec, _>(&column, &mut estimations);
}
FastFieldCodecType::Gcd => {}
}
}
if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan()) {
@@ -187,25 +209,25 @@ fn serialize_without_gcd(
// removing NaN values for codecs with broken calculations, and f32::MAX values,
// which disable codecs
estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
estimations.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
let (_ratio, codec_type) = estimations[0];
estimations
.sort_by(|(score_left, _), (score_right, _)| score_left.partial_cmp(&score_right).unwrap());
Some(estimations.first()?.1)
}
write_header(codec_type, output)?;
fn serialize_given_codec(
column: impl Column<u64>,
codec_type: FastFieldCodecType,
output: &mut impl io::Write,
) -> io::Result<()> {
match codec_type {
FastFieldCodecType::Bitpacked => {
BitpackedCodec::serialize(output, &column)?;
BitpackedCodec::serialize(&column, output)?;
}
FastFieldCodecType::Linear => {
LinearCodec::serialize(output, &column)?;
LinearCodec::serialize(&column, output)?;
}
FastFieldCodecType::BlockwiseLinear => {
BlockwiseLinearCodec::serialize(output, &column)?;
}
FastFieldCodecType::Gcd => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"GCD codec not supported.",
));
BlockwiseLinearCodec::serialize(&column, output)?;
}
}
output.flush()?;
@@ -230,4 +252,32 @@ mod tests {
let restored: Vec<u64> = serialize_and_load(&original[..]).iter().collect();
assert_eq!(&restored, &original[..]);
}
#[test]
fn test_fastfield_bool_size_bitwidth_1() {
let mut buffer = Vec::new();
let col = VecColumn::from(&[false, true][..]);
serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap();
// 5 bytes of header, 1 byte of value, 7 bytes of padding.
assert_eq!(buffer.len(), 5 + 8);
}
#[test]
fn test_fastfield_bool_bit_size_bitwidth_0() {
let mut buffer = Vec::new();
let col = VecColumn::from(&[true][..]);
serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap();
// 5 bytes of header, 0 bytes of value, 7 bytes of padding.
assert_eq!(buffer.len(), 5 + 7);
}
#[test]
fn test_fastfield_gcd() {
let mut buffer = Vec::new();
let vals: Vec<u64> = (0..80).map(|val| (val % 7) * 1_000u64).collect();
let col = VecColumn::from(&vals[..]);
serialize(col, &mut buffer, &[FastFieldCodecType::Bitpacked]).unwrap();
// Values are stored over 3 bits.
assert_eq!(buffer.len(), 7 + (3 * 80 / 8) + 7);
}
}
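
The sizes asserted above follow from the `Header` layout serialized earlier in this file: four VInts (`num_vals`, `min_value`, amplitude, gcd-or-0) plus one codec-type byte. A hedged accounting for `test_fastfield_gcd`, assuming the usual 7-payload-bits-per-byte VInt encoding:

// Length in bytes of a VInt under the standard 7-bits-per-byte scheme (assumed).
fn vint_len(val: u64) -> usize {
    let bits = (64 - val.leading_zeros()).max(1) as usize;
    (bits + 6) / 7
}

fn main() {
    // num_vals = 80, min_value = 0, amplitude = 6_000, gcd = 1_000, codec = 1 byte.
    let header = vint_len(80) + vint_len(0) + vint_len(6_000) + vint_len(1_000) + 1;
    assert_eq!(header, 7); // the `7` in `7 + (3 * 80 / 8) + 7`
}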

View File

@@ -1,6 +1,9 @@
use std::io;
use fastfield_codecs::VecColumn;
use crate::fastfield::serializer::CompositeFastFieldSerializer;
use crate::fastfield::MultivalueStartIndex;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::schema::{Document, Field, Value};
use crate::DocId;
@@ -104,20 +107,26 @@ impl BytesFastFieldWriter {
/// Serializes the fast field values by pushing them to the `FastFieldSerializer`.
pub fn serialize(
&self,
mut self,
serializer: &mut CompositeFastFieldSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
// writing the offset index
let mut doc_index_serializer =
serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?;
let mut offset = 0;
for vals in self.get_ordered_values(doc_id_map) {
doc_index_serializer.add_val(offset)?;
offset += vals.len() as u64;
// TODO FIXME No need to double the memory.
{
self.doc_index.push(self.vals.len() as u64);
let col = VecColumn::from(&self.doc_index[..]);
if let Some(doc_id_map) = doc_id_map {
let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
serializer.create_auto_detect_u64_fast_field_with_idx(
self.field,
multi_value_start_index,
0,
)?;
} else {
serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 0)?;
}
}
doc_index_serializer.add_val(self.vals.len() as u64)?;
doc_index_serializer.close_field()?;
// writing the values themselves
let mut value_serializer = serializer.new_bytes_fast_field_with_idx(self.field, 1);
// the else could be removed, but this is faster (difference not benchmarked)

View File

@@ -26,6 +26,7 @@ pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveB
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub(crate) use self::multivalued::MultivalueStartIndex;
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
@@ -197,19 +198,18 @@ mod tests {
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use common::HasLen;
use fastfield_codecs::{open, FastFieldCodecType};
use once_cell::sync::Lazy;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
use rand::SeedableRng;
use rand::{Rng, SeedableRng};
use super::*;
use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
use crate::merge_policy::NoMergePolicy;
use crate::schema::{Document, Field, Schema, FAST, STRING, TEXT};
use crate::schema::{Document, Field, Schema, SchemaBuilder, FAST, STRING, TEXT};
use crate::time::OffsetDateTime;
use crate::{DateOptions, DatePrecision, Index, SegmentId, SegmentReader};
@@ -251,7 +251,7 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 45);
assert_eq!(file.len(), 25);
let composite_file = CompositeFile::open(&file)?;
let fast_field_bytes = composite_file.open_read(*FIELD).unwrap().read_bytes()?;
let fast_field_reader = open::<u64>(fast_field_bytes)?;
@@ -282,7 +282,7 @@ mod tests {
serializer.close()?;
}
let file = directory.open_read(path)?;
assert_eq!(file.len(), 70);
assert_eq!(file.len(), 53);
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite
@@ -321,7 +321,7 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 43);
assert_eq!(file.len(), 26);
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite
@@ -356,7 +356,7 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 80051);
assert_eq!(file.len(), 80040);
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite
@@ -398,12 +398,7 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(path).unwrap();
// assert_eq!(file.len(), 17710 as usize); //bitpacked size
// assert_eq!(file.len(), 10175_usize); // linear interpol size
// assert_eq!(file.len(), 75_usize); // linear interpol size after calc improvement
// assert_eq!(file.len(), 1325_usize); // linear interpol size after switching to int based
assert_eq!(file.len(), 62_usize); // linear interpol size after switching to int based, off
// by one fix
assert_eq!(file.len(), 40_usize);
{
let fast_fields_composite = CompositeFile::open(&file)?;
@@ -843,7 +838,7 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 44);
assert_eq!(file.len(), 24);
let composite_file = CompositeFile::open(&file)?;
let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open::<bool>(data)?;
@@ -879,7 +874,7 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 56);
assert_eq!(file.len(), 36);
let composite_file = CompositeFile::open(&file)?;
let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open::<bool>(data)?;
@@ -897,24 +892,21 @@ mod tests {
let directory: RamDirectory = RamDirectory::create();
let mut schema_builder = Schema::builder();
schema_builder.add_bool_field("field_bool", FAST);
let field = schema_builder.add_bool_field("field_bool", FAST);
let schema = schema_builder.build();
let field = schema.get_field("field_bool").unwrap();
{
let write: WritePtr = directory.open_write(path).unwrap();
let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap();
let mut serializer = CompositeFastFieldSerializer::from_write(write)?;
let mut fast_field_writers = FastFieldsWriter::from_schema(&schema);
let doc = Document::default();
fast_field_writers.add_document(&doc);
fast_field_writers
.serialize(&mut serializer, &HashMap::new(), None)
.unwrap();
serializer.close().unwrap();
fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?;
serializer.close()?;
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 43);
let composite_file = CompositeFile::open(&file)?;
assert_eq!(file.len(), 23);
let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open::<bool>(data)?;
assert_eq!(fast_field_reader.get_val(0), false);
@@ -948,15 +940,10 @@ mod tests {
pub fn test_gcd_date() -> crate::Result<()> {
let size_prec_sec =
test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Seconds)?;
assert_eq!(size_prec_sec, 28 + (1_000 * 13) / 8); // 13 bits per val = ceil(log2(number of seconds in 2 hours))
let size_prec_micro =
test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Microseconds)?;
assert!(size_prec_sec < size_prec_micro);
let size_prec_sec =
test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Seconds)?;
let size_prec_micro =
test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Microseconds)?;
assert!(size_prec_sec < size_prec_micro);
assert_eq!(size_prec_micro, 26 + (1_000 * 33) / 8); // 33 bits per val = ceil(log2(number of microseconds in 2 hours))
Ok(())
}
@@ -964,40 +951,26 @@ mod tests {
codec_type: FastFieldCodecType,
precision: DatePrecision,
) -> crate::Result<usize> {
let time1 = DateTime::from_timestamp_micros(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
let time2 = DateTime::from_timestamp_micros(
SystemTime::now()
.checked_sub(Duration::from_micros(4111))
.unwrap()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
let time3 = DateTime::from_timestamp_micros(
SystemTime::now()
.checked_sub(Duration::from_millis(2000))
.unwrap()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
let mut schema_builder = Schema::builder();
let mut rng = StdRng::seed_from_u64(2u64);
const T0: i64 = 1_662_345_825_012_529i64;
const ONE_HOUR_IN_MICROSECS: i64 = 3_600 * 1_000_000;
let times: Vec<DateTime> = std::iter::repeat_with(|| {
// +- One hour.
let t = T0 + rng.gen_range(-ONE_HOUR_IN_MICROSECS..ONE_HOUR_IN_MICROSECS);
DateTime::from_timestamp_micros(t)
})
.take(1_000)
.collect();
let date_options = DateOptions::default()
.set_fast(Cardinality::SingleValue)
.set_precision(precision);
let mut schema_builder = SchemaBuilder::default();
let field = schema_builder.add_date_field("field", date_options);
let schema = schema_builder.build();
let docs = vec![doc!(field=>time1), doc!(field=>time2), doc!(field=>time3)];
let docs: Vec<Document> = times.iter().map(|time| doc!(field=>*time)).collect();
let directory = get_index(&docs, &schema, &[codec_type])?;
let directory = get_index(&docs[..], &schema, &[codec_type])?;
let path = Path::new("test");
let file = directory.open_read(path).unwrap();
let composite_file = CompositeFile::open(&file)?;
@@ -1005,9 +978,9 @@ mod tests {
let len = file.len();
let test_fastfield = open::<DateTime>(file.read_bytes()?)?;
assert_eq!(test_fastfield.get_val(0), time1.truncate(precision));
assert_eq!(test_fastfield.get_val(1), time2.truncate(precision));
assert_eq!(test_fastfield.get_val(2), time3.truncate(precision));
for (i, time) in times.iter().enumerate() {
assert_eq!(test_fastfield.get_val(i as u64), time.truncate(precision));
}
Ok(len)
}
}

View File

@@ -3,6 +3,7 @@ mod writer;
pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter;
pub(crate) use self::writer::MultivalueStartIndex;
#[cfg(test)]
mod tests {

View File

@@ -1,10 +1,9 @@
use std::io;
use std::sync::Mutex;
use fastfield_codecs::MonotonicallyMappableToU64;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap;
use tantivy_bitpacker::minmax;
use crate::fastfield::serializer::BitpackedSerializerLegacy;
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;
@@ -102,16 +101,6 @@ impl MultiValuedFastFieldWriter {
}
}
/// Register all of the values associated to a document.
///
/// The method returns the `DocId` of the document that was
/// just written.
pub fn add_document_vals(&mut self, vals: &[UnorderedTermId]) -> DocId {
let doc = self.doc_index.len() as DocId;
self.next_doc();
self.vals.extend_from_slice(vals);
doc
}
/// Returns an iterator over values per doc_id in ascending doc_id order.
///
/// Normally the order is simply iterating self.doc_id_index.
@@ -151,39 +140,34 @@ impl MultiValuedFastFieldWriter {
/// `tantivy` builds a mapping to convert this `UnorderedTermId` into
/// term ordinals.
pub fn serialize(
&self,
mut self,
serializer: &mut CompositeFastFieldSerializer,
mapping_opt: Option<&FnvHashMap<UnorderedTermId, TermOrdinal>>,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
{
// writing the offset index
let mut doc_index_serializer =
serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?;
let mut offset = 0;
for vals in self.get_ordered_values(doc_id_map) {
doc_index_serializer.add_val(offset)?;
offset += vals.len() as u64;
self.doc_index.push(self.vals.len() as u64);
let col = VecColumn::from(&self.doc_index[..]);
if let Some(doc_id_map) = doc_id_map {
let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
serializer.create_auto_detect_u64_fast_field_with_idx(
self.field,
multi_value_start_index,
0,
)?;
} else {
serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 0)?;
}
doc_index_serializer.add_val(self.vals.len() as u64)?;
doc_index_serializer.close_field()?;
}
{
// writing the values themselves.
let mut value_serializer: BitpackedSerializerLegacy<'_, _>;
// Writing the values themselves.
// TODO FIXME: Use less memory.
let mut values: Vec<u64> = Vec::new();
if let Some(mapping) = mapping_opt {
value_serializer = serializer.new_u64_fast_field_with_idx(
self.field,
0u64,
mapping.len() as u64,
1,
)?;
if self.fast_field_type.is_facet() {
let mut doc_vals: Vec<u64> = Vec::with_capacity(100);
for vals in self.get_ordered_values(doc_id_map) {
// In the case of facets, we want a sorted vec of facet ordinals.
doc_vals.clear();
let remapped_vals = vals
.iter()
@@ -191,7 +175,7 @@ impl MultiValuedFastFieldWriter {
doc_vals.extend(remapped_vals);
doc_vals.sort_unstable();
for &val in &doc_vals {
value_serializer.add_val(val)?;
values.push(val);
}
}
} else {
@@ -200,24 +184,172 @@ impl MultiValuedFastFieldWriter {
.iter()
.map(|val| *mapping.get(val).expect("Missing term ordinal"));
for val in remapped_vals {
value_serializer.add_val(val)?;
values.push(val);
}
}
}
} else {
let val_min_max = minmax(self.vals.iter().cloned());
let (val_min, val_max) = val_min_max.unwrap_or((0u64, 0u64));
value_serializer =
serializer.new_u64_fast_field_with_idx(self.field, val_min, val_max, 1)?;
for vals in self.get_ordered_values(doc_id_map) {
// sort values in case of remapped doc_ids?
for &val in vals {
value_serializer.add_val(val)?;
values.push(val);
}
}
}
value_serializer.close_field()?;
let col = VecColumn::from(&values[..]);
serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 1)?;
}
Ok(())
}
}
pub(crate) struct MultivalueStartIndex<'a, C: Column> {
column: &'a C,
doc_id_map: &'a DocIdMapping,
min_max_opt: Mutex<Option<(u64, u64)>>,
random_seeker: Mutex<MultivalueStartIndexRandomSeeker<'a, C>>,
}
struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
seek_head: MultivalueStartIndexIter<'a, C>,
seek_next_id: u64,
}
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
MultivalueStartIndex {
column,
doc_id_map,
min_max_opt: Mutex::default(),
random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}),
}
}
fn minmax(&self) -> (u64, u64) {
if let Some((min, max)) = self.min_max_opt.lock().unwrap().clone() {
return (min, max);
}
let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64));
*self.min_max_opt.lock().unwrap() = Some((min, max));
(min, max)
}
}
impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
fn get_val(&self, idx: u64) -> u64 {
let mut random_seeker_lock = self.random_seeker.lock().unwrap();
if random_seeker_lock.seek_next_id > idx {
*random_seeker_lock = MultivalueStartIndexRandomSeeker {
seek_head: MultivalueStartIndexIter {
column: self.column,
doc_id_map: self.doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
};
}
let to_skip = idx - random_seeker_lock.seek_next_id;
random_seeker_lock.seek_next_id = idx + 1;
random_seeker_lock.seek_head.nth(to_skip as usize).unwrap()
}
fn min_value(&self) -> u64 {
self.minmax().0
}
fn max_value(&self) -> u64 {
self.minmax().1
}
fn num_vals(&self) -> u64 {
(self.doc_id_map.num_new_doc_ids() + 1) as u64
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new(MultivalueStartIndexIter {
column: &self.column,
doc_id_map: self.doc_id_map,
new_doc_id: 0,
offset: 0,
})
}
}
struct MultivalueStartIndexIter<'a, C: Column> {
pub column: &'a C,
pub doc_id_map: &'a DocIdMapping,
pub new_doc_id: usize,
pub offset: u64,
}
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
return None;
}
let new_doc_id = self.new_doc_id;
self.new_doc_id += 1;
let start_offset = self.offset;
if new_doc_id < self.doc_id_map.num_new_doc_ids() {
let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
self.offset += num_vals_for_doc;
}
Some(start_offset)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_multivalue_start_index() {
let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);
let col = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
let multivalue_start_index = MultivalueStartIndex::new(
&col, // per-doc value counts: 3, 2, 5, 2, 4
&doc_id_mapping,
);
assert_eq!(multivalue_start_index.num_vals(), 4);
assert_eq!(
multivalue_start_index.iter().collect::<Vec<u64>>(),
vec![0, 4, 6, 11]
); // remapped per-doc value counts: 4, 2, 5
}
#[test]
fn test_multivalue_get_vals() {
let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);
let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55,][..]);
let multivalue_start_index = MultivalueStartIndex::new(
&col,
&doc_id_mapping,
);
assert_eq!(
multivalue_start_index.iter().collect::<Vec<u64>>(),
vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
);
assert_eq!(multivalue_start_index.num_vals(), 11);
assert_eq!(multivalue_start_index.get_val(3), 2);
assert_eq!(multivalue_start_index.get_val(5), 5);
assert_eq!(multivalue_start_index.get_val(8), 21);
assert_eq!(multivalue_start_index.get_val(4), 3);
assert_eq!(multivalue_start_index.get_val(0), 0);
assert_eq!(multivalue_start_index.get_val(10), 55);
}
}
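
To make the first test above concrete: the old offsets [0, 3, 5, 10, 12, 16] encode per-doc value counts [3, 2, 5, 2, 4]; the mapping [4, 1, 2] keeps the docs with counts 4, 2 and 5, and re-accumulating those counts yields the new offsets. A sketch of that recomputation on plain slices (illustrative, not the lazy iterator used above):

fn remap_offsets(old_offsets: &[u64], new_to_old: &[u32]) -> Vec<u64> {
    let mut out = vec![0u64];
    for &old_doc in new_to_old {
        let num_vals = old_offsets[old_doc as usize + 1] - old_offsets[old_doc as usize];
        out.push(out.last().unwrap() + num_vals);
    }
    out
}

fn main() {
    let new_offsets = remap_offsets(&[0, 3, 5, 10, 12, 16], &[4, 1, 2]);
    assert_eq!(new_offsets, vec![0, 4, 6, 11]); // matches the test above
}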

View File

@@ -1,7 +1,7 @@
use std::io::{self, Write};
use common::{BinarySerializable, CountingWriter};
pub use fastfield_codecs::bitpacked::{BitpackedCodec, BitpackedSerializerLegacy};
pub use fastfield_codecs::bitpacked::BitpackedCodec;
pub use fastfield_codecs::{Column, FastFieldCodec, FastFieldStats};
use fastfield_codecs::{FastFieldCodecType, MonotonicallyMappableToU64, ALL_CODEC_TYPES};
@@ -85,40 +85,6 @@ impl CompositeFastFieldSerializer {
Ok(())
}
/// Start serializing a new u64 fast field
pub fn serialize_into(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedSerializerLegacy<'_, CountingWriter<WritePtr>>> {
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedSerializerLegacy<'_, CountingWriter<WritePtr>>> {
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field_with_idx(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
idx: usize,
) -> io::Result<BitpackedSerializerLegacy<'_, CountingWriter<WritePtr>>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
// Prepend codec id to field data for compatibility with DynamicFastFieldReader.
FastFieldCodecType::Bitpacked.serialize(field_write)?;
BitpackedSerializerLegacy::open(field_write, min_value, max_value)
}
/// Start serializing a new [u8] fast field
pub fn new_bytes_fast_field_with_idx(
&mut self,

View File

@@ -211,12 +211,12 @@ impl FastFieldsWriter {
/// Serializes all of the `FastFieldWriter`s by pushing them in
/// order to the fast field serializer.
pub fn serialize(
&self,
self,
serializer: &mut CompositeFastFieldSerializer,
mapping: &HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
for field_writer in &self.term_id_writers {
for field_writer in self.term_id_writers {
let field = field_writer.field();
field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
}
@@ -224,11 +224,11 @@ impl FastFieldsWriter {
field_writer.serialize(serializer, doc_id_map)?;
}
for field_writer in &self.multi_values_writers {
for field_writer in self.multi_values_writers {
let field = field_writer.field();
field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
}
for field_writer in &self.bytes_value_writers {
for field_writer in self.bytes_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
Ok(())

View File

@@ -91,6 +91,12 @@ impl DocIdMapping {
.map(|old_doc| els[*old_doc as usize])
.collect()
}
pub fn num_new_doc_ids(&self) -> usize {
self.new_doc_id_to_old.len()
}
pub fn num_old_doc_ids(&self) -> usize {
self.old_doc_id_to_new.len()
}
}
pub(crate) fn expect_field_id_for_sort_field(

View File

@@ -612,25 +612,23 @@ impl IndexMerger {
.collect::<Vec<_>>();
// We can now write the actual fast field values.
// In the case of hierarchical facets, they are actually term ordinals.
let max_term_ord = term_ordinal_mappings.max_term_ord();
{
let mut serialize_vals =
fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?;
let mut vals: Vec<u64> = Vec::with_capacity(100);
let mut vals = Vec::new();
let mut buffer = Vec::new();
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let term_ordinal_mapping: &[TermOrdinal] =
term_ordinal_mappings.get_segment(old_doc_addr.segment_ord as usize);
let ff_reader = &fast_field_reader[old_doc_addr.segment_ord as usize];
ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
for &prev_term_ord in &vals {
ff_reader.get_vals(old_doc_addr.doc_id, &mut buffer);
for &prev_term_ord in &buffer {
let new_term_ord = term_ordinal_mapping[prev_term_ord as usize];
serialize_vals.add_val(new_term_ord)?;
vals.push(new_term_ord);
}
}
serialize_vals.close_field()?;
let col = VecColumn::from(&vals[..]);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(field, col, 1)?;
}
Ok(())
}

View File

@@ -138,7 +138,7 @@ impl SegmentWriter {
remap_and_write(
&self.per_field_postings_writers,
self.ctx,
&self.fast_field_writers,
self.fast_field_writers,
&self.fieldnorms_writer,
&self.schema,
self.segment_serializer,
@@ -345,7 +345,7 @@ impl SegmentWriter {
fn remap_and_write(
per_field_postings_writers: &PerFieldPostingsWriter,
ctx: IndexingContext,
fast_field_writers: &FastFieldsWriter,
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: &FieldNormsWriter,
schema: &Schema,
mut serializer: SegmentSerializer,