Compare commits

...

45 Commits

Author SHA1 Message Date
Evance Soumaoro
4c454d752d exposing min/max value interface on MultiValuedFastField Reader 2021-06-23 08:32:51 +00:00
PSeitz
f05e84f964 add FieldEntry constructor, closes #1086 (#1090) 2021-06-17 10:15:48 +09:00
PSeitz
65546ed22b Merge pull request #1088 from appaquet/fix/store-reader-iterator-take2
Fix corruption in store reader iterator, take 2
2021-06-16 14:44:00 +02:00
Andre-Philippe Paquet
57ae5b27dc fix store reader iterator, take 2 2021-06-16 07:51:39 -04:00
PSeitz
f9531ec3c9 Merge pull request #1085 from PSeitz/fastfieldcompression
use concrete return type, fixes #1084
2021-06-16 13:07:12 +02:00
Pascal Seitz
5b54a32563 use concrete return type, fixes #1084 2021-06-16 12:03:11 +02:00
PSeitz
cd049e28bc Merge pull request #1082 from PSeitz/fastfieldcompression
use dynamic fast field codec for offset index
2021-06-16 11:59:00 +02:00
PSeitz
646e41bec4 Merge pull request #1083 from PSeitz/termdict_block_layout
Move counting writer to common
2021-06-16 08:57:55 +02:00
Pascal Seitz
36528c5e83 move counting writer to common
move counting writer to common
reuse counting writer in fastfield codec
2021-06-16 08:14:04 +02:00
Pascal Seitz
cd169dee23 use dynamic fast field codec for offset index 2021-06-15 13:34:42 +02:00
PSeitz
b5cc60f80b Merge pull request #1080 from PSeitz/more_tests
test all features in github actions
2021-06-15 10:51:57 +02:00
Pascal Seitz
060b83159a use nightly for tests 2021-06-15 10:08:49 +02:00
Pascal Seitz
a40ff35453 test all features 2021-06-15 09:31:39 +02:00
PSeitz
268e6bfe6e update fast field codec readme 2021-06-15 09:19:39 +02:00
PSeitz
f902440b8b Merge pull request #1072 from PSeitz/fastfieldcompression
Enable support for multiple fastfield codecs, add linear interpolation
2021-06-15 09:07:17 +02:00
Pascal Seitz
77a0902605 replace unwrap, use vec in bench 2021-06-14 17:01:46 +02:00
Pascal Seitz
c889ae10e4 add is_applicable to fast field codecs 2021-06-14 16:16:25 +02:00
Pascal Seitz
0a534c6ee0 rename create to serialize 2021-06-14 15:40:07 +02:00
Pascal Seitz
167d88b449 fix tests behind unstable feature flag 2021-06-14 15:31:12 +02:00
Pascal Seitz
1071ed84f2 fix cond compilation 2021-06-14 14:05:04 +02:00
Pascal Seitz
abb5624af2 add contributing guidelines, add codec comparer binary
add contributing guidelines
add codec comparer binary to test codec compressions with different test data sets
2021-06-14 13:56:40 +02:00
Pascal Seitz
1d41b96d32 rename, add codec_tester 2021-06-14 13:56:40 +02:00
Pascal Seitz
ef4665945f rename file 2021-06-14 13:56:40 +02:00
Pascal Seitz
294cd5fd0b streamline traits and tests 2021-06-14 13:56:40 +02:00
Pascal Seitz
f4d271177c add inline, add readme 2021-06-14 13:56:40 +02:00
Pascal Seitz
451538fecf add serialize for bool 2021-06-14 13:56:40 +02:00
Pascal Seitz
e78e0fec59 add multilinearinterpolation
add multilinearinterpolation, which compresses blocks of size 512
add checks for linear interpolation
2021-06-14 13:56:40 +02:00
Pascal Seitz
2e639cebf8 fix bitpacker bug, reset internal value 2021-06-14 13:56:40 +02:00
Pascal Seitz
e296da7ade add debug and failsafes 2021-06-14 13:56:40 +02:00
Pascal Seitz
3b3e26c4b8 use f64 precision for slope calculation 2021-06-14 13:56:40 +02:00
Pascal Seitz
6a4883ac69 use uniform distribution sampling 2021-06-14 13:56:40 +02:00
Pascal Seitz
0ba05df545 add f32::MAX to disable a compressor 2021-06-14 13:56:40 +02:00
Pascal Seitz
aa3c4d4029 use f32 precision, add inline 2021-06-14 13:56:40 +02:00
Pascal Seitz
60df629725 cargo.toml license desc and author 2021-06-14 13:56:40 +02:00
Pascal Seitz
2570b005ac fix estimation test 2021-06-14 13:56:40 +02:00
Pascal Seitz
d5212cd19d fix clippy 2021-06-14 13:56:40 +02:00
Pascal Seitz
2193d85622 fix clippy and common crate tests 2021-06-14 13:56:40 +02:00
Pascal Seitz
dfdbfe9eff add benchmark for fast field codecs
test tests::bench_fastfield_bitpack_create        ... bench:      57,628 ns/iter (+/- 23,486)
test tests::bench_fastfield_bitpack_get           ... bench:      43,323 ns/iter (+/- 4,286)
test tests::bench_fastfield_linearinterpol_create ... bench:     223,625 ns/iter (+/- 33,563)
test tests::bench_fastfield_linearinterpol_get    ... bench:      82,839 ns/iter (+/- 9,575)
2021-06-14 13:56:40 +02:00
Pascal Seitz
b999e836b2 replace BitpackedFastFieldReader, delete FastFieldSerializer trait 2021-06-14 13:56:40 +02:00
Pascal Seitz
be2dd41e69 add interface to create and read codecs
add CodecReader as common interface in fastfield codec crate
add LinearInterpolation to DynamicFastFieldReader
calc estimation and choose best codec
cleanup
2021-06-14 13:56:40 +02:00
Pascal Seitz
483fdb79cc add linear interpolation estimation
add estimation tests
add codec test data in tests
2021-06-14 13:56:40 +02:00
Pascal Seitz
aefd0fc907 refactor, add fastfield metadata to footer
change api to fastfield reader in codec crate
add fastfield metadata to footer
remove old code
merge codec files
2021-06-14 13:56:40 +02:00
Pascal Seitz
3298d6cb71 move common to common crate, create fastfield_codecs crate
move common to common crate
create fastfield_codecs crate
add bitpacker to fast field codecs
add linear interpolation to fast field codecs
add tests
2021-06-14 13:56:40 +02:00
Pascal Seitz
c02c78ea73 implement linear interpol serializer 2021-06-14 13:56:40 +02:00
Pascal Seitz
6bf4fee1ba support multiple codecs
support multiple codes
prepend codec id to all fast fields
add new api to create fastfields with access to all data
use new fastfield creation api in initial creation and merge
remove unused collect of data in doc_id_mapping
2021-06-14 13:56:40 +02:00
35 changed files with 2098 additions and 450 deletions

View File

@@ -18,7 +18,13 @@ jobs:
- uses: actions/checkout@v2
- name: Build
run: cargo build --verbose --workspace
- name: Install latest nightly to also test against the unstable feature flag
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
components: rustfmt
- name: Run tests
run: cargo test --verbose --workspace
run: cargo test --all-features --verbose --workspace
- name: Check Formatting
run: cargo fmt --all -- --check

View File

@@ -35,6 +35,8 @@ crossbeam = "0.8"
futures = { version = "0.3.15", features = ["thread-pool"] }
tantivy-query-grammar = { version="0.15.0", path="./query-grammar" }
tantivy-bitpacker = { version="0.1", path="./bitpacker" }
common = { version="0.1", path="./common" }
fastfield_codecs = { version="0.1", path="./fastfield_codecs", default-features = false }
stable_deref_trait = "1.2"
rust-stemmers = "1.2"
downcast-rs = "1.2"
@@ -88,7 +90,7 @@ unstable = [] # useful for benches.
wasm-bindgen = ["uuid/wasm-bindgen"]
[workspace]
members = ["query-grammar", "bitpacker"]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs"]
[badges]
travis-ci = { repository = "tantivy-search/tantivy" }

View File

@@ -49,6 +49,7 @@ impl BitPacker {
let bytes = self.mini_buffer.to_le_bytes();
output.write_all(&bytes[..num_bytes])?;
self.mini_buffer_written = 0;
self.mini_buffer = 0;
}
Ok(())
}
@@ -61,7 +62,7 @@ impl BitPacker {
}
}
#[derive(Clone)]
#[derive(Clone, Debug, Default)]
pub struct BitUnpacker {
num_bits: u64,
mask: u64,
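The one-line `self.mini_buffer = 0` reset above matters when a codec calls `flush` at block boundaries, as the multi-linear-interpolation codec added later in this diff does. A minimal sketch of that usage pattern, assuming only the `BitPacker`/`compute_num_bits` API that appears in this changeset (the concrete values are made up):

```
use tantivy_bitpacker::{compute_num_bits, BitPacker};

fn main() -> std::io::Result<()> {
    let mut out: Vec<u8> = vec![];
    let mut packer = BitPacker::new();
    // Block 1: values in 0..=7 fit in 3 bits.
    for &v in &[1u64, 5, 7] {
        packer.write(v, compute_num_bits(7), &mut out)?;
    }
    // Flush at the block boundary. Without the `self.mini_buffer = 0` reset added above,
    // leftover bits from block 1 could leak into the first values of block 2.
    packer.flush(&mut out)?;
    // Block 2: a different bit width per block is the reason for flushing here.
    for &v in &[2u64, 3] {
        packer.write(v, compute_num_bits(3), &mut out)?;
    }
    packer.close(&mut out)?;
    Ok(())
}
```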

common/Cargo.toml (new file, 12 lines)
View File

@@ -0,0 +1,12 @@
[package]
name = "common"
version = "0.1.0"
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
license = "MIT"
edition = "2018"
description = "common traits and utility functions used by multiple tantivy subcrates"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
byteorder = "1.4.3"

common/src/lib.rs (new file, 9 lines)
View File

@@ -0,0 +1,9 @@
pub use byteorder::LittleEndian as Endianness;
mod serialize;
mod vint;
mod writer;
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
pub use vint::{read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt};
pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite};

View File

@@ -1,5 +1,5 @@
use crate::common::Endianness;
use crate::common::VInt;
use crate::Endianness;
use crate::VInt;
use byteorder::{ReadBytesExt, WriteBytesExt};
use std::fmt;
use std::io;
@@ -160,6 +160,28 @@ impl FixedSize for u8 {
const SIZE_IN_BYTES: usize = 1;
}
impl BinarySerializable for bool {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
let val = if *self { 1 } else { 0 };
writer.write_u8(val)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<bool> {
let val = reader.read_u8()?;
match val {
0 => Ok(false),
1 => Ok(true),
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid bool value on deserialization, data corrupted",
)),
}
}
}
impl FixedSize for bool {
const SIZE_IN_BYTES: usize = 1;
}
impl BinarySerializable for String {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
let data: &[u8] = self.as_bytes();
@@ -180,9 +202,9 @@ impl BinarySerializable for String {
#[cfg(test)]
pub mod test {
use super::VInt;
use super::*;
use crate::common::VInt;
use crate::serialize::BinarySerializable;
pub fn fixed_size_test<O: BinarySerializable + FixedSize + Default>() {
let mut buffer = Vec::new();
O::default().serialize(&mut buffer).unwrap();
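As a quick illustration of the new `bool` support, here is a hedged round-trip sketch using only the items re-exported from `common` above (it assumes `VInt` is the usual `VInt(pub u64)` newtype):

```
use common::{BinarySerializable, VInt};

fn main() -> std::io::Result<()> {
    let mut buf: Vec<u8> = vec![];
    VInt(127).serialize(&mut buf)?; // vint: one byte for values < 128
    true.serialize(&mut buf)?;      // bool: one byte, 0 or 1 (added in this change)
    let mut cursor: &[u8] = &buf;
    assert_eq!(VInt::deserialize(&mut cursor)?.0, 127);
    assert!(bool::deserialize(&mut cursor)?);
    Ok(())
}
```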

View File

@@ -175,8 +175,8 @@ impl BinarySerializable for VInt {
mod tests {
use super::serialize_vint_u32;
use super::BinarySerializable;
use super::VInt;
use crate::common::BinarySerializable;
fn aux_test_vint(val: u64) {
let mut v = [14u8; 10];

View File

@@ -1,7 +1,4 @@
use crate::directory::AntiCallToken;
use crate::directory::TerminatingWrite;
use std::io;
use std::io::Write;
use std::io::{self, BufWriter, Write};
pub struct CountingWriter<W> {
underlying: W,
@@ -16,41 +13,87 @@ impl<W: Write> CountingWriter<W> {
}
}
#[inline]
pub fn written_bytes(&self) -> u64 {
self.written_bytes
}
/// Returns the underlying write object.
/// Note that this method does not trigger any flushing.
#[inline]
pub fn finish(self) -> W {
self.underlying
}
}
impl<W: Write> Write for CountingWriter<W> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let written_size = self.underlying.write(buf)?;
self.written_bytes += written_size as u64;
Ok(written_size)
}
#[inline]
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.underlying.write_all(buf)?;
self.written_bytes += buf.len() as u64;
Ok(())
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.underlying.flush()
}
}
impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
#[inline]
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
self.underlying.terminate_ref(token)
}
}
/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly
///
/// The point is that while the type is public, it cannot be built by anyone
/// outside of this module.
pub struct AntiCallToken(());
/// Trait used to indicate that no more writes need to be done on a writer
pub trait TerminatingWrite: Write {
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
fn terminate(mut self) -> io::Result<()>
where
Self: Sized,
{
self.terminate_ref(AntiCallToken(()))
}
/// You should implement this function to define custom behavior.
/// This function should flush any buffer it may hold.
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>;
}
impl<W: TerminatingWrite + ?Sized> TerminatingWrite for Box<W> {
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
self.as_mut().terminate_ref(token)
}
}
impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> {
self.flush()?;
self.get_mut().terminate_ref(a)
}
}
impl<'a> TerminatingWrite for &'a mut Vec<u8> {
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
self.flush()
}
}
#[cfg(test)]
mod test {

View File

@@ -0,0 +1,25 @@
[package]
name = "fastfield_codecs"
version = "0.1.0"
authors = ["Pascal Seitz <pascal@quickwit.io>"]
license = "MIT"
edition = "2018"
description = "Fast field codecs used by tantivy"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common = { path = "../common/" }
tantivy-bitpacker = { path = "../bitpacker/" }
prettytable-rs = {version="0.8.0", optional= true}
#prettytable-rs = {version="0.8.0" }
rand = "0.8.3"
[dev-dependencies]
more-asserts = "0.2.1"
rand = "0.8.3"
[features]
bin = ["prettytable-rs"]
default = ["bin"]

View File

@@ -0,0 +1,68 @@
# Fast Field Codecs
This crate contains various fast field codecs, used to compress/decompress fast field data in tantivy.
## Contributing
Contributing is pretty straightforward. Since bitpacking is the simplest codec, you can use it as a reference.
A codec needs to implement 2 traits:
- A reader implementing `FastFieldCodecReader` to read the codec.
- A serializer implementing `FastFieldCodecSerializer` for compression estimation and codec name + id.
### Tests
Once the traits are implemented, test and benchmark integration is pretty easy (see `test_with_codec_data_sets` and `bench.rs`).
Make sure to add the codec to `main.rs`, which tests the compression ratio and estimation against different data sets. You can run it with:
```
cargo run --features bin
```
### TODO
- Add real-world data sets to the comparison
- Add a codec to cover sparse data sets
### Codec Comparison
```
+----------------------------------+-------------------+------------------------+
| | Compression Ratio | Compression Estimation |
+----------------------------------+-------------------+------------------------+
| Autoincrement | | |
+----------------------------------+-------------------+------------------------+
| LinearInterpol | 0.000039572664 | 0.000004396963 |
+----------------------------------+-------------------+------------------------+
| MultiLinearInterpol | 0.1477348 | 0.17275847 |
+----------------------------------+-------------------+------------------------+
| Bitpacked | 0.28126493 | 0.28125 |
+----------------------------------+-------------------+------------------------+
| Monotonically increasing concave | | |
+----------------------------------+-------------------+------------------------+
| LinearInterpol | 0.25003937 | 0.26562938 |
+----------------------------------+-------------------+------------------------+
| MultiLinearInterpol | 0.190665 | 0.1883836 |
+----------------------------------+-------------------+------------------------+
| Bitpacked | 0.31251436 | 0.3125 |
+----------------------------------+-------------------+------------------------+
| Monotonically increasing convex | | |
+----------------------------------+-------------------+------------------------+
| LinearInterpol | 0.25003937 | 0.28125438 |
+----------------------------------+-------------------+------------------------+
| MultiLinearInterpol | 0.18676 | 0.2040086 |
+----------------------------------+-------------------+------------------------+
| Bitpacked | 0.31251436 | 0.3125 |
+----------------------------------+-------------------+------------------------+
| Almost monotonically increasing | | |
+----------------------------------+-------------------+------------------------+
| LinearInterpol | 0.14066513 | 0.1562544 |
+----------------------------------+-------------------+------------------------+
| MultiLinearInterpol | 0.16335973 | 0.17275847 |
+----------------------------------+-------------------+------------------------+
| Bitpacked | 0.28126493 | 0.28125 |
+----------------------------------+-------------------+------------------------+
```
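To make the two-trait contract from the README concrete, here is a hedged sketch of a toy, uncompressed codec. It is not part of this PR; the `Raw` name, the `ID` value, and the footer layout are invented for illustration, and it relies only on the `FastFieldCodecReader`, `FastFieldCodecSerializer`, `FastFieldDataAccess`, and `FastFieldStats` items defined in `fastfield_codecs/src/lib.rs` below:

```
use std::convert::TryInto;
use std::io::{self, Write};

use fastfield_codecs::{
    FastFieldCodecReader, FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats,
};

/// Toy codec: stores every value as a plain little-endian u64, plus a 16-byte footer.
struct RawReader {
    min: u64,
    max: u64,
}

impl FastFieldCodecReader for RawReader {
    fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
        // The last 16 bytes are the footer: min_value then max_value.
        let n = bytes.len();
        let min = u64::from_le_bytes(bytes[n - 16..n - 8].try_into().unwrap());
        let max = u64::from_le_bytes(bytes[n - 8..].try_into().unwrap());
        Ok(RawReader { min, max })
    }
    fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
        let start = doc as usize * 8;
        u64::from_le_bytes(data[start..start + 8].try_into().unwrap())
    }
    fn min_value(&self) -> u64 {
        self.min
    }
    fn max_value(&self) -> u64 {
        self.max
    }
}

struct RawSerializer;

impl FastFieldCodecSerializer for RawSerializer {
    const NAME: &'static str = "Raw";
    const ID: u8 = 255; // hypothetical id, must not clash with the real codecs

    fn is_applicable(_accessor: &impl FastFieldDataAccess, _stats: FastFieldStats) -> bool {
        true // always applicable, it just never compresses
    }
    fn estimate(_accessor: &impl FastFieldDataAccess, _stats: FastFieldStats) -> f32 {
        1.0 // exactly the size of the uncompressed 64-bit data (footer ignored)
    }
    fn serialize(
        write: &mut impl Write,
        _accessor: &impl FastFieldDataAccess,
        stats: FastFieldStats,
        data_iter: impl Iterator<Item = u64>,
        _data_iter1: impl Iterator<Item = u64>,
    ) -> io::Result<()> {
        for val in data_iter {
            write.write_all(&val.to_le_bytes())?;
        }
        // Footer: min and max, so the reader can answer min_value()/max_value().
        write.write_all(&stats.min_value.to_le_bytes())?;
        write.write_all(&stats.max_value.to_le_bytes())?;
        Ok(())
    }
}
```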

View File

@@ -0,0 +1,109 @@
#![feature(test)]
extern crate test;
#[cfg(test)]
mod tests {
use fastfield_codecs::{
bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer},
linearinterpol::{LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer},
multilinearinterpol::{
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
},
*,
};
fn get_data() -> Vec<u64> {
let mut data: Vec<_> = (100..55000_u64)
.map(|num| num + rand::random::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);
data.insert(2000, 100);
data.insert(3000, 4100);
data.insert(4000, 100);
data.insert(5000, 800);
data
}
fn value_iter() -> impl Iterator<Item = u64> {
let data = (0..20_000).collect::<Vec<_>>();
data.into_iter()
}
fn bench_get<S: FastFieldCodecSerializer, R: FastFieldCodecReader>(
b: &mut Bencher,
data: &[u64],
) {
let mut bytes = vec![];
S::serialize(
&mut bytes,
&data,
stats_from_vec(&data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
let reader = R::open_from_bytes(&bytes).unwrap();
b.iter(|| {
for pos in value_iter() {
reader.get_u64(pos as u64, &bytes);
}
});
}
fn bench_create<S: FastFieldCodecSerializer>(b: &mut Bencher, data: &[u64]) {
let mut bytes = vec![];
b.iter(|| {
S::serialize(
&mut bytes,
&data,
stats_from_vec(&data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
});
}
use test::Bencher;
#[bench]
fn bench_fastfield_bitpack_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<BitpackedFastFieldSerializer>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<LinearInterpolFastFieldSerializer>(b, &data);
}
#[bench]
fn bench_fastfield_multilinearinterpol_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<MultiLinearInterpolFastFieldSerializer>(b, &data);
}
#[bench]
fn bench_fastfield_bitpack_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<LinearInterpolFastFieldSerializer, LinearInterpolFastFieldReader>(b, &data);
}
#[bench]
fn bench_fastfield_multilinearinterpol_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<MultiLinearInterpolFastFieldSerializer, MultiLinearInterpolFastFieldReader>(
b, &data,
);
}
pub fn stats_from_vec(data: &[u64]) -> FastFieldStats {
let min_value = data.iter().cloned().min().unwrap_or(0);
let max_value = data.iter().cloned().max().unwrap_or(0);
FastFieldStats {
min_value,
max_value,
num_vals: data.len() as u64,
}
}
}

View File

@@ -0,0 +1,176 @@
use crate::FastFieldCodecReader;
use crate::FastFieldCodecSerializer;
use crate::FastFieldDataAccess;
use crate::FastFieldStats;
use common::BinarySerializable;
use std::io::{self, Write};
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;
use tantivy_bitpacker::BitUnpacker;
/// Depending on the field type, a different
/// fast field is required.
#[derive(Clone)]
pub struct BitpackedFastFieldReader {
bit_unpacker: BitUnpacker,
pub min_value_u64: u64,
pub max_value_u64: u64,
}
impl<'data> FastFieldCodecReader for BitpackedFastFieldReader {
/// Opens a fast field given a file.
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
let (_data, mut footer) = bytes.split_at(bytes.len() - 16);
let min_value = u64::deserialize(&mut footer)?;
let amplitude = u64::deserialize(&mut footer)?;
let max_value = min_value + amplitude;
let num_bits = compute_num_bits(amplitude);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(BitpackedFastFieldReader {
min_value_u64: min_value,
max_value_u64: max_value,
bit_unpacker,
})
}
#[inline]
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
self.min_value_u64 + self.bit_unpacker.get(doc, &data)
}
#[inline]
fn min_value(&self) -> u64 {
self.min_value_u64
}
#[inline]
fn max_value(&self) -> u64 {
self.max_value_u64
}
}
pub struct BitpackedFastFieldSerializerLegacy<'a, W: 'a + Write> {
bit_packer: BitPacker,
write: &'a mut W,
min_value: u64,
amplitude: u64,
num_bits: u8,
}
impl<'a, W: Write> BitpackedFastFieldSerializerLegacy<'a, W> {
/// Creates a new fast field serializer.
///
/// The serializer in fact encodes the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// the minimum number of bits required to encode the values.
pub fn open(
write: &'a mut W,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedFastFieldSerializerLegacy<'a, W>> {
assert!(min_value <= max_value);
let amplitude = max_value - min_value;
let num_bits = compute_num_bits(amplitude);
let bit_packer = BitPacker::new();
Ok(BitpackedFastFieldSerializerLegacy {
bit_packer,
write,
min_value,
amplitude,
num_bits,
})
}
/// Pushes a new value to the currently open u64 fast field.
#[inline]
pub fn add_val(&mut self, val: u64) -> io::Result<()> {
let val_to_write: u64 = val - self.min_value;
self.bit_packer
.write(val_to_write, self.num_bits, &mut self.write)?;
Ok(())
}
pub fn close_field(mut self) -> io::Result<()> {
self.bit_packer.close(&mut self.write)?;
self.min_value.serialize(&mut self.write)?;
self.amplitude.serialize(&mut self.write)?;
Ok(())
}
}
pub struct BitpackedFastFieldSerializer {}
impl FastFieldCodecSerializer for BitpackedFastFieldSerializer {
const NAME: &'static str = "Bitpacked";
const ID: u8 = 1;
/// Serializes data with the BitpackedFastFieldSerializer.
///
/// The serializer in fact encodes the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// the minimum number of bits required to encode the values.
fn serialize(
write: &mut impl Write,
_fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
_data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
let mut serializer =
BitpackedFastFieldSerializerLegacy::open(write, stats.min_value, stats.max_value)?;
for val in data_iter {
serializer.add_val(val)?;
}
serializer.close_field()?;
Ok(())
}
fn is_applicable(
_fastfield_accessor: &impl FastFieldDataAccess,
_stats: FastFieldStats,
) -> bool {
true
}
fn estimate(_fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
let amplitude = stats.max_value - stats.min_value;
let num_bits = compute_num_bits(amplitude);
let num_bits_uncompressed = 64;
num_bits as f32 / num_bits_uncompressed as f32
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::get_codec_test_data_sets;
fn create_and_validate(data: &[u64], name: &str) {
crate::tests::create_and_validate::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>(
&data, name,
);
}
#[test]
fn test_with_codec_data_sets() {
let data_sets = get_codec_test_data_sets();
for (mut data, name) in data_sets {
create_and_validate(&data, name);
data.reverse();
create_and_validate(&data, name);
}
}
#[test]
fn bitpacked_fast_field_rand() {
for _ in 0..500 {
let mut data = (0..1 + rand::random::<u8>() as usize)
.map(|_| rand::random::<i64>() as u64 / 2 as u64)
.collect::<Vec<_>>();
create_and_validate(&data, "rand");
data.reverse();
create_and_validate(&data, "rand");
}
}
}
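A hedged end-to-end sketch of the bitpacked codec exposed above (the values are made up, and `FastFieldStats` is filled in by hand here, whereas tantivy computes it during indexing):

```
use fastfield_codecs::bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer};
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecSerializer, FastFieldStats};

fn main() -> std::io::Result<()> {
    let data: Vec<u64> = vec![100, 120, 355, 200];
    let stats = FastFieldStats {
        min_value: 100,
        max_value: 355,
        num_vals: data.len() as u64,
    };
    let mut bytes: Vec<u8> = vec![];
    // amplitude = 255, so each value is stored as (val - 100) in 8 bits.
    BitpackedFastFieldSerializer::serialize(
        &mut bytes,
        &data,
        stats,
        data.iter().cloned(),
        data.iter().cloned(),
    )?;
    let reader = BitpackedFastFieldReader::open_from_bytes(&bytes)?;
    assert_eq!(reader.get_u64(2, &bytes), 355);
    assert_eq!(reader.min_value(), 100);
    assert_eq!(reader.max_value(), 355);
    Ok(())
}
```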

fastfield_codecs/src/lib.rs (new file, 231 lines)
View File

@@ -0,0 +1,231 @@
#[cfg(test)]
#[macro_use]
extern crate more_asserts;
use std::io;
use std::io::Write;
pub mod bitpacked;
pub mod linearinterpol;
pub mod multilinearinterpol;
pub trait FastFieldCodecReader: Sized {
/// reads the metadata and returns the CodecReader
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self>;
fn get_u64(&self, doc: u64, data: &[u8]) -> u64;
fn min_value(&self) -> u64;
fn max_value(&self) -> u64;
}
/// The FastFieldCodecSerializer trait is required on all fast field codecs,
/// to decide which one to choose.
pub trait FastFieldCodecSerializer {
/// A codec needs to provide a unique name and id, which is
/// used for debugging and de/serialization.
const NAME: &'static str;
const ID: u8;
/// Check if the Codec is able to compress the data
fn is_applicable(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> bool;
/// Returns an estimate of the compression ratio.
/// The baseline is uncompressed 64bit data.
///
/// It could make sense to also return a value representing
/// computational complexity.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32;
/// Serializes the data using the serializer into write.
/// There are multiple iterators, in case the codec needs to read the data multiple times.
/// The iterators should be preferred over using fastfield_accessor for performance reasons.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()>;
}
/// FastFieldDataAccess is the trait to access fast field data during serialization and estimation.
pub trait FastFieldDataAccess {
/// Return the value associated to the given document.
///
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance reasons.
///
/// # Panics
///
/// May panic if `doc` is greater than the number of values in the segment.
fn get(&self, doc: u32) -> u64;
}
#[derive(Debug, Clone)]
pub struct FastFieldStats {
pub min_value: u64,
pub max_value: u64,
pub num_vals: u64,
}
impl<'a> FastFieldDataAccess for &'a [u64] {
fn get(&self, doc: u32) -> u64 {
self[doc as usize]
}
}
impl<'a> FastFieldDataAccess for &'a Vec<u64> {
fn get(&self, doc: u32) -> u64 {
self[doc as usize]
}
}
impl FastFieldDataAccess for Vec<u64> {
fn get(&self, doc: u32) -> u64 {
self[doc as usize]
}
}
#[cfg(test)]
mod tests {
use crate::{
bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer},
linearinterpol::{LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer},
multilinearinterpol::{
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
},
};
pub fn create_and_validate<S: FastFieldCodecSerializer, R: FastFieldCodecReader>(
data: &[u64],
name: &str,
) -> (f32, f32) {
if !S::is_applicable(&data, crate::tests::stats_from_vec(&data)) {
return (f32::MAX, 0.0);
}
let estimation = S::estimate(&data, crate::tests::stats_from_vec(&data));
let mut out = vec![];
S::serialize(
&mut out,
&data,
crate::tests::stats_from_vec(&data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
let reader = R::open_from_bytes(&out).unwrap();
for (doc, orig_val) in data.iter().enumerate() {
let val = reader.get_u64(doc as u64, &out);
if val != *orig_val {
panic!(
"val {:?} does not match orig_val {:?}, in data set {}, data {:?}",
val, orig_val, name, data
);
}
}
let actual_compression = data.len() as f32 / out.len() as f32;
return (estimation, actual_compression);
}
pub fn get_codec_test_data_sets() -> Vec<(Vec<u64>, &'static str)> {
let mut data_and_names = vec![];
let data = (10..=20_u64).collect::<Vec<_>>();
data_and_names.push((data, "simple monotonically increasing"));
data_and_names.push((
vec![5, 6, 7, 8, 9, 10, 99, 100],
"offset in linear interpol",
));
data_and_names.push((vec![5, 50, 3, 13, 1, 1000, 35], "rand small"));
data_and_names.push((vec![10], "single value"));
data_and_names
}
fn test_codec<S: FastFieldCodecSerializer, R: FastFieldCodecReader>() {
let codec_name = S::NAME;
for (data, data_set_name) in get_codec_test_data_sets() {
let (estimate, actual) =
crate::tests::create_and_validate::<S, R>(&data, data_set_name);
let result = if estimate == f32::MAX {
"Disabled".to_string()
} else {
format!("Estimate {:?} Actual {:?} ", estimate, actual)
};
println!(
"Codec {}, DataSet {}, {}",
codec_name, data_set_name, result
);
}
}
#[test]
fn test_codec_bitpacking() {
test_codec::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>();
}
#[test]
fn test_codec_interpolation() {
test_codec::<LinearInterpolFastFieldSerializer, LinearInterpolFastFieldReader>();
}
#[test]
fn test_codec_multi_interpolation() {
test_codec::<MultiLinearInterpolFastFieldSerializer, MultiLinearInterpolFastFieldReader>();
}
use super::*;
pub fn stats_from_vec(data: &[u64]) -> FastFieldStats {
let min_value = data.iter().cloned().min().unwrap_or(0);
let max_value = data.iter().cloned().max().unwrap_or(0);
FastFieldStats {
min_value,
max_value,
num_vals: data.len() as u64,
}
}
#[test]
fn estimation_good_interpolation_case() {
let data = (10..=20000_u64).collect::<Vec<_>>();
let linear_interpol_estimation =
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
assert_le!(linear_interpol_estimation, 0.01);
let multi_linear_interpol_estimation =
MultiLinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
assert_le!(multi_linear_interpol_estimation, 0.2);
assert_le!(linear_interpol_estimation, multi_linear_interpol_estimation);
let bitpacked_estimation =
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
assert_le!(linear_interpol_estimation, bitpacked_estimation);
}
#[test]
fn estimation_test_bad_interpolation_case() {
let data = vec![200, 10, 10, 10, 10, 1000, 20];
let linear_interpol_estimation =
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
assert_le!(linear_interpol_estimation, 0.32);
let bitpacked_estimation =
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
assert_le!(bitpacked_estimation, linear_interpol_estimation);
}
#[test]
fn estimation_test_bad_interpolation_case_monotonically_increasing() {
let mut data = (200..=20000_u64).collect::<Vec<_>>();
data.push(1_000_000);
// in this case linear interpolation cannot in fact be worse than bitpacking,
// but the estimator adds some threshold, which leads to a worse estimate
let linear_interpol_estimation =
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
assert_le!(linear_interpol_estimation, 0.35);
let bitpacked_estimation =
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
assert_le!(bitpacked_estimation, 0.32);
assert_le!(bitpacked_estimation, linear_interpol_estimation);
}
}
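The `is_applicable`/`estimate` pair is what lets the writer pick a codec per column (the "calc estimation and choose best codec" commit above). A rough sketch of that selection logic, treating a disabled codec as `f32::MAX` the way the tests above do (the helper name is made up):

```
use fastfield_codecs::bitpacked::BitpackedFastFieldSerializer;
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
use fastfield_codecs::{FastFieldCodecSerializer, FastFieldStats};

/// Estimated compression ratio for one codec, or f32::MAX when the codec declines the data.
fn estimate_or_disable<S: FastFieldCodecSerializer>(
    data: &Vec<u64>,
    stats: FastFieldStats,
) -> (f32, &'static str) {
    if !S::is_applicable(data, stats.clone()) {
        return (f32::MAX, S::NAME);
    }
    (S::estimate(data, stats), S::NAME)
}

fn main() {
    let data: Vec<u64> = (0..10_000u64).map(|i| 100 + i * 3).collect();
    let stats = FastFieldStats {
        min_value: *data.first().unwrap(),
        max_value: *data.last().unwrap(),
        num_vals: data.len() as u64,
    };
    let candidates = [
        estimate_or_disable::<BitpackedFastFieldSerializer>(&data, stats.clone()),
        estimate_or_disable::<LinearInterpolFastFieldSerializer>(&data, stats.clone()),
        estimate_or_disable::<MultiLinearInterpolFastFieldSerializer>(&data, stats),
    ];
    // Pick the codec with the smallest estimated ratio (no NaNs here, so unwrap is fine).
    let best = candidates
        .iter()
        .min_by(|a, b| a.0.partial_cmp(&b.0).unwrap())
        .unwrap();
    println!("best estimated codec: {} ({:.4})", best.1, best.0);
}
```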

View File

@@ -0,0 +1,297 @@
use crate::FastFieldCodecReader;
use crate::FastFieldCodecSerializer;
use crate::FastFieldDataAccess;
use crate::FastFieldStats;
use std::io::{self, Read, Write};
use std::ops::Sub;
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;
use common::BinarySerializable;
use common::FixedSize;
use tantivy_bitpacker::BitUnpacker;
/// Depending on the field type, a different
/// fast field is required.
#[derive(Clone)]
pub struct LinearInterpolFastFieldReader {
bit_unpacker: BitUnpacker,
pub footer: LinearInterpolFooter,
pub slope: f32,
}
#[derive(Clone, Debug)]
pub struct LinearInterpolFooter {
pub relative_max_value: u64,
pub offset: u64,
pub first_val: u64,
pub last_val: u64,
pub num_vals: u64,
pub min_value: u64,
pub max_value: u64,
}
impl BinarySerializable for LinearInterpolFooter {
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
self.relative_max_value.serialize(write)?;
self.offset.serialize(write)?;
self.first_val.serialize(write)?;
self.last_val.serialize(write)?;
self.num_vals.serialize(write)?;
self.min_value.serialize(write)?;
self.max_value.serialize(write)?;
Ok(())
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<LinearInterpolFooter> {
Ok(LinearInterpolFooter {
relative_max_value: u64::deserialize(reader)?,
offset: u64::deserialize(reader)?,
first_val: u64::deserialize(reader)?,
last_val: u64::deserialize(reader)?,
num_vals: u64::deserialize(reader)?,
min_value: u64::deserialize(reader)?,
max_value: u64::deserialize(reader)?,
})
}
}
impl FixedSize for LinearInterpolFooter {
const SIZE_IN_BYTES: usize = 56;
}
impl FastFieldCodecReader for LinearInterpolFastFieldReader {
/// Opens a fast field given a file.
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
let (_data, mut footer) = bytes.split_at(bytes.len() - LinearInterpolFooter::SIZE_IN_BYTES);
let footer = LinearInterpolFooter::deserialize(&mut footer)?;
let slope = get_slope(footer.first_val, footer.last_val, footer.num_vals);
let num_bits = compute_num_bits(footer.relative_max_value);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(LinearInterpolFastFieldReader {
bit_unpacker,
footer,
slope,
})
}
#[inline]
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
let calculated_value = get_calculated_value(self.footer.first_val, doc, self.slope);
(calculated_value + self.bit_unpacker.get(doc, &data)) - self.footer.offset
}
#[inline]
fn min_value(&self) -> u64 {
self.footer.min_value
}
#[inline]
fn max_value(&self) -> u64 {
self.footer.max_value
}
}
/// Fastfield serializer, which tries to guess values by linear interpolation
/// and stores the difference bitpacked.
pub struct LinearInterpolFastFieldSerializer {}
#[inline]
fn get_slope(first_val: u64, last_val: u64, num_vals: u64) -> f32 {
if num_vals <= 1 {
return 0.0;
}
// We calculate the slope with f64 high precision and use the result in lower precision f32
// This is done in order to handle estimations for very large values like i64::MAX
((last_val as f64 - first_val as f64) / (num_vals as u64 - 1) as f64) as f32
}
#[inline]
fn get_calculated_value(first_val: u64, pos: u64, slope: f32) -> u64 {
first_val + (pos as f32 * slope) as u64
}
impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
const NAME: &'static str = "LinearInterpol";
const ID: u8 = 2;
/// Serializes the data with the linear interpolation codec.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
assert!(stats.min_value <= stats.max_value);
let first_val = fastfield_accessor.get(0);
let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1);
let slope = get_slope(first_val, last_val, stats.num_vals);
// calculate offset to ensure all values are positive
let mut offset = 0;
let mut rel_positive_max = 0;
for (pos, actual_value) in data_iter1.enumerate() {
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
if calculated_value > actual_value {
// negative value: we need to apply an offset
// we ignore negative values in the max value calculation, because negative values
// will be offset to 0
offset = offset.max(calculated_value - actual_value);
} else {
// positive value: no offset required
rel_positive_max = rel_positive_max.max(actual_value - calculated_value);
}
}
// rel_positive_max will be adjusted by offset
let relative_max_value = rel_positive_max + offset;
let num_bits = compute_num_bits(relative_max_value);
let mut bit_packer = BitPacker::new();
for (pos, val) in data_iter.enumerate() {
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
let diff = (val + offset) - calculated_value;
bit_packer.write(diff, num_bits, write)?;
}
bit_packer.close(write)?;
let footer = LinearInterpolFooter {
relative_max_value,
offset,
first_val,
last_val,
num_vals: stats.num_vals,
min_value: stats.min_value,
max_value: stats.max_value,
};
footer.serialize(write)?;
Ok(())
}
fn is_applicable(
_fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
) -> bool {
if stats.num_vals < 3 {
return false; //disable compressor for this case
}
// On serialization the offset is added to the actual value.
// We need to make sure this won't run into overflow calculation issues.
// For this we take the maximum theoretical offset and add it to the max value.
// If this doesn't overflow, the algorithm should be fine.
let theorethical_maximum_offset = stats.max_value - stats.min_value;
if stats
.max_value
.checked_add(theorethical_maximum_offset)
.is_none()
{
return false;
}
true
}
/// Estimation for linear interpolation is hard because you don't know
/// where the local maxima of the deviation from the calculated value are, and
/// the offset to shift all values to >= 0 is also unknown.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
let first_val = fastfield_accessor.get(0);
let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1);
let slope = get_slope(first_val, last_val, stats.num_vals);
// let's sample at 0%, 5%, 10% .. 95%
let num_vals = stats.num_vals as f32 / 100.0;
let sample_positions = (0..20)
.map(|pos| (num_vals * pos as f32 * 5.0) as usize)
.collect::<Vec<_>>();
let max_distance = sample_positions
.iter()
.map(|pos| {
let calculated_value = get_calculated_value(first_val, *pos as u64, slope);
let actual_value = fastfield_accessor.get(*pos as u32);
distance(calculated_value, actual_value)
})
.max()
.unwrap_or(0);
// The theory is that we don't know the actual max_distance, but the samples are within a
// 50% threshold of it (hence the 1.5 factor).
// It is multiplied by 2 because in a log-shaped scenario the line would be as much above as
// below the data, so the offset would equal max_distance.
//
let relative_max_value = (max_distance as f32 * 1.5) * 2.0;
let num_bits = compute_num_bits(relative_max_value as u64) as u64 * stats.num_vals as u64
+ LinearInterpolFooter::SIZE_IN_BYTES as u64;
let num_bits_uncompressed = 64 * stats.num_vals;
num_bits as f32 / num_bits_uncompressed as f32
}
}
#[inline]
fn distance<T: Sub<Output = T> + Ord>(x: T, y: T) -> T {
if x < y {
y - x
} else {
x - y
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::get_codec_test_data_sets;
fn create_and_validate(data: &[u64], name: &str) {
crate::tests::create_and_validate::<
LinearInterpolFastFieldSerializer,
LinearInterpolFastFieldReader,
>(&data, name);
}
#[test]
fn test_with_codec_data_sets() {
let data_sets = get_codec_test_data_sets();
for (mut data, name) in data_sets {
create_and_validate(&data, name);
data.reverse();
create_and_validate(&data, name);
}
}
#[test]
fn linear_interpol_fast_field_test_large_amplitude() {
let data = vec![
i64::MAX as u64 / 2,
i64::MAX as u64 / 3,
i64::MAX as u64 / 2,
];
create_and_validate(&data, "large amplitude");
}
#[test]
fn linear_interpol_fast_concave_data() {
let data = vec![0, 1, 2, 5, 8, 10, 20, 50];
create_and_validate(&data, "concave data");
}
#[test]
fn linear_interpol_fast_convex_data() {
let data = vec![0, 40, 60, 70, 75, 77];
create_and_validate(&data, "convex data");
}
#[test]
fn linear_interpol_fast_field_test_simple() {
let data = (10..=20_u64).collect::<Vec<_>>();
create_and_validate(&data, "simple monotonically");
}
#[test]
fn linear_interpol_fast_field_rand() {
for _ in 0..5000 {
let mut data = (0..50 as usize)
.map(|_| rand::random::<u64>())
.collect::<Vec<_>>();
create_and_validate(&data, "random");
data.reverse();
create_and_validate(&data, "random");
}
}
}
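A tiny worked example of the encoding math. This is a self-contained sketch: `slope` and `predicted` below simply mirror the private `get_slope`/`get_calculated_value` helpers above.

```
fn slope(first: u64, last: u64, num_vals: u64) -> f32 {
    ((last as f64 - first as f64) / (num_vals - 1) as f64) as f32
}

fn predicted(first: u64, pos: u64, slope: f32) -> u64 {
    first + (pos as f32 * slope) as u64
}

fn main() {
    let data = [10u64, 11, 20, 23, 40];
    let s = slope(data[0], *data.last().unwrap(), data.len() as u64); // (40 - 10) / 4 = 7.5
    // First pass: find the offset that makes every stored diff non-negative.
    let (mut offset, mut rel_positive_max) = (0u64, 0u64);
    for (pos, &val) in data.iter().enumerate() {
        let pred = predicted(data[0], pos as u64, s);
        if pred > val {
            offset = offset.max(pred - val); // prediction overshoots by up to 9 (at pos 3)
        } else {
            rel_positive_max = rel_positive_max.max(val - pred);
        }
    }
    // A second pass would bitpack (val + offset - prediction) using
    // compute_num_bits(rel_positive_max + offset) bits per value (4 bits here).
    println!("slope={} offset={} relative_max={}", s, offset, rel_positive_max + offset);
}
```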

View File

@@ -0,0 +1,126 @@
#[macro_use]
extern crate prettytable;
use fastfield_codecs::{
linearinterpol::LinearInterpolFastFieldSerializer,
multilinearinterpol::MultiLinearInterpolFastFieldSerializer, FastFieldCodecSerializer,
FastFieldStats,
};
use prettytable::{Cell, Row, Table};
fn main() {
let mut table = Table::new();
// Add a row per time
table.add_row(row!["", "Compression Ratio", "Compression Estimation"]);
for (data, data_set_name) in get_codec_test_data_sets() {
let mut results = vec![];
let res = serialize_with_codec::<LinearInterpolFastFieldSerializer>(&data);
results.push(res);
let res = serialize_with_codec::<MultiLinearInterpolFastFieldSerializer>(&data);
results.push(res);
let res = serialize_with_codec::<fastfield_codecs::bitpacked::BitpackedFastFieldSerializer>(
&data,
);
results.push(res);
//let best_estimation_codec = results
//.iter()
//.min_by(|res1, res2| res1.partial_cmp(&res2).unwrap())
//.unwrap();
let best_compression_ratio_codec = results
.iter()
.min_by(|res1, res2| res1.partial_cmp(&res2).unwrap())
.cloned()
.unwrap();
table.add_row(Row::new(vec![Cell::new(data_set_name).style_spec("Bbb")]));
for (is_applicable, est, comp, name) in results {
let (est_cell, ratio_cell) = if !is_applicable {
("Codec Disabled".to_string(), "".to_string())
} else {
(est.to_string(), comp.to_string())
};
let style = if comp == best_compression_ratio_codec.1 {
"Fb"
} else {
""
};
table.add_row(Row::new(vec![
Cell::new(&name.to_string()).style_spec("bFg"),
Cell::new(&ratio_cell).style_spec(style),
Cell::new(&est_cell).style_spec(""),
]));
}
}
table.printstd();
}
pub fn get_codec_test_data_sets() -> Vec<(Vec<u64>, &'static str)> {
let mut data_and_names = vec![];
let data = (1000..=200_000_u64).collect::<Vec<_>>();
data_and_names.push((data, "Autoincrement"));
let mut current_cumulative = 0;
let data = (1..=200_000_u64)
.map(|num| {
let num = (num as f32 + num as f32).log10() as u64;
current_cumulative += num;
current_cumulative
})
.collect::<Vec<_>>();
//let data = (1..=200000_u64).map(|num| num + num).collect::<Vec<_>>();
data_and_names.push((data, "Monotonically increasing concave"));
let mut current_cumulative = 0;
let data = (1..=200_000_u64)
.map(|num| {
let num = (200_000.0 - num as f32).log10() as u64;
current_cumulative += num;
current_cumulative
})
.collect::<Vec<_>>();
data_and_names.push((data, "Monotonically increasing convex"));
let data = (1000..=200_000_u64)
.map(|num| num + rand::random::<u8>() as u64)
.collect::<Vec<_>>();
data_and_names.push((data, "Almost monotonically increasing"));
data_and_names
}
pub fn serialize_with_codec<S: FastFieldCodecSerializer>(
data: &[u64],
) -> (bool, f32, f32, &'static str) {
let is_applicable = S::is_applicable(&data, stats_from_vec(&data));
if !is_applicable {
return (false, 0.0, 0.0, S::NAME);
}
let estimation = S::estimate(&data, stats_from_vec(&data));
let mut out = vec![];
S::serialize(
&mut out,
&data,
stats_from_vec(&data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
let actual_compression = out.len() as f32 / (data.len() * 8) as f32;
return (true, estimation, actual_compression, S::NAME);
}
pub fn stats_from_vec(data: &[u64]) -> FastFieldStats {
let min_value = data.iter().cloned().min().unwrap_or(0);
let max_value = data.iter().cloned().max().unwrap_or(0);
FastFieldStats {
min_value,
max_value,
num_vals: data.len() as u64,
}
}

View File

@@ -0,0 +1,410 @@
use crate::FastFieldCodecReader;
use crate::FastFieldCodecSerializer;
use crate::FastFieldDataAccess;
use crate::FastFieldStats;
use common::CountingWriter;
use std::io::{self, Read, Write};
use std::ops::Sub;
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;
use common::BinarySerializable;
use common::DeserializeFrom;
use tantivy_bitpacker::BitUnpacker;
const CHUNK_SIZE: u64 = 512;
/// Depending on the field type, a different
/// fast field is required.
#[derive(Clone)]
pub struct MultiLinearInterpolFastFieldReader {
pub footer: MultiLinearInterpolFooter,
}
#[derive(Clone, Debug, Default)]
struct Function {
// The offset in the data is required, because we have different bit widths per block
data_start_offset: u64,
// start_pos in the block will be CHUNK_SIZE * BLOCK_NUM
start_pos: u64,
// only used during serialization, 0 after deserialization
end_pos: u64,
// only used during serialization, 0 after deserialization
value_start_pos: u64,
// only used during serialization, 0 after deserialization
value_end_pos: u64,
slope: f32,
// The offset so that all values are positive when writing them
positive_val_offset: u64,
num_bits: u8,
bit_unpacker: BitUnpacker,
}
impl Function {
fn calc_slope(&mut self) {
let num_vals = self.end_pos - self.start_pos;
self.slope = get_slope(self.value_start_pos, self.value_end_pos, num_vals);
}
// split the interpolation into two functions, change self and return the second one
fn split(&mut self, split_pos: u64, split_pos_value: u64) -> Function {
let mut new_function = Function {
start_pos: split_pos,
end_pos: self.end_pos,
value_start_pos: split_pos_value,
value_end_pos: self.value_end_pos,
..Default::default()
};
new_function.calc_slope();
self.end_pos = split_pos;
self.value_end_pos = split_pos_value;
self.calc_slope();
new_function
}
}
impl BinarySerializable for Function {
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
self.data_start_offset.serialize(write)?;
self.value_start_pos.serialize(write)?;
self.positive_val_offset.serialize(write)?;
self.slope.serialize(write)?;
self.num_bits.serialize(write)?;
Ok(())
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Function> {
let data_start_offset = u64::deserialize(reader)?;
let value_start_pos = u64::deserialize(reader)?;
let offset = u64::deserialize(reader)?;
let slope = f32::deserialize(reader)?;
let num_bits = u8::deserialize(reader)?;
let interpolation = Function {
data_start_offset,
value_start_pos,
positive_val_offset: offset,
num_bits,
bit_unpacker: BitUnpacker::new(num_bits),
slope,
..Default::default()
};
Ok(interpolation)
}
}
#[derive(Clone, Debug)]
pub struct MultiLinearInterpolFooter {
pub num_vals: u64,
pub min_value: u64,
pub max_value: u64,
interpolations: Vec<Function>,
}
impl BinarySerializable for MultiLinearInterpolFooter {
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
let mut out = vec![];
self.num_vals.serialize(&mut out)?;
self.min_value.serialize(&mut out)?;
self.max_value.serialize(&mut out)?;
self.interpolations.serialize(&mut out)?;
write.write_all(&out)?;
(out.len() as u32).serialize(write)?;
Ok(())
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<MultiLinearInterpolFooter> {
let mut footer = MultiLinearInterpolFooter {
num_vals: u64::deserialize(reader)?,
min_value: u64::deserialize(reader)?,
max_value: u64::deserialize(reader)?,
interpolations: Vec::<Function>::deserialize(reader)?,
};
for (num, interpol) in footer.interpolations.iter_mut().enumerate() {
interpol.start_pos = CHUNK_SIZE * num as u64;
}
Ok(footer)
}
}
#[inline]
fn get_interpolation_position(doc: u64) -> usize {
let index = doc / CHUNK_SIZE;
index as usize
}
#[inline]
fn get_interpolation_function(doc: u64, interpolations: &[Function]) -> &Function {
&interpolations[get_interpolation_position(doc)]
}
impl FastFieldCodecReader for MultiLinearInterpolFastFieldReader {
/// Opens a fast field given a file.
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
let (_data, mut footer) = bytes.split_at(bytes.len() - (4 + footer_len) as usize);
let footer = MultiLinearInterpolFooter::deserialize(&mut footer)?;
Ok(MultiLinearInterpolFastFieldReader { footer })
}
#[inline]
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
let interpolation = get_interpolation_function(doc, &self.footer.interpolations);
let doc = doc - interpolation.start_pos;
let calculated_value =
get_calculated_value(interpolation.value_start_pos, doc, interpolation.slope);
let diff = interpolation
.bit_unpacker
.get(doc, &data[interpolation.data_start_offset as usize..]);
(calculated_value + diff) - interpolation.positive_val_offset
}
#[inline]
fn min_value(&self) -> u64 {
self.footer.min_value
}
#[inline]
fn max_value(&self) -> u64 {
self.footer.max_value
}
}
#[inline]
fn get_slope(first_val: u64, last_val: u64, num_vals: u64) -> f32 {
((last_val as f64 - first_val as f64) / (num_vals as u64 - 1) as f64) as f32
}
#[inline]
fn get_calculated_value(first_val: u64, pos: u64, slope: f32) -> u64 {
(first_val as i64 + (pos as f32 * slope) as i64) as u64
}
/// Same as LinearInterpolFastFieldSerializer, but working on chunks of CHUNK_SIZE elements.
pub struct MultiLinearInterpolFastFieldSerializer {}
impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
const NAME: &'static str = "MultiLinearInterpol";
const ID: u8 = 3;
/// Serializes the data with the multi linear interpolation codec.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
_data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
assert!(stats.min_value <= stats.max_value);
let first_val = fastfield_accessor.get(0);
let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1);
let mut first_function = Function {
end_pos: stats.num_vals,
value_start_pos: first_val,
value_end_pos: last_val,
..Default::default()
};
first_function.calc_slope();
let mut interpolations = vec![first_function];
// Since we potentially apply multiple passes over the data, the data is cached.
// Multiple iterations can be expensive (a merge with index sorting can add a lot of overhead
// per iteration)
let data = data_iter.collect::<Vec<_>>();
// let's split this into chunks of CHUNK_SIZE
for data_pos in (0..data.len() as u64).step_by(CHUNK_SIZE as usize).skip(1) {
let new_fun = {
let current_interpolation = interpolations.last_mut().unwrap();
current_interpolation.split(data_pos, data[data_pos as usize])
};
interpolations.push(new_fun);
}
// calculate offset and max (-> numbits) for each function
for interpolation in &mut interpolations {
let mut offset = 0;
let mut rel_positive_max = 0;
for (pos, actual_value) in data
[interpolation.start_pos as usize..interpolation.end_pos as usize]
.iter()
.cloned()
.enumerate()
{
let calculated_value = get_calculated_value(
interpolation.value_start_pos,
pos as u64,
interpolation.slope,
);
if calculated_value > actual_value {
// negative value: we need to apply an offset
// we ignore negative values in the max value calculation, because negative values
// will be offset to 0
offset = offset.max(calculated_value - actual_value);
} else {
// positive value: no offset required
rel_positive_max = rel_positive_max.max(actual_value - calculated_value);
}
}
interpolation.positive_val_offset = offset;
interpolation.num_bits = compute_num_bits(rel_positive_max + offset);
}
let mut bit_packer = BitPacker::new();
let write = &mut CountingWriter::wrap(write);
for interpolation in &mut interpolations {
interpolation.data_start_offset = write.written_bytes();
let num_bits = interpolation.num_bits;
for (pos, actual_value) in data
[interpolation.start_pos as usize..interpolation.end_pos as usize]
.iter()
.cloned()
.enumerate()
{
let calculated_value = get_calculated_value(
interpolation.value_start_pos,
pos as u64,
interpolation.slope,
);
let diff = (actual_value + interpolation.positive_val_offset) - calculated_value;
bit_packer.write(diff, num_bits, write)?;
}
bit_packer.flush(write)?;
}
bit_packer.close(write)?;
let footer = MultiLinearInterpolFooter {
num_vals: stats.num_vals,
min_value: stats.min_value,
max_value: stats.max_value,
interpolations,
};
footer.serialize(write)?;
Ok(())
}
fn is_applicable(
_fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
) -> bool {
if stats.num_vals < 5_000 {
return false;
}
// On serialization the offset is added to the actual value.
// We need to make sure this won't run into overflow calculation issues.
// For this we take the maximum theoretical offset and add it to the max value.
// If this doesn't overflow, the algorithm should be fine.
let theorethical_maximum_offset = stats.max_value - stats.min_value;
if stats
.max_value
.checked_add(theorethical_maximum_offset)
.is_none()
{
return false;
}
true
}
/// Estimation for linear interpolation is hard because you don't know
/// where the local maxima are for the deviation of the calculated value, and
/// the offset is also unknown.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
let first_val_in_first_block = fastfield_accessor.get(0);
let last_elem_in_first_chunk = CHUNK_SIZE.min(stats.num_vals);
let last_val_in_first_block = fastfield_accessor.get(last_elem_in_first_chunk as u32 - 1);
let slope = get_slope(
first_val_in_first_block,
last_val_in_first_block,
stats.num_vals,
);
// let's sample at 0%, 5%, 10% .. 95%, but for the first block only
let sample_positions = (0..20)
.map(|pos| (last_elem_in_first_chunk as f32 / 100.0 * pos as f32 * 5.0) as usize)
.collect::<Vec<_>>();
let max_distance = sample_positions
.iter()
.map(|pos| {
let calculated_value =
get_calculated_value(first_val_in_first_block, *pos as u64, slope);
let actual_value = fastfield_accessor.get(*pos as u32);
distance(calculated_value, actual_value)
})
.max()
.unwrap();
// Estimate one block and extrapolate the cost to all blocks.
// The theory is that we don't know the actual max_distance, but the samples are within a
// 50% threshold of it (hence the 1.5 factor).
// It is multiplied by 2 because in a log-shaped scenario the line would be as much above as
// below the data, so the offset would equal max_distance.
//
let relative_max_value = (max_distance as f32 * 1.5) * 2.0;
let num_bits = compute_num_bits(relative_max_value as u64) as u64 * stats.num_vals as u64
// function metadata per block
+ 29 * (stats.num_vals / CHUNK_SIZE);
let num_bits_uncompressed = 64 * stats.num_vals;
num_bits as f32 / num_bits_uncompressed as f32
}
}
fn distance<T: Sub<Output = T> + Ord>(x: T, y: T) -> T {
if x < y {
y - x
} else {
x - y
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::get_codec_test_data_sets;
fn create_and_validate(data: &[u64], name: &str) {
crate::tests::create_and_validate::<
MultiLinearInterpolFastFieldSerializer,
MultiLinearInterpolFastFieldReader,
>(&data, name);
}
#[test]
fn test_with_codec_data_sets() {
let data_sets = get_codec_test_data_sets();
for (mut data, name) in data_sets {
create_and_validate(&data, name);
data.reverse();
create_and_validate(&data, name);
}
}
#[test]
fn test_simple() {
let data = (10..=20_u64).collect::<Vec<_>>();
create_and_validate(&data, "simple monotonically");
}
#[test]
fn border_cases_1() {
let data = (0..1024).collect::<Vec<_>>();
create_and_validate(&data, "border case");
}
#[test]
fn border_case_2() {
let data = (0..1025).collect::<Vec<_>>();
create_and_validate(&data, "border case");
}
#[test]
fn rand() {
for _ in 0..10 {
let mut data = (5_000..20_000)
.map(|_| rand::random::<u64>() as u64)
.collect::<Vec<_>>();
create_and_validate(&data, "random");
data.reverse();
create_and_validate(&data, "random");
}
}
}
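For orientation, a self-contained sketch of how a doc id maps onto its 512-value block, mirroring `get_interpolation_position`/`get_u64` above; the value reconstruction is left as a comment because the block's `Function` fields come from the footer:

```
const CHUNK_SIZE: u64 = 512; // same constant as in the codec above

fn main() {
    let doc: u64 = 1_300;
    let block = doc / CHUNK_SIZE; // block index 2
    let pos_in_block = doc - block * CHUNK_SIZE; // 276
    // The reader then evaluates that block's own linear function and adds the bitpacked diff:
    //   value = value_start_pos + (pos_in_block as f32 * slope) as u64
    //           + bit_unpacker.get(pos_in_block, &data[data_start_offset..])
    //           - positive_val_offset
    println!("doc {} -> block {}, position {} within the block", doc, block, pos_in_block);
}
```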

View File

@@ -1,18 +1,15 @@
mod bitset;
mod composite_file;
mod counting_writer;
mod serialize;
mod vint;
pub use self::bitset::BitSet;
pub(crate) use self::bitset::TinySet;
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::counting_writer::CountingWriter;
pub use self::serialize::{BinarySerializable, DeserializeFrom, FixedSize};
pub use self::vint::{
pub use byteorder::LittleEndian as Endianness;
pub use common::CountingWriter;
pub use common::{
read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt,
};
pub use byteorder::LittleEndian as Endianness;
pub use common::{BinarySerializable, DeserializeFrom, FixedSize};
/// Segment's max doc must be `< MAX_DOC_LIMIT`.
///
@@ -103,8 +100,8 @@ pub fn u64_to_f64(val: u64) -> f64 {
#[cfg(test)]
pub(crate) mod test {
pub use super::serialize::test::fixed_size_test;
use super::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
use common::{BinarySerializable, FixedSize};
use proptest::prelude::*;
use std::f64;
use tantivy_bitpacker::compute_num_bits;
@@ -118,6 +115,12 @@ pub(crate) mod test {
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
}
pub fn fixed_size_test<O: BinarySerializable + FixedSize + Default>() {
let mut buffer = Vec::new();
O::default().serialize(&mut buffer).unwrap();
assert_eq!(buffer.len(), O::SIZE_IN_BYTES);
}
proptest! {
#[test]
fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) {

View File

@@ -28,7 +28,9 @@ pub use self::file_slice::{FileHandle, FileSlice};
pub use self::owned_bytes::OwnedBytes;
pub use self::ram_directory::RamDirectory;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
use std::io::{self, BufWriter, Write};
pub use common::AntiCallToken;
pub use common::TerminatingWrite;
use std::io::BufWriter;
use std::path::PathBuf;
/// Outcome of the Garbage collection
@@ -50,47 +52,6 @@ pub use self::mmap_directory::MmapDirectory;
pub use self::managed_directory::ManagedDirectory;
/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly
///
/// The point is that while the type is public, it cannot be built by anyone
/// outside of this module.
pub struct AntiCallToken(());
/// Trait used to indicate when no more write need to be done on a writer
pub trait TerminatingWrite: Write {
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
fn terminate(mut self) -> io::Result<()>
where
Self: Sized,
{
self.terminate_ref(AntiCallToken(()))
}
/// You should implement this function to define custom behavior.
/// This function should flush any buffer it may hold.
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>;
}
impl<W: TerminatingWrite + ?Sized> TerminatingWrite for Box<W> {
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
self.as_mut().terminate_ref(token)
}
}
impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> {
self.flush()?;
self.get_mut().terminate_ref(a)
}
}
#[cfg(test)]
impl<'a> TerminatingWrite for &'a mut Vec<u8> {
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
self.flush()
}
}
/// Write object for Directory.
///
/// `WritePtr` are required to implement both Write

View File

@@ -46,7 +46,7 @@ impl Drop for VecWriter {
fn drop(&mut self) {
if !self.is_flushed {
panic!(
"You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
"You forgot to flush {:?} before its writter got Drop. Do not rely on drop. This also occurs when the indexer crashed, so you may want to check the logs for the root cause.",
self.path
)
}

View File

@@ -1,6 +1,5 @@
use std::io;
use crate::fastfield::serializer::FastFieldSerializer;
use crate::schema::{Document, Field, Value};
use crate::DocId;
use crate::{

View File

@@ -29,12 +29,13 @@ pub use self::delete::DeleteBitSet;
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::reader::BitpackedFastFieldReader;
pub(crate) use self::reader::BitpackedFastFieldReader;
pub use self::reader::DynamicFastFieldReader;
pub use self::reader::FastFieldReader;
pub use self::readers::FastFieldReaders;
pub use self::serializer::CompositeFastFieldSerializer;
pub use self::serializer::FastFieldSerializer;
pub use self::serializer::FastFieldDataAccess;
pub use self::serializer::FastFieldStats;
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
use crate::schema::Cardinality;
use crate::schema::FieldType;
@@ -213,15 +214,14 @@ mod tests {
use super::*;
use crate::common::CompositeFile;
use crate::common::HasLen;
use crate::directory::{Directory, RamDirectory, WritePtr};
use crate::fastfield::BitpackedFastFieldReader;
use crate::merge_policy::NoMergePolicy;
use crate::schema::Field;
use crate::schema::Schema;
use crate::schema::FAST;
use crate::schema::{Document, IntOptions};
use crate::{Index, SegmentId, SegmentReader};
use common::HasLen;
use once_cell::sync::Lazy;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
@@ -239,7 +239,7 @@ mod tests {
#[test]
pub fn test_fastfield() {
let test_fastfield = BitpackedFastFieldReader::<u64>::from(vec![100, 200, 300]);
let test_fastfield = DynamicFastFieldReader::<u64>::from(vec![100, 200, 300]);
assert_eq!(test_fastfield.get(0), 100);
assert_eq!(test_fastfield.get(1), 200);
assert_eq!(test_fastfield.get(2), 300);
@@ -268,10 +268,10 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
assert_eq!(file.len(), 36 as usize);
assert_eq!(file.len(), 37 as usize);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(file)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(file)?;
assert_eq!(fast_field_reader.get(0), 13u64);
assert_eq!(fast_field_reader.get(1), 14u64);
assert_eq!(fast_field_reader.get(2), 2u64);
@@ -299,11 +299,11 @@ mod tests {
serializer.close()?;
}
let file = directory.open_read(&path)?;
assert_eq!(file.len(), 61 as usize);
assert_eq!(file.len(), 62 as usize);
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
assert_eq!(fast_field_reader.get(0), 4u64);
assert_eq!(fast_field_reader.get(1), 14_082_001u64);
assert_eq!(fast_field_reader.get(2), 3_052u64);
@@ -335,11 +335,11 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
assert_eq!(file.len(), 34 as usize);
assert_eq!(file.len(), 35 as usize);
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
for doc in 0..10_000 {
assert_eq!(fast_field_reader.get(doc), 100_000u64);
}
@@ -367,11 +367,11 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
assert_eq!(file.len(), 80042 as usize);
assert_eq!(file.len(), 80043 as usize);
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
assert_eq!(fast_field_reader.get(0), 0u64);
for doc in 1..10_001 {
assert_eq!(
@@ -406,11 +406,12 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
assert_eq!(file.len(), 17709 as usize);
//assert_eq!(file.len(), 17710 as usize); //bitpacked size
assert_eq!(file.len(), 10175 as usize); // linear interpol size
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(i64_field).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<i64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<i64>::open(data)?;
assert_eq!(fast_field_reader.min_value(), -100i64);
assert_eq!(fast_field_reader.max_value(), 9_999i64);
@@ -450,7 +451,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(i64_field).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<i64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<i64>::open(data)?;
assert_eq!(fast_field_reader.get(0u32), 0i64);
}
Ok(())
@@ -483,7 +484,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
let mut a = 0u64;
for _ in 0..n {
@@ -627,7 +628,7 @@ mod bench {
let directory: RamDirectory = RamDirectory::create();
{
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
let mut serializer = FastFieldSerializer::from_write(write).unwrap();
let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap();
let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA);
for &x in &permutation {
fast_field_writers.add_document(&doc!(*FIELD=>x));
@@ -641,7 +642,7 @@ mod bench {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data).unwrap();
b.iter(|| {
let n = test::black_box(7000u32);
@@ -661,7 +662,7 @@ mod bench {
let directory: RamDirectory = RamDirectory::create();
{
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
let mut serializer = FastFieldSerializer::from_write(write).unwrap();
let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap();
let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA);
for &x in &permutation {
fast_field_writers.add_document(&doc!(*FIELD=>x));
@@ -675,7 +676,7 @@ mod bench {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data).unwrap();
b.iter(|| {
let n = test::black_box(1000u32);

View File

@@ -1,6 +1,8 @@
use std::ops::Range;
use crate::fastfield::{BitpackedFastFieldReader, FastFieldReader, FastValue, MultiValueLength};
use crate::fastfield::{
BitpackedFastFieldReader, DynamicFastFieldReader, FastFieldReader, FastValue, MultiValueLength,
};
use crate::DocId;
/// Reader for a multivalued `u64` fast field.
@@ -13,13 +15,13 @@ use crate::DocId;
///
#[derive(Clone)]
pub struct MultiValuedFastFieldReader<Item: FastValue> {
idx_reader: BitpackedFastFieldReader<u64>,
idx_reader: DynamicFastFieldReader<u64>,
vals_reader: BitpackedFastFieldReader<Item>,
}
impl<Item: FastValue> MultiValuedFastFieldReader<Item> {
pub(crate) fn open(
idx_reader: BitpackedFastFieldReader<u64>,
idx_reader: DynamicFastFieldReader<u64>,
vals_reader: BitpackedFastFieldReader<Item>,
) -> MultiValuedFastFieldReader<Item> {
MultiValuedFastFieldReader {
@@ -44,6 +46,24 @@ impl<Item: FastValue> MultiValuedFastFieldReader<Item> {
self.vals_reader.get_range_u64(range.start, &mut vals[..]);
}
/// Returns the minimum value for this fast field.
///
/// The min value does not take deleted documents into account
/// and should be considered as a lower bound
/// of the actual minimum value.
pub fn min_value(&self) -> Item {
self.vals_reader.min_value()
}
/// Returns the maximum value for this fast field.
///
/// The max value does not take deleted documents into account
/// and should be considered as an upper bound
/// of the actual maximum value.
pub fn max_value(&self) -> Item {
self.vals_reader.max_value()
}
/// Returns the number of values associated with the document `DocId`.
pub fn num_vals(&self, doc: DocId) -> usize {
let range = self.range(doc);
@@ -69,7 +89,7 @@ impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
mod tests {
use crate::core::Index;
use crate::schema::{Facet, Schema, INDEXED};
use crate::schema::{Cardinality, Facet, IntOptions, Schema, INDEXED};
#[test]
fn test_multifastfield_reader() {
@@ -124,4 +144,32 @@ mod tests {
assert_eq!(&vals[..], &[4]);
}
}
#[test]
fn test_multifastfield_reader_min_max() {
let mut schema_builder = Schema::builder();
let field_options = IntOptions::default()
.set_indexed()
.set_fast(Cardinality::MultiValues);
let item_field = schema_builder.add_i64_field("items", field_options);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index
.writer_for_tests()
.expect("Failed to create index writer.");
index_writer.add_document(doc!(
item_field => 2i64,
item_field => 3i64,
item_field => -2i64,
));
index_writer.add_document(doc!(item_field => 6i64, item_field => 3i64));
index_writer.add_document(doc!(item_field => 4i64));
index_writer.commit().expect("Commit failed");
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let field_reader = segment_reader.fast_fields().i64s(item_field).unwrap();
assert_eq!(field_reader.min_value(), -2);
assert_eq!(field_reader.max_value(), 6);
}
}

View File

@@ -1,5 +1,4 @@
use crate::fastfield::serializer::DynamicFastFieldSerializer;
use crate::fastfield::serializer::FastFieldSerializer;
use crate::fastfield::serializer::BitpackedFastFieldSerializerLegacy;
use crate::fastfield::CompositeFastFieldSerializer;
use crate::postings::UnorderedTermId;
use crate::schema::{Document, Field};
@@ -155,7 +154,7 @@ impl MultiValuedFastFieldWriter {
}
{
// writing the values themselves.
let mut value_serializer: DynamicFastFieldSerializer<'_, _>;
let mut value_serializer: BitpackedFastFieldSerializerLegacy<'_, _>;
match mapping_opt {
Some(mapping) => {
value_serializer = serializer.new_u64_fast_field_with_idx(

View File

@@ -8,11 +8,17 @@ use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter};
use crate::schema::Schema;
use crate::schema::FAST;
use crate::DocId;
use fastfield_codecs::bitpacked::BitpackedFastFieldReader as BitpackedReader;
use fastfield_codecs::bitpacked::BitpackedFastFieldSerializer;
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldReader;
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldReader;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
use fastfield_codecs::FastFieldCodecReader;
use fastfield_codecs::FastFieldCodecSerializer;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::path::Path;
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitUnpacker;
/// FastFieldReader is the trait to access fast field data.
pub trait FastFieldReader<Item: FastValue>: Clone {
@@ -42,9 +48,9 @@ pub trait FastFieldReader<Item: FastValue>: Clone {
/// Returns the minimum value for this fast field.
///
/// The max value does not take in account of possible
/// deleted document, and should be considered as an upper bound
/// of the actual maximum value.
/// The min value does not take deleted documents into account
/// and should be considered as a lower bound
/// of the actual minimum value.
fn min_value(&self) -> Item;
/// Returns the maximum value for this fast field.
@@ -61,15 +67,48 @@ pub trait FastFieldReader<Item: FastValue>: Clone {
///
pub enum DynamicFastFieldReader<Item: FastValue> {
/// Bitpacked compressed fastfield data.
Bitpacked(BitpackedFastFieldReader<Item>),
Bitpacked(FastFieldReaderCodecWrapper<Item, BitpackedReader>),
/// Linear interpolated values + bitpacked
LinearInterpol(FastFieldReaderCodecWrapper<Item, LinearInterpolFastFieldReader>),
/// Blockwise linear interpolated values + bitpacked
MultiLinearInterpol(FastFieldReaderCodecWrapper<Item, MultiLinearInterpolFastFieldReader>),
}
impl<Item: FastValue> DynamicFastFieldReader<Item> {
/// Returns the correct reader, wrapped in the `DynamicFastFieldReader` enum, for the data.
pub fn open(file: FileSlice) -> crate::Result<DynamicFastFieldReader<Item>> {
Ok(DynamicFastFieldReader::Bitpacked(
BitpackedFastFieldReader::open(file)?,
))
let mut bytes = file.read_bytes()?;
let id = bytes.read_u8();
let reader = match id {
BitpackedFastFieldSerializer::ID => {
DynamicFastFieldReader::Bitpacked(FastFieldReaderCodecWrapper::<
Item,
BitpackedReader,
>::open_from_bytes(bytes)?)
}
LinearInterpolFastFieldSerializer::ID => {
DynamicFastFieldReader::LinearInterpol(FastFieldReaderCodecWrapper::<
Item,
LinearInterpolFastFieldReader,
>::open_from_bytes(bytes)?)
}
MultiLinearInterpolFastFieldSerializer::ID => {
DynamicFastFieldReader::MultiLinearInterpol(FastFieldReaderCodecWrapper::<
Item,
MultiLinearInterpolFastFieldReader,
>::open_from_bytes(
bytes
)?)
}
_ => {
panic!(
"unknown fastfield id {:?}. Data corrupted or using old tantivy version.",
id
)
}
};
Ok(reader)
}
}
@@ -77,57 +116,67 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
fn get(&self, doc: DocId) -> Item {
match self {
Self::Bitpacked(reader) => reader.get(doc),
Self::LinearInterpol(reader) => reader.get(doc),
Self::MultiLinearInterpol(reader) => reader.get(doc),
}
}
fn get_range(&self, start: DocId, output: &mut [Item]) {
match self {
Self::Bitpacked(reader) => reader.get_range(start, output),
Self::LinearInterpol(reader) => reader.get_range(start, output),
Self::MultiLinearInterpol(reader) => reader.get_range(start, output),
}
}
fn min_value(&self) -> Item {
match self {
Self::Bitpacked(reader) => reader.min_value(),
Self::LinearInterpol(reader) => reader.min_value(),
Self::MultiLinearInterpol(reader) => reader.min_value(),
}
}
fn max_value(&self) -> Item {
match self {
Self::Bitpacked(reader) => reader.max_value(),
Self::LinearInterpol(reader) => reader.max_value(),
Self::MultiLinearInterpol(reader) => reader.max_value(),
}
}
}
/// Trait for accessing a fastfield.
/// Wrapper for accessing a fastfield.
///
/// Holds the data and the codec to read the data.
///
/// Depending on the field type, a different
/// fast field is required.
#[derive(Clone)]
pub struct BitpackedFastFieldReader<Item: FastValue> {
pub struct FastFieldReaderCodecWrapper<Item: FastValue, CodecReader> {
reader: CodecReader,
bytes: OwnedBytes,
bit_unpacker: BitUnpacker,
min_value_u64: u64,
max_value_u64: u64,
_phantom: PhantomData<Item>,
}
impl<Item: FastValue> BitpackedFastFieldReader<Item> {
impl<Item: FastValue, C: FastFieldCodecReader> FastFieldReaderCodecWrapper<Item, C> {
/// Opens a fast field given a file.
pub fn open(file: FileSlice) -> crate::Result<Self> {
let mut bytes = file.read_bytes()?;
let min_value = u64::deserialize(&mut bytes)?;
let amplitude = u64::deserialize(&mut bytes)?;
let max_value = min_value + amplitude;
let num_bits = compute_num_bits(amplitude);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(BitpackedFastFieldReader {
let id = u8::deserialize(&mut bytes)?;
assert_eq!(
BitpackedFastFieldSerializer::ID,
id,
"Tried to open fast field as bitpacked encoded (id=1), but got serializer with different id"
);
Self::open_from_bytes(bytes)
}
/// Opens a fast field given the bytes.
pub fn open_from_bytes(bytes: OwnedBytes) -> crate::Result<Self> {
let reader = C::open_from_bytes(bytes.as_slice())?;
Ok(FastFieldReaderCodecWrapper {
reader,
bytes,
min_value_u64: min_value,
max_value_u64: max_value,
bit_unpacker,
_phantom: PhantomData,
})
}
pub(crate) fn get_u64(&self, doc: u64) -> Item {
Item::from_u64(self.min_value_u64 + self.bit_unpacker.get(doc, &self.bytes))
Item::from_u64(self.reader.get_u64(doc, self.bytes.as_slice()))
}
/// Internally `multivalued` also uses SingleValue fast fields.
@@ -149,7 +198,9 @@ impl<Item: FastValue> BitpackedFastFieldReader<Item> {
}
}
impl<Item: FastValue> FastFieldReader<Item> for BitpackedFastFieldReader<Item> {
impl<Item: FastValue, C: FastFieldCodecReader + Clone> FastFieldReader<Item>
for FastFieldReaderCodecWrapper<Item, C>
{
/// Return the value associated to the given document.
///
/// This accessor should return as fast as possible.
@@ -185,7 +236,7 @@ impl<Item: FastValue> FastFieldReader<Item> for BitpackedFastFieldReader<Item> {
/// deleted document, and should be considered as an upper bound
/// of the actual maximum value.
fn min_value(&self) -> Item {
Item::from_u64(self.min_value_u64)
Item::from_u64(self.reader.min_value())
}
/// Returns the maximum value for this fast field.
@@ -194,12 +245,14 @@ impl<Item: FastValue> FastFieldReader<Item> for BitpackedFastFieldReader<Item> {
/// deleted document, and should be considered as an upper bound
/// of the actual maximum value.
fn max_value(&self) -> Item {
Item::from_u64(self.max_value_u64)
Item::from_u64(self.reader.max_value())
}
}
impl<Item: FastValue> From<Vec<Item>> for BitpackedFastFieldReader<Item> {
fn from(vals: Vec<Item>) -> BitpackedFastFieldReader<Item> {
pub(crate) type BitpackedFastFieldReader<Item> = FastFieldReaderCodecWrapper<Item, BitpackedReader>;
impl<Item: FastValue> From<Vec<Item>> for DynamicFastFieldReader<Item> {
fn from(vals: Vec<Item>) -> DynamicFastFieldReader<Item> {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_u64_field("field", FAST);
let schema = schema_builder.build();
@@ -231,6 +284,6 @@ impl<Item: FastValue> From<Vec<Item>> for BitpackedFastFieldReader<Item> {
let field_file = composite_file
.open_read(field)
.expect("File component not found");
BitpackedFastFieldReader::open(field_file).unwrap()
DynamicFastFieldReader::open(field_file).unwrap()
}
}

View File

@@ -8,7 +8,6 @@ use crate::space_usage::PerFieldSpaceUsage;
use crate::TantivyError;
use super::reader::DynamicFastFieldReader;
use super::FastFieldReader;
/// Provides access to all of the fast field readers.
///
@@ -112,9 +111,8 @@ impl FastFieldReaders {
&self,
field: Field,
) -> crate::Result<MultiValuedFastFieldReader<TFastValue>> {
let fast_field_slice_idx = self.fast_field_data(field, 0)?;
let idx_reader = self.typed_fast_field_reader(field)?;
let fast_field_slice_vals = self.fast_field_data(field, 1)?;
let idx_reader = BitpackedFastFieldReader::open(fast_field_slice_idx)?;
let vals_reader: BitpackedFastFieldReader<TFastValue> =
BitpackedFastFieldReader::open(fast_field_slice_vals)?;
Ok(MultiValuedFastFieldReader::open(idx_reader, vals_reader))
@@ -139,7 +137,7 @@ impl FastFieldReaders {
/// Returns the `i64` fast field reader associated to `field`.
///
/// If `field` is not an i64 fast field, this method returns an Error.
pub fn i64(&self, field: Field) -> crate::Result<impl FastFieldReader<i64>> {
pub fn i64(&self, field: Field) -> crate::Result<DynamicFastFieldReader<i64>> {
self.check_type(field, FastType::I64, Cardinality::SingleValue)?;
self.typed_fast_field_reader(field)
}
@@ -147,7 +145,7 @@ impl FastFieldReaders {
/// Returns the date fast field reader associated to `field`.
///
/// If `field` is not a date fast field, this method returns an Error.
pub fn date(&self, field: Field) -> crate::Result<impl FastFieldReader<crate::DateTime>> {
pub fn date(&self, field: Field) -> crate::Result<DynamicFastFieldReader<crate::DateTime>> {
self.check_type(field, FastType::Date, Cardinality::SingleValue)?;
self.typed_fast_field_reader(field)
}
@@ -155,7 +153,7 @@ impl FastFieldReaders {
/// Returns the `f64` fast field reader associated to `field`.
///
/// If `field` is not an f64 fast field, this method returns an Error.
pub fn f64(&self, field: Field) -> crate::Result<impl FastFieldReader<f64>> {
pub fn f64(&self, field: Field) -> crate::Result<DynamicFastFieldReader<f64>> {
self.check_type(field, FastType::F64, Cardinality::SingleValue)?;
self.typed_fast_field_reader(field)
}

View File

@@ -1,256 +0,0 @@
use crate::common::BinarySerializable;
use crate::common::CompositeWrite;
use crate::common::CountingWriter;
use crate::directory::WritePtr;
use crate::schema::Field;
use std::io::{self, Write};
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;
/// `CompositeFastFieldSerializer` is in charge of serializing
/// fastfields on disk.
///
/// Fast fields have different encodings like bit-packing.
///
/// `FastFieldWriter`s are in charge of pushing the data to
/// the serializer.
/// The serializer expects to receive the following calls.
///
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * ...
/// * `close_field()`
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * ...
/// * `close_field()`
/// * `close()`
pub struct CompositeFastFieldSerializer {
composite_write: CompositeWrite<WritePtr>,
}
impl CompositeFastFieldSerializer {
/// Constructor
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
// just making room for the pointer to header.
let composite_write = CompositeWrite::wrap(write);
Ok(CompositeFastFieldSerializer { composite_write })
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
) -> io::Result<DynamicFastFieldSerializer<'_, CountingWriter<WritePtr>>> {
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field_with_idx(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
idx: usize,
) -> io::Result<DynamicFastFieldSerializer<'_, CountingWriter<WritePtr>>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
DynamicFastFieldSerializer::open(field_write, min_value, max_value)
}
/// Start serializing a new [u8] fast field
pub fn new_bytes_fast_field_with_idx(
&mut self,
field: Field,
idx: usize,
) -> FastBytesFieldSerializer<'_, CountingWriter<WritePtr>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
FastBytesFieldSerializer { write: field_write }
}
/// Closes the serializer
///
/// After this call the data must be persistently saved on disk.
pub fn close(self) -> io::Result<()> {
self.composite_write.close()
}
}
#[derive(Debug, Clone)]
pub struct EstimationStats {
min_value: u64,
max_value: u64,
}
/// The FastFieldSerializer trait is the common interface
/// implemented by every fastfield serializer variant.
///
/// `DynamicFastFieldSerializer` is the enum wrapping all variants.
/// It is used to create a serializer instance.
pub trait FastFieldSerializer {
/// add value to serializer
fn add_val(&mut self, val: u64) -> io::Result<()>;
/// finish serializing a field.
fn close_field(self) -> io::Result<()>;
}
/// The FastFieldSerializerEstimate trait is required on all variants
/// of fast field compressions, to decide which one to choose.
pub trait FastFieldSerializerEstimate {
/// returns an estimate of the compression ratio.
fn estimate(
/*fastfield_accessor: impl FastFieldReader<u64>,*/ stats: EstimationStats,
) -> (f32, &'static str);
/// the unique name of the compressor
fn name() -> &'static str;
}
pub enum DynamicFastFieldSerializer<'a, W: Write> {
Bitpacked(BitpackedFastFieldSerializer<'a, W>),
}
impl<'a, W: Write> DynamicFastFieldSerializer<'a, W> {
/// Creates a new fast field serializer.
///
/// The serializer in fact encodes the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// the minimum number of bits required to encode
/// values.
pub fn open(
write: &'a mut W,
min_value: u64,
max_value: u64,
) -> io::Result<DynamicFastFieldSerializer<'a, W>> {
let stats = EstimationStats {
min_value,
max_value,
};
let (_ratio, name) = (
BitpackedFastFieldSerializer::<Vec<u8>>::estimate(stats),
BitpackedFastFieldSerializer::<Vec<u8>>::name(),
);
Self::open_from_name(write, min_value, max_value, name)
}
/// Creates a new fast field serializer.
///
/// The serializer in fact encodes the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// the minimum number of bits required to encode
/// values.
pub fn open_from_name(
write: &'a mut W,
min_value: u64,
max_value: u64,
name: &str,
) -> io::Result<DynamicFastFieldSerializer<'a, W>> {
// Weirdly the W generic on BitpackedFastFieldSerializer needs to be set,
// although name() doesn't use it
let variant = if name == BitpackedFastFieldSerializer::<Vec<u8>>::name() {
DynamicFastFieldSerializer::Bitpacked(BitpackedFastFieldSerializer::open(
write, min_value, max_value,
)?)
} else {
panic!("unknown fastfield serializer {}", name);
};
Ok(variant)
}
}
impl<'a, W: Write> FastFieldSerializer for DynamicFastFieldSerializer<'a, W> {
fn add_val(&mut self, val: u64) -> io::Result<()> {
match self {
Self::Bitpacked(serializer) => serializer.add_val(val),
}
}
fn close_field(self) -> io::Result<()> {
match self {
Self::Bitpacked(serializer) => serializer.close_field(),
}
}
}
pub struct BitpackedFastFieldSerializer<'a, W: Write> {
bit_packer: BitPacker,
write: &'a mut W,
min_value: u64,
num_bits: u8,
}
impl<'a, W: Write> BitpackedFastFieldSerializer<'a, W> {
/// Creates a new fast field serializer.
///
/// The serializer in fact encodes the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// the minimum number of bits required to encode
/// values.
fn open(
write: &'a mut W,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedFastFieldSerializer<'a, W>> {
assert!(min_value <= max_value);
min_value.serialize(write)?;
let amplitude = max_value - min_value;
amplitude.serialize(write)?;
let num_bits = compute_num_bits(amplitude);
let bit_packer = BitPacker::new();
Ok(BitpackedFastFieldSerializer {
bit_packer,
write,
min_value,
num_bits,
})
}
}
impl<'a, W: 'a + Write> FastFieldSerializer for BitpackedFastFieldSerializer<'a, W> {
/// Pushes a new value to the currently open u64 fast field.
fn add_val(&mut self, val: u64) -> io::Result<()> {
let val_to_write: u64 = val - self.min_value;
self.bit_packer
.write(val_to_write, self.num_bits, &mut self.write)?;
Ok(())
}
fn close_field(mut self) -> io::Result<()> {
self.bit_packer.close(&mut self.write)
}
}
impl<'a, W: 'a + Write> FastFieldSerializerEstimate for BitpackedFastFieldSerializer<'a, W> {
fn estimate(
/*_fastfield_accessor: impl FastFieldReader<u64>, */ stats: EstimationStats,
) -> (f32, &'static str) {
let amplitude = stats.max_value - stats.min_value;
let num_bits = compute_num_bits(amplitude);
let num_bits_uncompressed = 64;
let ratio = num_bits as f32 / num_bits_uncompressed as f32;
let name = Self::name();
(ratio, name)
}
fn name() -> &'static str {
"Bitpacked"
}
}
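To put concrete numbers on this estimate (illustrative values; constructing `EstimationStats` directly assumes module-local access, since its fields are private):
// amplitude = 300 - 100 = 200, which fits in 8 bits,
// so the estimated ratio is 8 / 64 = 0.125.
let stats = EstimationStats {
    min_value: 100,
    max_value: 300,
};
let (ratio, name) = BitpackedFastFieldSerializer::<Vec<u8>>::estimate(stats);
assert_eq!(ratio, 0.125);
assert_eq!(name, "Bitpacked");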
pub struct FastBytesFieldSerializer<'a, W: Write> {
write: &'a mut W,
}
impl<'a, W: Write> FastBytesFieldSerializer<'a, W> {
pub fn write_all(&mut self, vals: &[u8]) -> io::Result<()> {
self.write.write_all(vals)
}
pub fn flush(&mut self) -> io::Result<()> {
self.write.flush()
}
}

View File

@@ -0,0 +1,203 @@
use crate::common::BinarySerializable;
use crate::common::CompositeWrite;
use crate::common::CountingWriter;
use crate::directory::WritePtr;
use crate::schema::Field;
pub use fastfield_codecs::bitpacked::BitpackedFastFieldSerializer;
pub use fastfield_codecs::bitpacked::BitpackedFastFieldSerializerLegacy;
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
pub use fastfield_codecs::FastFieldCodecSerializer;
pub use fastfield_codecs::FastFieldDataAccess;
pub use fastfield_codecs::FastFieldStats;
use std::io::{self, Write};
/// `CompositeFastFieldSerializer` is in charge of serializing
/// fastfields on disk.
///
/// Fast fields have different encodings like bit-packing.
///
/// `FastFieldWriter`s are in charge of pushing the data to
/// the serializer.
/// The serializer expects to receive the following calls.
///
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * ...
/// * `close_field()`
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * ...
/// * `close_field()`
/// * `close()`
pub struct CompositeFastFieldSerializer {
composite_write: CompositeWrite<WritePtr>,
}
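A minimal usage sketch of the call sequence described above, going through the legacy bitpacked path (`directory` and `field` are assumed to already exist; the unwraps mirror the test code elsewhere in this diff):
let write: WritePtr = directory.open_write(Path::new("fast_fields")).unwrap();
let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap();
{
    // one u64 fast field whose values lie in [0, 100]
    let mut field_serializer = serializer.new_u64_fast_field(field, 0u64, 100u64).unwrap();
    field_serializer.add_val(3).unwrap();
    field_serializer.add_val(17).unwrap();
    field_serializer.add_val(100).unwrap();
    field_serializer.close_field().unwrap();
}
serializer.close().unwrap();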
// use this, when this is merged and stabilized explicit_generic_args_with_impl_trait
// https://github.com/rust-lang/rust/pull/86176
fn codec_estimation<T: FastFieldCodecSerializer, A: FastFieldDataAccess>(
stats: FastFieldStats,
fastfield_accessor: &A,
estimations: &mut Vec<(f32, &str, u8)>,
) {
if !T::is_applicable(fastfield_accessor, stats.clone()) {
return;
}
let (ratio, name, id) = (
T::estimate(fastfield_accessor, stats.clone()),
T::NAME,
T::ID,
);
estimations.push((ratio, name, id));
}
impl CompositeFastFieldSerializer {
/// Constructor
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
// just making room for the pointer to header.
let composite_write = CompositeWrite::wrap(write);
Ok(CompositeFastFieldSerializer { composite_write })
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen automatically.
pub fn create_auto_detect_u64_fast_field(
&mut self,
field: Field,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
data_iter_1: impl Iterator<Item = u64>,
data_iter_2: impl Iterator<Item = u64>,
) -> io::Result<()> {
let field_write = self.composite_write.for_field_with_idx(field, 0);
let mut estimations = vec![];
codec_estimation::<BitpackedFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
codec_estimation::<LinearInterpolFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
codec_estimation::<MultiLinearInterpolFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
if let Some(broken_estimation) = estimations
.iter()
.find(|estimation| estimation.0.is_nan())
{
warn!(
"broken estimation for fast field codec {}",
broken_estimation.1
);
}
// remove NaN values for codecs with broken calculations, and f32::MAX values which disable codecs
estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
estimations.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
let (_ratio, name, id) = estimations[0];
debug!(
"choosing fast field codec {} for field_id {:?}",
name, field
); // todo print actual field name
id.serialize(field_write)?;
match name {
BitpackedFastFieldSerializer::NAME => {
BitpackedFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
data_iter_1,
data_iter_2,
)?;
}
LinearInterpolFastFieldSerializer::NAME => {
LinearInterpolFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
data_iter_1,
data_iter_2,
)?;
}
MultiLinearInterpolFastFieldSerializer::NAME => {
MultiLinearInterpolFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
data_iter_1,
data_iter_2,
)?;
}
_ => {
panic!("unknown fastfield serializer {}", name)
}
};
field_write.flush()?;
Ok(())
}
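To illustrate the selection step with made-up ratios: if the estimations come back as 0.32 for Bitpacked, 0.08 for LinearInterpol and 0.11 for MultiLinearInterpol, sorting by ratio puts LinearInterpol first, its codec id byte is serialized into the field data, and `LinearInterpolFastFieldSerializer::serialize` is then called with the two data iterators.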
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedFastFieldSerializerLegacy<'_, CountingWriter<WritePtr>>> {
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field_with_idx(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
idx: usize,
) -> io::Result<BitpackedFastFieldSerializerLegacy<'_, CountingWriter<WritePtr>>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
// Prepend codec id to field data for compatibility with DynamicFastFieldReader.
let id = BitpackedFastFieldSerializer::ID;
id.serialize(field_write)?;
BitpackedFastFieldSerializerLegacy::open(field_write, min_value, max_value)
}
/// Start serializing a new [u8] fast field
pub fn new_bytes_fast_field_with_idx(
&mut self,
field: Field,
idx: usize,
) -> FastBytesFieldSerializer<'_, CountingWriter<WritePtr>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
FastBytesFieldSerializer { write: field_write }
}
/// Closes the serializer
///
/// After this call the data must be persistently saved on disk.
pub fn close(self) -> io::Result<()> {
self.composite_write.close()
}
}
pub struct FastBytesFieldSerializer<'a, W: Write> {
write: &'a mut W,
}
impl<'a, W: Write> FastBytesFieldSerializer<'a, W> {
pub fn write_all(&mut self, vals: &[u8]) -> io::Result<()> {
self.write.write_all(vals)
}
pub fn flush(&mut self) -> io::Result<()> {
self.write.flush()
}
}

View File

@@ -1,11 +1,13 @@
use super::multivalued::MultiValuedFastFieldWriter;
use super::serializer::FastFieldStats;
use super::FastFieldDataAccess;
use crate::common;
use crate::fastfield::serializer::FastFieldSerializer;
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;
use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema};
use crate::termdict::TermOrdinal;
use crate::DocId;
use fnv::FnvHashMap;
use std::collections::HashMap;
use std::io;
@@ -265,9 +267,9 @@ impl IntFastFieldWriter {
self.add_val(val);
}
/// Extract the stored data
pub(crate) fn get_data(&self) -> Vec<u64> {
self.vals.iter().collect::<Vec<u64>>()
/// Returns an iterator over the data.
pub(crate) fn iter(&self) -> impl Iterator<Item = u64> + '_ {
self.vals.iter()
}
/// Push the fast fields value to the `FastFieldWriter`.
@@ -281,17 +283,58 @@ impl IntFastFieldWriter {
} else {
(self.val_min, self.val_max)
};
let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?;
if let Some(doc_id_map) = doc_id_map {
for doc_id in doc_id_map.iter_old_doc_ids() {
single_field_serializer.add_val(self.vals.get(*doc_id as usize))?;
}
} else {
for val in self.vals.iter() {
single_field_serializer.add_val(val)?;
}
let fastfield_accessor = WriterFastFieldAccessProvider {
doc_id_map,
vals: &self.vals,
};
let stats = FastFieldStats {
min_value: min,
max_value: max,
num_vals: self.val_count as u64,
};
single_field_serializer.close_field()
if let Some(doc_id_map) = doc_id_map {
let iter = doc_id_map
.iter_old_doc_ids()
.map(|doc_id| self.vals.get(*doc_id as usize));
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
iter.clone(),
iter,
)?;
} else {
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
self.vals.iter(),
self.vals.iter(),
)?;
};
Ok(())
}
}
#[derive(Clone)]
struct WriterFastFieldAccessProvider<'map, 'bitp> {
doc_id_map: Option<&'map DocIdMapping>,
vals: &'bitp BlockedBitpacker,
}
impl<'map, 'bitp> FastFieldDataAccess for WriterFastFieldAccessProvider<'map, 'bitp> {
/// Return the value associated to the given document.
///
/// This accessor should return as fast as possible.
///
/// # Panics
///
/// May panic if `doc` is greater than the segment's max doc.
fn get(&self, doc: DocId) -> u64 {
if let Some(doc_id_map) = self.doc_id_map {
self.vals.get(doc_id_map.get_old_doc_id(doc) as usize) // consider extra FastFieldReader wrapper for non doc_id_map
} else {
self.vals.get(doc as usize)
}
}
}
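By way of comparison, a hedged sketch of the same trait implemented over a plain in-memory slice (names are illustrative; a similar impl may already exist in the codecs crate):
#[derive(Clone)]
struct SliceFastFieldAccess<'a> {
    vals: &'a [u64],
}

impl<'a> FastFieldDataAccess for SliceFastFieldAccess<'a> {
    // Plain random access: the doc id is used directly as an index.
    fn get(&self, doc: DocId) -> u64 {
        self.vals[doc as usize]
    }
}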

View File

@@ -60,9 +60,8 @@ pub(crate) fn get_doc_id_mapping_from_field(
})?;
// create new doc_id to old doc_id index (used in fast_field_writers)
let data = fast_field.get_data();
let mut doc_id_and_data = data
.into_iter()
let mut doc_id_and_data = fast_field
.iter()
.enumerate()
.map(|el| (el.0 as DocId, el.1))
.collect::<Vec<_>>();

View File

@@ -3,8 +3,9 @@ use crate::error::DataCorruption;
use crate::fastfield::CompositeFastFieldSerializer;
use crate::fastfield::DeleteBitSet;
use crate::fastfield::DynamicFastFieldReader;
use crate::fastfield::FastFieldDataAccess;
use crate::fastfield::FastFieldReader;
use crate::fastfield::FastFieldSerializer;
use crate::fastfield::FastFieldStats;
use crate::fastfield::MultiValuedFastFieldReader;
use crate::fieldnorm::FieldNormsSerializer;
use crate::fieldnorm::FieldNormsWriter;
@@ -344,21 +345,38 @@ impl IndexMerger {
})
.collect::<Vec<_>>();
if let Some(doc_id_mapping) = doc_id_mapping {
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
(
doc_id,
&fast_field_readers[reader_with_ordinal.ordinal as usize],
)
});
// add values in order of the new doc_ids
let mut fast_single_field_serializer =
fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?;
for (doc_id, field_reader) in sorted_doc_ids {
let val = field_reader.get(*doc_id);
fast_single_field_serializer.add_val(val)?;
#[derive(Clone)]
struct SortedDocidFieldAccessProvider<'a> {
doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
fast_field_readers: &'a Vec<DynamicFastFieldReader<u64>>,
}
impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> {
fn get(&self, doc: DocId) -> u64 {
let (doc_id, reader_with_ordinal) = self.doc_id_mapping[doc as usize];
self.fast_field_readers[reader_with_ordinal.ordinal as usize].get(doc_id)
}
}
let stats = FastFieldStats {
min_value,
max_value,
num_vals: doc_id_mapping.len() as u64,
};
let fastfield_accessor = SortedDocidFieldAccessProvider {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
};
let iter = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize];
fast_field_reader.get(*doc_id)
});
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
fastfield_accessor,
iter.clone(),
iter,
)?;
fast_single_field_serializer.close_field()?;
Ok(())
} else {
let u64_readers = self.readers.iter()
@@ -496,19 +514,21 @@ impl IndexMerger {
// Important: reader_and_field_accessor needs
// to have the same order as self.readers since ReaderWithOrdinal
// is used to index the reader_and_field_accessors vec.
fn write_1_n_fast_field_idx_generic(
fn write_1_n_fast_field_idx_generic<T: MultiValueLength>(
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
reader_and_field_accessors: &[(&SegmentReader, impl MultiValueLength)],
reader_and_field_accessors: &[(&SegmentReader, T)],
) -> crate::Result<()> {
let mut total_num_vals = 0u64;
// In the first pass, we compute the total number of vals.
//
// This is required by the bitpacker, as it needs to know
// what bit length should be used for bitpacking.
let mut idx_num_vals = 0;
for (reader, u64s_reader) in reader_and_field_accessors.iter() {
if let Some(delete_bitset) = reader.delete_bitset() {
idx_num_vals += reader.max_doc() as u64 - delete_bitset.len() as u64;
for doc in 0u32..reader.max_doc() {
if delete_bitset.is_alive(doc) {
let num_vals = u64s_reader.get_len(doc) as u64;
@@ -516,38 +536,60 @@ impl IndexMerger {
}
}
} else {
idx_num_vals += reader.max_doc() as u64;
total_num_vals += u64s_reader.get_total_len();
}
}
let stats = FastFieldStats {
max_value: total_num_vals,
num_vals: idx_num_vals,
min_value: 0,
};
// We can now create our `idx` serializer, and in a second pass,
// can effectively push the different indexes.
if let Some(doc_id_mapping) = doc_id_mapping {
let mut serialize_idx =
fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?;
// Copying into a temp vec is not ideal, but the fast field codec api requires random
// access, which is used in the estimation. It's possible to 1. calculate random
// access on the fly or 2. change the codec api to make random access optional, but
// both options also have major drawbacks.
let mut offsets = vec![];
let mut offset = 0;
for (doc_id, reader) in doc_id_mapping {
let reader = &reader_and_field_accessors[reader.ordinal as usize].1;
serialize_idx.add_val(offset)?;
offsets.push(offset);
offset += reader.get_len(*doc_id) as u64;
}
serialize_idx.add_val(offset as u64)?;
offsets.push(offset);
serialize_idx.close_field()?;
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
&offsets,
offsets.iter().cloned(),
offsets.iter().cloned(),
)?;
} else {
let mut serialize_idx =
fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?;
let mut idx = 0;
let mut offsets = vec![];
let mut offset = 0;
for (segment_reader, u64s_reader) in reader_and_field_accessors.iter() {
for doc in segment_reader.doc_ids_alive() {
serialize_idx.add_val(idx)?;
idx += u64s_reader.get_len(doc) as u64;
offsets.push(offset);
offset += u64s_reader.get_len(doc) as u64;
}
}
serialize_idx.add_val(idx)?;
serialize_idx.close_field()?;
offsets.push(offset);
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
&offsets,
offsets.iter().cloned(),
offsets.iter().cloned(),
)?;
}
Ok(())
}
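Worked example with illustrative numbers: three alive documents holding 3, 2 and 1 values respectively produce the offset sequence 0, 3, 5 plus the trailing total 6, so a reader can later recover each document's value range as `offsets[doc]..offsets[doc + 1]`.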
fn write_multi_value_fast_field_idx(

View File

@@ -362,6 +362,7 @@ mod bench_sorted_index_merge {
use crate::core::Index;
//use crate::schema;
use crate::fastfield::DynamicFastFieldReader;
use crate::fastfield::FastFieldReader;
use crate::indexer::merger::IndexMerger;
use crate::schema::Cardinality;
@@ -422,7 +423,7 @@ mod bench_sorted_index_merge {
b.iter(|| {
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader)|{
let u64_reader: FastFieldReader<u64> = reader.reader
let u64_reader: DynamicFastFieldReader<u64> = reader.reader
.fast_fields()
.typed_fast_field_reader(field)
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");

View File

@@ -354,18 +354,18 @@ mod bench {
});
}
#[test]
fn test_all_docs_compression_numbits() {
for expected_num_bits in 0u8.. {
let mut data = [0u32; 128];
if expected_num_bits > 0 {
data[0] = (1u64 << (expected_num_bits as usize) - 1) as u32;
}
let mut encoder = BlockEncoder::new();
let (num_bits, compressed) = encoder.compress_block_unsorted(&data);
assert_eq!(compressed.len(), compressed_block_size(num_bits));
}
}
//#[test]
//fn test_all_docs_compression_numbits() {
//for expected_num_bits in 0u8.. {
//let mut data = [0u32; 128];
//if expected_num_bits > 0 {
//data[0] = (1u64 << (expected_num_bits as usize) - 1) as u32;
//}
//let mut encoder = BlockEncoder::new();
//let (num_bits, compressed) = encoder.compress_block_unsorted(&data);
//assert_eq!(compressed.len(), compressed_block_size(num_bits));
//}
//}
const NUM_INTS_BENCH_VINT: usize = 10;

View File

@@ -23,6 +23,15 @@ pub struct FieldEntry {
}
impl FieldEntry {
/// Creates a new field entry given a name and a field type
pub fn new(field_name: String, field_type: FieldType) -> FieldEntry {
assert!(is_valid_field_name(&field_name));
FieldEntry {
name: field_name,
field_type,
}
}
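A hedged usage sketch of the new constructor (`FieldType::Str` wrapping a default `TextOptions`, and the `name()` accessor, are assumptions about the surrounding schema API):
let entry = FieldEntry::new(
    "title".to_string(),
    FieldType::Str(TextOptions::default()), // assumed text variant
);
assert_eq!(entry.name(), "title");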
/// Creates a new text field entry in the schema, given
/// a name and some options.
pub fn new_text(field_name: String, text_options: TextOptions) -> FieldEntry {

View File

@@ -133,12 +133,22 @@ pub mod tests {
format!("Doc {}", i)
);
}
for (_, doc) in store.iter(Some(&delete_bitset)).enumerate() {
let doc = doc?;
let title_content = doc.get_first(field_title).unwrap().text().unwrap();
if !title_content.starts_with("Doc ") {
panic!("unexpected title_content {}", title_content);
}
let id = title_content
.strip_prefix("Doc ")
.unwrap()
.parse::<u32>()
.unwrap();
if delete_bitset.is_deleted(id) {
panic!("unexpected deleted document {}", id);
}
}
Ok(())

View File

@@ -171,11 +171,6 @@ impl StoreReader {
.filter_map(move |doc_id| {
// filter_map is only used to resolve lifetime issues between the two closures on
// the outer variables
let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
if !alive {
// we keep the number of skipped documents to move forward in the map block
num_skipped += 1;
}
// check move to next checkpoint
if doc_id >= curr_checkpoint.as_ref().unwrap().doc_range.end {
@@ -187,6 +182,7 @@ impl StoreReader {
num_skipped = 0;
}
let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
if alive {
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
// the map block will move over the num_skipped, so we reset to 0
@@ -194,6 +190,8 @@ impl StoreReader {
reset_block_pos = false;
ret
} else {
// we keep the number of skipped documents to move forward in the map block
num_skipped += 1;
None
}
})