mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-04 16:22:55 +00:00
Compare commits
45 Commits
0.15.1
...
expose-min
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c454d752d | ||
|
|
f05e84f964 | ||
|
|
65546ed22b | ||
|
|
57ae5b27dc | ||
|
|
f9531ec3c9 | ||
|
|
5b54a32563 | ||
|
|
cd049e28bc | ||
|
|
646e41bec4 | ||
|
|
36528c5e83 | ||
|
|
cd169dee23 | ||
|
|
b5cc60f80b | ||
|
|
060b83159a | ||
|
|
a40ff35453 | ||
|
|
268e6bfe6e | ||
|
|
f902440b8b | ||
|
|
77a0902605 | ||
|
|
c889ae10e4 | ||
|
|
0a534c6ee0 | ||
|
|
167d88b449 | ||
|
|
1071ed84f2 | ||
|
|
abb5624af2 | ||
|
|
1d41b96d32 | ||
|
|
ef4665945f | ||
|
|
294cd5fd0b | ||
|
|
f4d271177c | ||
|
|
451538fecf | ||
|
|
e78e0fec59 | ||
|
|
2e639cebf8 | ||
|
|
e296da7ade | ||
|
|
3b3e26c4b8 | ||
|
|
6a4883ac69 | ||
|
|
0ba05df545 | ||
|
|
aa3c4d4029 | ||
|
|
60df629725 | ||
|
|
2570b005ac | ||
|
|
d5212cd19d | ||
|
|
2193d85622 | ||
|
|
dfdbfe9eff | ||
|
|
b999e836b2 | ||
|
|
be2dd41e69 | ||
|
|
483fdb79cc | ||
|
|
aefd0fc907 | ||
|
|
3298d6cb71 | ||
|
|
c02c78ea73 | ||
|
|
6bf4fee1ba |
8
.github/workflows/test.yml
vendored
8
.github/workflows/test.yml
vendored
@@ -18,7 +18,13 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Build
|
||||
run: cargo build --verbose --workspace
|
||||
- name: Install latest nightly to test also against unstable feature flag
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: nightly
|
||||
override: true
|
||||
components: rustfmt
|
||||
- name: Run tests
|
||||
run: cargo test --verbose --workspace
|
||||
run: cargo test --all-features --verbose --workspace
|
||||
- name: Check Formatting
|
||||
run: cargo fmt --all -- --check
|
||||
|
||||
@@ -35,6 +35,8 @@ crossbeam = "0.8"
|
||||
futures = { version = "0.3.15", features = ["thread-pool"] }
|
||||
tantivy-query-grammar = { version="0.15.0", path="./query-grammar" }
|
||||
tantivy-bitpacker = { version="0.1", path="./bitpacker" }
|
||||
common = { version="0.1", path="./common" }
|
||||
fastfield_codecs = { version="0.1", path="./fastfield_codecs", default-features = false }
|
||||
stable_deref_trait = "1.2"
|
||||
rust-stemmers = "1.2"
|
||||
downcast-rs = "1.2"
|
||||
@@ -88,7 +90,7 @@ unstable = [] # useful for benches.
|
||||
wasm-bindgen = ["uuid/wasm-bindgen"]
|
||||
|
||||
[workspace]
|
||||
members = ["query-grammar", "bitpacker"]
|
||||
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs"]
|
||||
|
||||
[badges]
|
||||
travis-ci = { repository = "tantivy-search/tantivy" }
|
||||
|
||||
@@ -49,6 +49,7 @@ impl BitPacker {
|
||||
let bytes = self.mini_buffer.to_le_bytes();
|
||||
output.write_all(&bytes[..num_bytes])?;
|
||||
self.mini_buffer_written = 0;
|
||||
self.mini_buffer = 0;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -61,7 +62,7 @@ impl BitPacker {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct BitUnpacker {
|
||||
num_bits: u64,
|
||||
mask: u64,
|
||||
|
||||
12
common/Cargo.toml
Normal file
12
common/Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "common"
|
||||
version = "0.1.0"
|
||||
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
|
||||
license = "MIT"
|
||||
edition = "2018"
|
||||
description = "common traits and utility functions used by multiple tantivy subcrates"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
byteorder = "1.4.3"
|
||||
9
common/src/lib.rs
Normal file
9
common/src/lib.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
pub use byteorder::LittleEndian as Endianness;
|
||||
|
||||
mod serialize;
|
||||
mod vint;
|
||||
mod writer;
|
||||
|
||||
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
|
||||
pub use vint::{read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt};
|
||||
pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite};
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::common::Endianness;
|
||||
use crate::common::VInt;
|
||||
use crate::Endianness;
|
||||
use crate::VInt;
|
||||
use byteorder::{ReadBytesExt, WriteBytesExt};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
@@ -160,6 +160,28 @@ impl FixedSize for u8 {
|
||||
const SIZE_IN_BYTES: usize = 1;
|
||||
}
|
||||
|
||||
impl BinarySerializable for bool {
|
||||
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
let val = if *self { 1 } else { 0 };
|
||||
writer.write_u8(val)
|
||||
}
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<bool> {
|
||||
let val = reader.read_u8()?;
|
||||
match val {
|
||||
0 => Ok(false),
|
||||
1 => Ok(true),
|
||||
_ => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"invalid bool value on deserialization, data corrupted",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedSize for bool {
|
||||
const SIZE_IN_BYTES: usize = 1;
|
||||
}
|
||||
|
||||
impl BinarySerializable for String {
|
||||
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
let data: &[u8] = self.as_bytes();
|
||||
@@ -180,9 +202,9 @@ impl BinarySerializable for String {
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
|
||||
use super::VInt;
|
||||
use super::*;
|
||||
use crate::common::VInt;
|
||||
|
||||
use crate::serialize::BinarySerializable;
|
||||
pub fn fixed_size_test<O: BinarySerializable + FixedSize + Default>() {
|
||||
let mut buffer = Vec::new();
|
||||
O::default().serialize(&mut buffer).unwrap();
|
||||
@@ -175,8 +175,8 @@ impl BinarySerializable for VInt {
|
||||
mod tests {
|
||||
|
||||
use super::serialize_vint_u32;
|
||||
use super::BinarySerializable;
|
||||
use super::VInt;
|
||||
use crate::common::BinarySerializable;
|
||||
|
||||
fn aux_test_vint(val: u64) {
|
||||
let mut v = [14u8; 10];
|
||||
@@ -1,7 +1,4 @@
|
||||
use crate::directory::AntiCallToken;
|
||||
use crate::directory::TerminatingWrite;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::io::{self, BufWriter, Write};
|
||||
|
||||
pub struct CountingWriter<W> {
|
||||
underlying: W,
|
||||
@@ -16,41 +13,87 @@ impl<W: Write> CountingWriter<W> {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn written_bytes(&self) -> u64 {
|
||||
self.written_bytes
|
||||
}
|
||||
|
||||
/// Returns the underlying write object.
|
||||
/// Note that this method does not trigger any flushing.
|
||||
#[inline]
|
||||
pub fn finish(self) -> W {
|
||||
self.underlying
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: Write> Write for CountingWriter<W> {
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let written_size = self.underlying.write(buf)?;
|
||||
self.written_bytes += written_size as u64;
|
||||
Ok(written_size)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
|
||||
self.underlying.write_all(buf)?;
|
||||
self.written_bytes += buf.len() as u64;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.underlying.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
|
||||
#[inline]
|
||||
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
|
||||
self.underlying.terminate_ref(token)
|
||||
}
|
||||
}
|
||||
|
||||
/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly
|
||||
///
|
||||
/// The point is that while the type is public, it cannot be built by anyone
|
||||
/// outside of this module.
|
||||
pub struct AntiCallToken(());
|
||||
|
||||
/// Trait used to indicate when no more write need to be done on a writer
|
||||
pub trait TerminatingWrite: Write {
|
||||
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
|
||||
fn terminate(mut self) -> io::Result<()>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
self.terminate_ref(AntiCallToken(()))
|
||||
}
|
||||
|
||||
/// You should implement this function to define custom behavior.
|
||||
/// This function should flush any buffer it may hold.
|
||||
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>;
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite + ?Sized> TerminatingWrite for Box<W> {
|
||||
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
|
||||
self.as_mut().terminate_ref(token)
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
|
||||
fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()?;
|
||||
self.get_mut().terminate_ref(a)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TerminatingWrite for &'a mut Vec<u8> {
|
||||
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
25
fastfield_codecs/Cargo.toml
Normal file
25
fastfield_codecs/Cargo.toml
Normal file
@@ -0,0 +1,25 @@
|
||||
[package]
|
||||
name = "fastfield_codecs"
|
||||
version = "0.1.0"
|
||||
authors = ["Pascal Seitz <pascal@quickwit.io>"]
|
||||
license = "MIT"
|
||||
edition = "2018"
|
||||
description = "Fast field codecs used by tantivy"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
common = { path = "../common/" }
|
||||
tantivy-bitpacker = { path = "../bitpacker/" }
|
||||
prettytable-rs = {version="0.8.0", optional= true}
|
||||
#prettytable-rs = {version="0.8.0" }
|
||||
rand = "0.8.3"
|
||||
|
||||
[dev-dependencies]
|
||||
more-asserts = "0.2.1"
|
||||
rand = "0.8.3"
|
||||
|
||||
[features]
|
||||
bin = ["prettytable-rs"]
|
||||
default = ["bin"]
|
||||
|
||||
68
fastfield_codecs/README.md
Normal file
68
fastfield_codecs/README.md
Normal file
@@ -0,0 +1,68 @@
|
||||
|
||||
|
||||
# Fast Field Codecs
|
||||
|
||||
This crate contains various fast field codecs, used to compress/decompress fast field data in tantivy.
|
||||
|
||||
## Contributing
|
||||
|
||||
Contributing is pretty straightforward. Since the bitpacking is the simplest compressor, you can check it for reference.
|
||||
|
||||
A codec needs to implement 2 traits:
|
||||
|
||||
- A reader implementing `FastFieldCodecReader` to read the codec.
|
||||
- A serializer implementing `FastFieldCodecSerializer` for compression estimation and codec name + id.
|
||||
|
||||
### Tests
|
||||
|
||||
Once the traits are implemented test and benchmark integration is pretty easy (see `test_with_codec_data_sets` and `bench.rs`).
|
||||
|
||||
Make sure to add the codec to the main.rs, which tests the compression ratio and estimation against different data sets. You can run it with:
|
||||
```
|
||||
cargo run --features bin
|
||||
```
|
||||
|
||||
### TODO
|
||||
- Add real world data sets in comparison
|
||||
- Add codec to cover sparse data sets
|
||||
|
||||
|
||||
### Codec Comparison
|
||||
```
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| | Compression Ratio | Compression Estimation |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| Autoincrement | | |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| LinearInterpol | 0.000039572664 | 0.000004396963 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| MultiLinearInterpol | 0.1477348 | 0.17275847 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| Bitpacked | 0.28126493 | 0.28125 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| Monotonically increasing concave | | |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| LinearInterpol | 0.25003937 | 0.26562938 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| MultiLinearInterpol | 0.190665 | 0.1883836 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| Bitpacked | 0.31251436 | 0.3125 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| Monotonically increasing convex | | |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| LinearInterpol | 0.25003937 | 0.28125438 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| MultiLinearInterpol | 0.18676 | 0.2040086 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| Bitpacked | 0.31251436 | 0.3125 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| Almost monotonically increasing | | |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| LinearInterpol | 0.14066513 | 0.1562544 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| MultiLinearInterpol | 0.16335973 | 0.17275847 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
| Bitpacked | 0.28126493 | 0.28125 |
|
||||
+----------------------------------+-------------------+------------------------+
|
||||
|
||||
```
|
||||
109
fastfield_codecs/benches/bench.rs
Normal file
109
fastfield_codecs/benches/bench.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
#![feature(test)]
|
||||
|
||||
extern crate test;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use fastfield_codecs::{
|
||||
bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer},
|
||||
linearinterpol::{LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer},
|
||||
multilinearinterpol::{
|
||||
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
|
||||
},
|
||||
*,
|
||||
};
|
||||
|
||||
fn get_data() -> Vec<u64> {
|
||||
let mut data: Vec<_> = (100..55000_u64)
|
||||
.map(|num| num + rand::random::<u8>() as u64)
|
||||
.collect();
|
||||
data.push(99_000);
|
||||
data.insert(1000, 2000);
|
||||
data.insert(2000, 100);
|
||||
data.insert(3000, 4100);
|
||||
data.insert(4000, 100);
|
||||
data.insert(5000, 800);
|
||||
data
|
||||
}
|
||||
|
||||
fn value_iter() -> impl Iterator<Item = u64> {
|
||||
let data = (0..20_000).collect::<Vec<_>>();
|
||||
data.into_iter()
|
||||
}
|
||||
fn bench_get<S: FastFieldCodecSerializer, R: FastFieldCodecReader>(
|
||||
b: &mut Bencher,
|
||||
data: &[u64],
|
||||
) {
|
||||
let mut bytes = vec![];
|
||||
S::serialize(
|
||||
&mut bytes,
|
||||
&data,
|
||||
stats_from_vec(&data),
|
||||
data.iter().cloned(),
|
||||
data.iter().cloned(),
|
||||
)
|
||||
.unwrap();
|
||||
let reader = R::open_from_bytes(&bytes).unwrap();
|
||||
b.iter(|| {
|
||||
for pos in value_iter() {
|
||||
reader.get_u64(pos as u64, &bytes);
|
||||
}
|
||||
});
|
||||
}
|
||||
fn bench_create<S: FastFieldCodecSerializer>(b: &mut Bencher, data: &[u64]) {
|
||||
let mut bytes = vec![];
|
||||
b.iter(|| {
|
||||
S::serialize(
|
||||
&mut bytes,
|
||||
&data,
|
||||
stats_from_vec(&data),
|
||||
data.iter().cloned(),
|
||||
data.iter().cloned(),
|
||||
)
|
||||
.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
use test::Bencher;
|
||||
#[bench]
|
||||
fn bench_fastfield_bitpack_create(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_create::<BitpackedFastFieldSerializer>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_linearinterpol_create(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_create::<LinearInterpolFastFieldSerializer>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_multilinearinterpol_create(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_create::<MultiLinearInterpolFastFieldSerializer>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_bitpack_get(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_linearinterpol_get(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get::<LinearInterpolFastFieldSerializer, LinearInterpolFastFieldReader>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_multilinearinterpol_get(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get::<MultiLinearInterpolFastFieldSerializer, MultiLinearInterpolFastFieldReader>(
|
||||
b, &data,
|
||||
);
|
||||
}
|
||||
pub fn stats_from_vec(data: &[u64]) -> FastFieldStats {
|
||||
let min_value = data.iter().cloned().min().unwrap_or(0);
|
||||
let max_value = data.iter().cloned().max().unwrap_or(0);
|
||||
FastFieldStats {
|
||||
min_value,
|
||||
max_value,
|
||||
num_vals: data.len() as u64,
|
||||
}
|
||||
}
|
||||
}
|
||||
176
fastfield_codecs/src/bitpacked.rs
Normal file
176
fastfield_codecs/src/bitpacked.rs
Normal file
@@ -0,0 +1,176 @@
|
||||
use crate::FastFieldCodecReader;
|
||||
use crate::FastFieldCodecSerializer;
|
||||
use crate::FastFieldDataAccess;
|
||||
use crate::FastFieldStats;
|
||||
use common::BinarySerializable;
|
||||
use std::io::{self, Write};
|
||||
use tantivy_bitpacker::compute_num_bits;
|
||||
use tantivy_bitpacker::BitPacker;
|
||||
|
||||
use tantivy_bitpacker::BitUnpacker;
|
||||
|
||||
/// Depending on the field type, a different
|
||||
/// fast field is required.
|
||||
#[derive(Clone)]
|
||||
pub struct BitpackedFastFieldReader {
|
||||
bit_unpacker: BitUnpacker,
|
||||
pub min_value_u64: u64,
|
||||
pub max_value_u64: u64,
|
||||
}
|
||||
|
||||
impl<'data> FastFieldCodecReader for BitpackedFastFieldReader {
|
||||
/// Opens a fast field given a file.
|
||||
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
|
||||
let (_data, mut footer) = bytes.split_at(bytes.len() - 16);
|
||||
let min_value = u64::deserialize(&mut footer)?;
|
||||
let amplitude = u64::deserialize(&mut footer)?;
|
||||
let max_value = min_value + amplitude;
|
||||
let num_bits = compute_num_bits(amplitude);
|
||||
let bit_unpacker = BitUnpacker::new(num_bits);
|
||||
Ok(BitpackedFastFieldReader {
|
||||
min_value_u64: min_value,
|
||||
max_value_u64: max_value,
|
||||
bit_unpacker,
|
||||
})
|
||||
}
|
||||
#[inline]
|
||||
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
|
||||
self.min_value_u64 + self.bit_unpacker.get(doc, &data)
|
||||
}
|
||||
#[inline]
|
||||
fn min_value(&self) -> u64 {
|
||||
self.min_value_u64
|
||||
}
|
||||
#[inline]
|
||||
fn max_value(&self) -> u64 {
|
||||
self.max_value_u64
|
||||
}
|
||||
}
|
||||
pub struct BitpackedFastFieldSerializerLegacy<'a, W: 'a + Write> {
|
||||
bit_packer: BitPacker,
|
||||
write: &'a mut W,
|
||||
min_value: u64,
|
||||
amplitude: u64,
|
||||
num_bits: u8,
|
||||
}
|
||||
|
||||
impl<'a, W: Write> BitpackedFastFieldSerializerLegacy<'a, W> {
|
||||
/// Creates a new fast field serializer.
|
||||
///
|
||||
/// The serializer in fact encode the values by bitpacking
|
||||
/// `(val - min_value)`.
|
||||
///
|
||||
/// It requires a `min_value` and a `max_value` to compute
|
||||
/// compute the minimum number of bits required to encode
|
||||
/// values.
|
||||
pub fn open(
|
||||
write: &'a mut W,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
) -> io::Result<BitpackedFastFieldSerializerLegacy<'a, W>> {
|
||||
assert!(min_value <= max_value);
|
||||
let amplitude = max_value - min_value;
|
||||
let num_bits = compute_num_bits(amplitude);
|
||||
let bit_packer = BitPacker::new();
|
||||
Ok(BitpackedFastFieldSerializerLegacy {
|
||||
bit_packer,
|
||||
write,
|
||||
min_value,
|
||||
amplitude,
|
||||
num_bits,
|
||||
})
|
||||
}
|
||||
/// Pushes a new value to the currently open u64 fast field.
|
||||
#[inline]
|
||||
pub fn add_val(&mut self, val: u64) -> io::Result<()> {
|
||||
let val_to_write: u64 = val - self.min_value;
|
||||
self.bit_packer
|
||||
.write(val_to_write, self.num_bits, &mut self.write)?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn close_field(mut self) -> io::Result<()> {
|
||||
self.bit_packer.close(&mut self.write)?;
|
||||
self.min_value.serialize(&mut self.write)?;
|
||||
self.amplitude.serialize(&mut self.write)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BitpackedFastFieldSerializer {}
|
||||
|
||||
impl FastFieldCodecSerializer for BitpackedFastFieldSerializer {
|
||||
const NAME: &'static str = "Bitpacked";
|
||||
const ID: u8 = 1;
|
||||
/// Serializes data with the BitpackedFastFieldSerializer.
|
||||
///
|
||||
/// The serializer in fact encode the values by bitpacking
|
||||
/// `(val - min_value)`.
|
||||
///
|
||||
/// It requires a `min_value` and a `max_value` to compute
|
||||
/// compute the minimum number of bits required to encode
|
||||
/// values.
|
||||
fn serialize(
|
||||
write: &mut impl Write,
|
||||
_fastfield_accessor: &impl FastFieldDataAccess,
|
||||
stats: FastFieldStats,
|
||||
data_iter: impl Iterator<Item = u64>,
|
||||
_data_iter1: impl Iterator<Item = u64>,
|
||||
) -> io::Result<()> {
|
||||
let mut serializer =
|
||||
BitpackedFastFieldSerializerLegacy::open(write, stats.min_value, stats.max_value)?;
|
||||
|
||||
for val in data_iter {
|
||||
serializer.add_val(val)?;
|
||||
}
|
||||
serializer.close_field()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn is_applicable(
|
||||
_fastfield_accessor: &impl FastFieldDataAccess,
|
||||
_stats: FastFieldStats,
|
||||
) -> bool {
|
||||
true
|
||||
}
|
||||
fn estimate(_fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
|
||||
let amplitude = stats.max_value - stats.min_value;
|
||||
let num_bits = compute_num_bits(amplitude);
|
||||
let num_bits_uncompressed = 64;
|
||||
num_bits as f32 / num_bits_uncompressed as f32
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tests::get_codec_test_data_sets;
|
||||
|
||||
fn create_and_validate(data: &[u64], name: &str) {
|
||||
crate::tests::create_and_validate::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>(
|
||||
&data, name,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_codec_data_sets() {
|
||||
let data_sets = get_codec_test_data_sets();
|
||||
for (mut data, name) in data_sets {
|
||||
create_and_validate(&data, name);
|
||||
data.reverse();
|
||||
create_and_validate(&data, name);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bitpacked_fast_field_rand() {
|
||||
for _ in 0..500 {
|
||||
let mut data = (0..1 + rand::random::<u8>() as usize)
|
||||
.map(|_| rand::random::<i64>() as u64 / 2 as u64)
|
||||
.collect::<Vec<_>>();
|
||||
create_and_validate(&data, "rand");
|
||||
|
||||
data.reverse();
|
||||
create_and_validate(&data, "rand");
|
||||
}
|
||||
}
|
||||
}
|
||||
231
fastfield_codecs/src/lib.rs
Normal file
231
fastfield_codecs/src/lib.rs
Normal file
@@ -0,0 +1,231 @@
|
||||
#[cfg(test)]
|
||||
#[macro_use]
|
||||
extern crate more_asserts;
|
||||
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
pub mod bitpacked;
|
||||
pub mod linearinterpol;
|
||||
pub mod multilinearinterpol;
|
||||
|
||||
pub trait FastFieldCodecReader: Sized {
|
||||
/// reads the metadata and returns the CodecReader
|
||||
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self>;
|
||||
|
||||
fn get_u64(&self, doc: u64, data: &[u8]) -> u64;
|
||||
|
||||
fn min_value(&self) -> u64;
|
||||
fn max_value(&self) -> u64;
|
||||
}
|
||||
|
||||
/// The FastFieldSerializerEstimate trait is required on all variants
|
||||
/// of fast field compressions, to decide which one to choose.
|
||||
pub trait FastFieldCodecSerializer {
|
||||
/// A codex needs to provide a unique name and id, which is
|
||||
/// used for debugging and de/serialization.
|
||||
const NAME: &'static str;
|
||||
const ID: u8;
|
||||
|
||||
/// Check if the Codec is able to compress the data
|
||||
fn is_applicable(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> bool;
|
||||
|
||||
/// Returns an estimate of the compression ratio.
|
||||
/// The baseline is uncompressed 64bit data.
|
||||
///
|
||||
/// It could make sense to also return a value representing
|
||||
/// computational complexity.
|
||||
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32;
|
||||
|
||||
/// Serializes the data using the serializer into write.
|
||||
/// There are multiple iterators, in case the codec needs to read the data multiple times.
|
||||
/// The iterators should be preferred over using fastfield_accessor for performance reasons.
|
||||
fn serialize(
|
||||
write: &mut impl Write,
|
||||
fastfield_accessor: &impl FastFieldDataAccess,
|
||||
stats: FastFieldStats,
|
||||
data_iter: impl Iterator<Item = u64>,
|
||||
data_iter1: impl Iterator<Item = u64>,
|
||||
) -> io::Result<()>;
|
||||
}
|
||||
|
||||
/// FastFieldDataAccess is the trait to access fast field data during serialization and estimation.
|
||||
pub trait FastFieldDataAccess {
|
||||
/// Return the value associated to the given document.
|
||||
///
|
||||
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance reasons.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// May panic if `doc` is greater than the segment
|
||||
fn get(&self, doc: u32) -> u64;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FastFieldStats {
|
||||
pub min_value: u64,
|
||||
pub max_value: u64,
|
||||
pub num_vals: u64,
|
||||
}
|
||||
|
||||
impl<'a> FastFieldDataAccess for &'a [u64] {
|
||||
fn get(&self, doc: u32) -> u64 {
|
||||
self[doc as usize]
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> FastFieldDataAccess for &'a Vec<u64> {
|
||||
fn get(&self, doc: u32) -> u64 {
|
||||
self[doc as usize]
|
||||
}
|
||||
}
|
||||
|
||||
impl FastFieldDataAccess for Vec<u64> {
|
||||
fn get(&self, doc: u32) -> u64 {
|
||||
self[doc as usize]
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer},
|
||||
linearinterpol::{LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer},
|
||||
multilinearinterpol::{
|
||||
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
|
||||
},
|
||||
};
|
||||
|
||||
pub fn create_and_validate<S: FastFieldCodecSerializer, R: FastFieldCodecReader>(
|
||||
data: &[u64],
|
||||
name: &str,
|
||||
) -> (f32, f32) {
|
||||
if !S::is_applicable(&data, crate::tests::stats_from_vec(&data)) {
|
||||
return (f32::MAX, 0.0);
|
||||
}
|
||||
let estimation = S::estimate(&data, crate::tests::stats_from_vec(&data));
|
||||
let mut out = vec![];
|
||||
S::serialize(
|
||||
&mut out,
|
||||
&data,
|
||||
crate::tests::stats_from_vec(&data),
|
||||
data.iter().cloned(),
|
||||
data.iter().cloned(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let reader = R::open_from_bytes(&out).unwrap();
|
||||
for (doc, orig_val) in data.iter().enumerate() {
|
||||
let val = reader.get_u64(doc as u64, &out);
|
||||
if val != *orig_val {
|
||||
panic!(
|
||||
"val {:?} does not match orig_val {:?}, in data set {}, data {:?}",
|
||||
val, orig_val, name, data
|
||||
);
|
||||
}
|
||||
}
|
||||
let actual_compression = data.len() as f32 / out.len() as f32;
|
||||
return (estimation, actual_compression);
|
||||
}
|
||||
pub fn get_codec_test_data_sets() -> Vec<(Vec<u64>, &'static str)> {
|
||||
let mut data_and_names = vec![];
|
||||
|
||||
let data = (10..=20_u64).collect::<Vec<_>>();
|
||||
data_and_names.push((data, "simple monotonically increasing"));
|
||||
|
||||
data_and_names.push((
|
||||
vec![5, 6, 7, 8, 9, 10, 99, 100],
|
||||
"offset in linear interpol",
|
||||
));
|
||||
data_and_names.push((vec![5, 50, 3, 13, 1, 1000, 35], "rand small"));
|
||||
data_and_names.push((vec![10], "single value"));
|
||||
|
||||
data_and_names
|
||||
}
|
||||
|
||||
fn test_codec<S: FastFieldCodecSerializer, R: FastFieldCodecReader>() {
|
||||
let codec_name = S::NAME;
|
||||
for (data, data_set_name) in get_codec_test_data_sets() {
|
||||
let (estimate, actual) =
|
||||
crate::tests::create_and_validate::<S, R>(&data, data_set_name);
|
||||
let result = if estimate == f32::MAX {
|
||||
"Disabled".to_string()
|
||||
} else {
|
||||
format!("Estimate {:?} Actual {:?} ", estimate, actual)
|
||||
};
|
||||
println!(
|
||||
"Codec {}, DataSet {}, {}",
|
||||
codec_name, data_set_name, result
|
||||
);
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_codec_bitpacking() {
|
||||
test_codec::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>();
|
||||
}
|
||||
#[test]
|
||||
fn test_codec_interpolation() {
|
||||
test_codec::<LinearInterpolFastFieldSerializer, LinearInterpolFastFieldReader>();
|
||||
}
|
||||
#[test]
|
||||
fn test_codec_multi_interpolation() {
|
||||
test_codec::<MultiLinearInterpolFastFieldSerializer, MultiLinearInterpolFastFieldReader>();
|
||||
}
|
||||
|
||||
use super::*;
|
||||
pub fn stats_from_vec(data: &[u64]) -> FastFieldStats {
|
||||
let min_value = data.iter().cloned().min().unwrap_or(0);
|
||||
let max_value = data.iter().cloned().max().unwrap_or(0);
|
||||
FastFieldStats {
|
||||
min_value,
|
||||
max_value,
|
||||
num_vals: data.len() as u64,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimation_good_interpolation_case() {
|
||||
let data = (10..=20000_u64).collect::<Vec<_>>();
|
||||
|
||||
let linear_interpol_estimation =
|
||||
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
|
||||
assert_le!(linear_interpol_estimation, 0.01);
|
||||
|
||||
let multi_linear_interpol_estimation =
|
||||
MultiLinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
|
||||
assert_le!(multi_linear_interpol_estimation, 0.2);
|
||||
assert_le!(linear_interpol_estimation, multi_linear_interpol_estimation);
|
||||
|
||||
let bitpacked_estimation =
|
||||
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
|
||||
assert_le!(linear_interpol_estimation, bitpacked_estimation);
|
||||
}
|
||||
#[test]
|
||||
fn estimation_test_bad_interpolation_case() {
|
||||
let data = vec![200, 10, 10, 10, 10, 1000, 20];
|
||||
|
||||
let linear_interpol_estimation =
|
||||
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
|
||||
assert_le!(linear_interpol_estimation, 0.32);
|
||||
|
||||
let bitpacked_estimation =
|
||||
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
|
||||
assert_le!(bitpacked_estimation, linear_interpol_estimation);
|
||||
}
|
||||
#[test]
|
||||
fn estimation_test_bad_interpolation_case_monotonically_increasing() {
|
||||
let mut data = (200..=20000_u64).collect::<Vec<_>>();
|
||||
data.push(1_000_000);
|
||||
|
||||
// in this case the linear interpolation can't in fact not be worse than bitpacking,
|
||||
// but the estimator adds some threshold, which leads to estimated worse behavior
|
||||
let linear_interpol_estimation =
|
||||
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
|
||||
assert_le!(linear_interpol_estimation, 0.35);
|
||||
|
||||
let bitpacked_estimation =
|
||||
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
|
||||
assert_le!(bitpacked_estimation, 0.32);
|
||||
assert_le!(bitpacked_estimation, linear_interpol_estimation);
|
||||
}
|
||||
}
|
||||
297
fastfield_codecs/src/linearinterpol.rs
Normal file
297
fastfield_codecs/src/linearinterpol.rs
Normal file
@@ -0,0 +1,297 @@
|
||||
use crate::FastFieldCodecReader;
|
||||
use crate::FastFieldCodecSerializer;
|
||||
use crate::FastFieldDataAccess;
|
||||
use crate::FastFieldStats;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::Sub;
|
||||
use tantivy_bitpacker::compute_num_bits;
|
||||
use tantivy_bitpacker::BitPacker;
|
||||
|
||||
use common::BinarySerializable;
|
||||
use common::FixedSize;
|
||||
use tantivy_bitpacker::BitUnpacker;
|
||||
|
||||
/// Depending on the field type, a different
|
||||
/// fast field is required.
|
||||
#[derive(Clone)]
|
||||
pub struct LinearInterpolFastFieldReader {
|
||||
bit_unpacker: BitUnpacker,
|
||||
pub footer: LinearInterpolFooter,
|
||||
pub slope: f32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LinearInterpolFooter {
|
||||
pub relative_max_value: u64,
|
||||
pub offset: u64,
|
||||
pub first_val: u64,
|
||||
pub last_val: u64,
|
||||
pub num_vals: u64,
|
||||
pub min_value: u64,
|
||||
pub max_value: u64,
|
||||
}
|
||||
|
||||
impl BinarySerializable for LinearInterpolFooter {
|
||||
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
|
||||
self.relative_max_value.serialize(write)?;
|
||||
self.offset.serialize(write)?;
|
||||
self.first_val.serialize(write)?;
|
||||
self.last_val.serialize(write)?;
|
||||
self.num_vals.serialize(write)?;
|
||||
self.min_value.serialize(write)?;
|
||||
self.max_value.serialize(write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<LinearInterpolFooter> {
|
||||
Ok(LinearInterpolFooter {
|
||||
relative_max_value: u64::deserialize(reader)?,
|
||||
offset: u64::deserialize(reader)?,
|
||||
first_val: u64::deserialize(reader)?,
|
||||
last_val: u64::deserialize(reader)?,
|
||||
num_vals: u64::deserialize(reader)?,
|
||||
min_value: u64::deserialize(reader)?,
|
||||
max_value: u64::deserialize(reader)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedSize for LinearInterpolFooter {
|
||||
const SIZE_IN_BYTES: usize = 56;
|
||||
}
|
||||
|
||||
impl FastFieldCodecReader for LinearInterpolFastFieldReader {
|
||||
/// Opens a fast field given a file.
|
||||
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
|
||||
let (_data, mut footer) = bytes.split_at(bytes.len() - LinearInterpolFooter::SIZE_IN_BYTES);
|
||||
let footer = LinearInterpolFooter::deserialize(&mut footer)?;
|
||||
let slope = get_slope(footer.first_val, footer.last_val, footer.num_vals);
|
||||
|
||||
let num_bits = compute_num_bits(footer.relative_max_value);
|
||||
let bit_unpacker = BitUnpacker::new(num_bits);
|
||||
Ok(LinearInterpolFastFieldReader {
|
||||
bit_unpacker,
|
||||
footer,
|
||||
slope,
|
||||
})
|
||||
}
|
||||
#[inline]
|
||||
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
|
||||
let calculated_value = get_calculated_value(self.footer.first_val, doc, self.slope);
|
||||
(calculated_value + self.bit_unpacker.get(doc, &data)) - self.footer.offset
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn min_value(&self) -> u64 {
|
||||
self.footer.min_value
|
||||
}
|
||||
#[inline]
|
||||
fn max_value(&self) -> u64 {
|
||||
self.footer.max_value
|
||||
}
|
||||
}
|
||||
|
||||
/// Fastfield serializer, which tries to guess values by linear interpolation
|
||||
/// and stores the difference bitpacked.
|
||||
pub struct LinearInterpolFastFieldSerializer {}
|
||||
|
||||
#[inline]
|
||||
fn get_slope(first_val: u64, last_val: u64, num_vals: u64) -> f32 {
|
||||
if num_vals <= 1 {
|
||||
return 0.0;
|
||||
}
|
||||
// We calculate the slope with f64 high precision and use the result in lower precision f32
|
||||
// This is done in order to handle estimations for very large values like i64::MAX
|
||||
((last_val as f64 - first_val as f64) / (num_vals as u64 - 1) as f64) as f32
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_calculated_value(first_val: u64, pos: u64, slope: f32) -> u64 {
|
||||
first_val + (pos as f32 * slope) as u64
|
||||
}
|
||||
|
||||
impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
|
||||
const NAME: &'static str = "LinearInterpol";
|
||||
const ID: u8 = 2;
|
||||
/// Creates a new fast field serializer.
|
||||
fn serialize(
|
||||
write: &mut impl Write,
|
||||
fastfield_accessor: &impl FastFieldDataAccess,
|
||||
stats: FastFieldStats,
|
||||
data_iter: impl Iterator<Item = u64>,
|
||||
data_iter1: impl Iterator<Item = u64>,
|
||||
) -> io::Result<()> {
|
||||
assert!(stats.min_value <= stats.max_value);
|
||||
|
||||
let first_val = fastfield_accessor.get(0);
|
||||
let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1);
|
||||
let slope = get_slope(first_val, last_val, stats.num_vals);
|
||||
// calculate offset to ensure all values are positive
|
||||
let mut offset = 0;
|
||||
let mut rel_positive_max = 0;
|
||||
for (pos, actual_value) in data_iter1.enumerate() {
|
||||
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
|
||||
if calculated_value > actual_value {
|
||||
// negative value we need to apply an offset
|
||||
// we ignore negative values in the max value calculation, because negative values
|
||||
// will be offset to 0
|
||||
offset = offset.max(calculated_value - actual_value);
|
||||
} else {
|
||||
//positive value no offset reuqired
|
||||
rel_positive_max = rel_positive_max.max(actual_value - calculated_value);
|
||||
}
|
||||
}
|
||||
|
||||
// rel_positive_max will be adjusted by offset
|
||||
let relative_max_value = rel_positive_max + offset;
|
||||
|
||||
let num_bits = compute_num_bits(relative_max_value);
|
||||
let mut bit_packer = BitPacker::new();
|
||||
for (pos, val) in data_iter.enumerate() {
|
||||
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
|
||||
let diff = (val + offset) - calculated_value;
|
||||
bit_packer.write(diff, num_bits, write)?;
|
||||
}
|
||||
bit_packer.close(write)?;
|
||||
|
||||
let footer = LinearInterpolFooter {
|
||||
relative_max_value,
|
||||
offset,
|
||||
first_val,
|
||||
last_val,
|
||||
num_vals: stats.num_vals,
|
||||
min_value: stats.min_value,
|
||||
max_value: stats.max_value,
|
||||
};
|
||||
footer.serialize(write)?;
|
||||
Ok(())
|
||||
}
|
||||
fn is_applicable(
|
||||
_fastfield_accessor: &impl FastFieldDataAccess,
|
||||
stats: FastFieldStats,
|
||||
) -> bool {
|
||||
if stats.num_vals < 3 {
|
||||
return false; //disable compressor for this case
|
||||
}
|
||||
// On serialisation the offset is added to the actual value.
|
||||
// We need to make sure this won't run into overflow calculation issues.
|
||||
// For this we take the maximum theroretical offset and add this to the max value.
|
||||
// If this doesn't overflow the algortihm should be fine
|
||||
let theorethical_maximum_offset = stats.max_value - stats.min_value;
|
||||
if stats
|
||||
.max_value
|
||||
.checked_add(theorethical_maximum_offset)
|
||||
.is_none()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
|
||||
/// estimation for linear interpolation is hard because, you don't know
|
||||
/// where the local maxima for the deviation of the calculated value are and
|
||||
/// the offset to shift all values to >=0 is also unknown.
|
||||
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
|
||||
let first_val = fastfield_accessor.get(0);
|
||||
let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1);
|
||||
let slope = get_slope(first_val, last_val, stats.num_vals);
|
||||
|
||||
// let's sample at 0%, 5%, 10% .. 95%, 100%
|
||||
let num_vals = stats.num_vals as f32 / 100.0;
|
||||
let sample_positions = (0..20)
|
||||
.map(|pos| (num_vals * pos as f32 * 5.0) as usize)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let max_distance = sample_positions
|
||||
.iter()
|
||||
.map(|pos| {
|
||||
let calculated_value = get_calculated_value(first_val, *pos as u64, slope);
|
||||
let actual_value = fastfield_accessor.get(*pos as u32);
|
||||
distance(calculated_value, actual_value)
|
||||
})
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
|
||||
// the theory would be that we don't have the actual max_distance, but we are close within 50%
|
||||
// threshold.
|
||||
// It is multiplied by 2 because in a log case scenario the line would be as much above as
|
||||
// below. So the offset would = max_distance
|
||||
//
|
||||
let relative_max_value = (max_distance as f32 * 1.5) * 2.0;
|
||||
|
||||
let num_bits = compute_num_bits(relative_max_value as u64) as u64 * stats.num_vals as u64
|
||||
+ LinearInterpolFooter::SIZE_IN_BYTES as u64;
|
||||
let num_bits_uncompressed = 64 * stats.num_vals;
|
||||
num_bits as f32 / num_bits_uncompressed as f32
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn distance<T: Sub<Output = T> + Ord>(x: T, y: T) -> T {
|
||||
if x < y {
|
||||
y - x
|
||||
} else {
|
||||
x - y
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tests::get_codec_test_data_sets;
|
||||
|
||||
fn create_and_validate(data: &[u64], name: &str) {
|
||||
crate::tests::create_and_validate::<
|
||||
LinearInterpolFastFieldSerializer,
|
||||
LinearInterpolFastFieldReader,
|
||||
>(&data, name);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_codec_data_sets() {
|
||||
let data_sets = get_codec_test_data_sets();
|
||||
for (mut data, name) in data_sets {
|
||||
create_and_validate(&data, name);
|
||||
data.reverse();
|
||||
create_and_validate(&data, name);
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn linear_interpol_fast_field_test_large_amplitude() {
|
||||
let data = vec![
|
||||
i64::MAX as u64 / 2,
|
||||
i64::MAX as u64 / 3,
|
||||
i64::MAX as u64 / 2,
|
||||
];
|
||||
|
||||
create_and_validate(&data, "large amplitude");
|
||||
}
|
||||
#[test]
|
||||
fn linear_interpol_fast_concave_data() {
|
||||
let data = vec![0, 1, 2, 5, 8, 10, 20, 50];
|
||||
create_and_validate(&data, "concave data");
|
||||
}
|
||||
#[test]
|
||||
fn linear_interpol_fast_convex_data() {
|
||||
let data = vec![0, 40, 60, 70, 75, 77];
|
||||
create_and_validate(&data, "convex data");
|
||||
}
|
||||
#[test]
|
||||
fn linear_interpol_fast_field_test_simple() {
|
||||
let data = (10..=20_u64).collect::<Vec<_>>();
|
||||
|
||||
create_and_validate(&data, "simple monotonically");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn linear_interpol_fast_field_rand() {
|
||||
for _ in 0..5000 {
|
||||
let mut data = (0..50 as usize)
|
||||
.map(|_| rand::random::<u64>())
|
||||
.collect::<Vec<_>>();
|
||||
create_and_validate(&data, "random");
|
||||
|
||||
data.reverse();
|
||||
create_and_validate(&data, "random");
|
||||
}
|
||||
}
|
||||
}
|
||||
126
fastfield_codecs/src/main.rs
Normal file
126
fastfield_codecs/src/main.rs
Normal file
@@ -0,0 +1,126 @@
|
||||
#[macro_use]
|
||||
extern crate prettytable;
|
||||
use fastfield_codecs::{
|
||||
linearinterpol::LinearInterpolFastFieldSerializer,
|
||||
multilinearinterpol::MultiLinearInterpolFastFieldSerializer, FastFieldCodecSerializer,
|
||||
FastFieldStats,
|
||||
};
|
||||
use prettytable::{Cell, Row, Table};
|
||||
|
||||
fn main() {
|
||||
let mut table = Table::new();
|
||||
|
||||
// Add a row per time
|
||||
table.add_row(row!["", "Compression Ratio", "Compression Estimation"]);
|
||||
|
||||
for (data, data_set_name) in get_codec_test_data_sets() {
|
||||
let mut results = vec![];
|
||||
let res = serialize_with_codec::<LinearInterpolFastFieldSerializer>(&data);
|
||||
results.push(res);
|
||||
let res = serialize_with_codec::<MultiLinearInterpolFastFieldSerializer>(&data);
|
||||
results.push(res);
|
||||
let res = serialize_with_codec::<fastfield_codecs::bitpacked::BitpackedFastFieldSerializer>(
|
||||
&data,
|
||||
);
|
||||
results.push(res);
|
||||
|
||||
//let best_estimation_codec = results
|
||||
//.iter()
|
||||
//.min_by(|res1, res2| res1.partial_cmp(&res2).unwrap())
|
||||
//.unwrap();
|
||||
let best_compression_ratio_codec = results
|
||||
.iter()
|
||||
.min_by(|res1, res2| res1.partial_cmp(&res2).unwrap())
|
||||
.cloned()
|
||||
.unwrap();
|
||||
|
||||
table.add_row(Row::new(vec![Cell::new(data_set_name).style_spec("Bbb")]));
|
||||
for (is_applicable, est, comp, name) in results {
|
||||
let (est_cell, ratio_cell) = if !is_applicable {
|
||||
("Codec Disabled".to_string(), "".to_string())
|
||||
} else {
|
||||
(est.to_string(), comp.to_string())
|
||||
};
|
||||
let style = if comp == best_compression_ratio_codec.1 {
|
||||
"Fb"
|
||||
} else {
|
||||
""
|
||||
};
|
||||
|
||||
table.add_row(Row::new(vec![
|
||||
Cell::new(&name.to_string()).style_spec("bFg"),
|
||||
Cell::new(&ratio_cell).style_spec(style),
|
||||
Cell::new(&est_cell).style_spec(""),
|
||||
]));
|
||||
}
|
||||
}
|
||||
|
||||
table.printstd();
|
||||
}
|
||||
|
||||
pub fn get_codec_test_data_sets() -> Vec<(Vec<u64>, &'static str)> {
|
||||
let mut data_and_names = vec![];
|
||||
|
||||
let data = (1000..=200_000_u64).collect::<Vec<_>>();
|
||||
data_and_names.push((data, "Autoincrement"));
|
||||
|
||||
let mut current_cumulative = 0;
|
||||
let data = (1..=200_000_u64)
|
||||
.map(|num| {
|
||||
let num = (num as f32 + num as f32).log10() as u64;
|
||||
current_cumulative += num;
|
||||
current_cumulative
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
//let data = (1..=200000_u64).map(|num| num + num).collect::<Vec<_>>();
|
||||
data_and_names.push((data, "Monotonically increasing concave"));
|
||||
|
||||
let mut current_cumulative = 0;
|
||||
let data = (1..=200_000_u64)
|
||||
.map(|num| {
|
||||
let num = (200_000.0 - num as f32).log10() as u64;
|
||||
current_cumulative += num;
|
||||
current_cumulative
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
data_and_names.push((data, "Monotonically increasing convex"));
|
||||
|
||||
let data = (1000..=200_000_u64)
|
||||
.map(|num| num + rand::random::<u8>() as u64)
|
||||
.collect::<Vec<_>>();
|
||||
data_and_names.push((data, "Almost monotonically increasing"));
|
||||
|
||||
data_and_names
|
||||
}
|
||||
|
||||
pub fn serialize_with_codec<S: FastFieldCodecSerializer>(
|
||||
data: &[u64],
|
||||
) -> (bool, f32, f32, &'static str) {
|
||||
let is_applicable = S::is_applicable(&data, stats_from_vec(&data));
|
||||
if !is_applicable {
|
||||
return (false, 0.0, 0.0, S::NAME);
|
||||
}
|
||||
let estimation = S::estimate(&data, stats_from_vec(&data));
|
||||
let mut out = vec![];
|
||||
S::serialize(
|
||||
&mut out,
|
||||
&data,
|
||||
stats_from_vec(&data),
|
||||
data.iter().cloned(),
|
||||
data.iter().cloned(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let actual_compression = out.len() as f32 / (data.len() * 8) as f32;
|
||||
return (true, estimation, actual_compression, S::NAME);
|
||||
}
|
||||
|
||||
pub fn stats_from_vec(data: &[u64]) -> FastFieldStats {
|
||||
let min_value = data.iter().cloned().min().unwrap_or(0);
|
||||
let max_value = data.iter().cloned().max().unwrap_or(0);
|
||||
FastFieldStats {
|
||||
min_value,
|
||||
max_value,
|
||||
num_vals: data.len() as u64,
|
||||
}
|
||||
}
|
||||
410
fastfield_codecs/src/multilinearinterpol.rs
Normal file
410
fastfield_codecs/src/multilinearinterpol.rs
Normal file
@@ -0,0 +1,410 @@
|
||||
use crate::FastFieldCodecReader;
|
||||
use crate::FastFieldCodecSerializer;
|
||||
use crate::FastFieldDataAccess;
|
||||
use crate::FastFieldStats;
|
||||
use common::CountingWriter;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::Sub;
|
||||
use tantivy_bitpacker::compute_num_bits;
|
||||
use tantivy_bitpacker::BitPacker;
|
||||
|
||||
use common::BinarySerializable;
|
||||
use common::DeserializeFrom;
|
||||
use tantivy_bitpacker::BitUnpacker;
|
||||
|
||||
const CHUNK_SIZE: u64 = 512;
|
||||
|
||||
/// Depending on the field type, a different
|
||||
/// fast field is required.
|
||||
#[derive(Clone)]
|
||||
pub struct MultiLinearInterpolFastFieldReader {
|
||||
pub footer: MultiLinearInterpolFooter,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
struct Function {
|
||||
// The offset in the data is required, because we have diffrent bit_widths per block
|
||||
data_start_offset: u64,
|
||||
// start_pos in the block will be CHUNK_SIZE * BLOCK_NUM
|
||||
start_pos: u64,
|
||||
// only used during serialization, 0 after deserialization
|
||||
end_pos: u64,
|
||||
// only used during serialization, 0 after deserialization
|
||||
value_start_pos: u64,
|
||||
// only used during serialization, 0 after deserialization
|
||||
value_end_pos: u64,
|
||||
slope: f32,
|
||||
// The offset so that all values are positive when writing them
|
||||
positive_val_offset: u64,
|
||||
num_bits: u8,
|
||||
bit_unpacker: BitUnpacker,
|
||||
}
|
||||
|
||||
impl Function {
|
||||
fn calc_slope(&mut self) {
|
||||
let num_vals = self.end_pos - self.start_pos;
|
||||
get_slope(self.value_start_pos, self.value_end_pos, num_vals);
|
||||
}
|
||||
// split the interpolation into two function, change self and return the second split
|
||||
fn split(&mut self, split_pos: u64, split_pos_value: u64) -> Function {
|
||||
let mut new_function = Function {
|
||||
start_pos: split_pos,
|
||||
end_pos: self.end_pos,
|
||||
value_start_pos: split_pos_value,
|
||||
value_end_pos: self.value_end_pos,
|
||||
..Default::default()
|
||||
};
|
||||
new_function.calc_slope();
|
||||
self.end_pos = split_pos;
|
||||
self.value_end_pos = split_pos_value;
|
||||
self.calc_slope();
|
||||
new_function
|
||||
}
|
||||
}
|
||||
|
||||
impl BinarySerializable for Function {
|
||||
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
|
||||
self.data_start_offset.serialize(write)?;
|
||||
self.value_start_pos.serialize(write)?;
|
||||
self.positive_val_offset.serialize(write)?;
|
||||
self.slope.serialize(write)?;
|
||||
self.num_bits.serialize(write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Function> {
|
||||
let data_start_offset = u64::deserialize(reader)?;
|
||||
let value_start_pos = u64::deserialize(reader)?;
|
||||
let offset = u64::deserialize(reader)?;
|
||||
let slope = f32::deserialize(reader)?;
|
||||
let num_bits = u8::deserialize(reader)?;
|
||||
let interpolation = Function {
|
||||
data_start_offset,
|
||||
value_start_pos,
|
||||
positive_val_offset: offset,
|
||||
num_bits,
|
||||
bit_unpacker: BitUnpacker::new(num_bits),
|
||||
slope,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
Ok(interpolation)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MultiLinearInterpolFooter {
|
||||
pub num_vals: u64,
|
||||
pub min_value: u64,
|
||||
pub max_value: u64,
|
||||
interpolations: Vec<Function>,
|
||||
}
|
||||
|
||||
impl BinarySerializable for MultiLinearInterpolFooter {
|
||||
fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
|
||||
let mut out = vec![];
|
||||
self.num_vals.serialize(&mut out)?;
|
||||
self.min_value.serialize(&mut out)?;
|
||||
self.max_value.serialize(&mut out)?;
|
||||
self.interpolations.serialize(&mut out)?;
|
||||
write.write_all(&out)?;
|
||||
(out.len() as u32).serialize(write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(reader: &mut R) -> io::Result<MultiLinearInterpolFooter> {
|
||||
let mut footer = MultiLinearInterpolFooter {
|
||||
num_vals: u64::deserialize(reader)?,
|
||||
min_value: u64::deserialize(reader)?,
|
||||
max_value: u64::deserialize(reader)?,
|
||||
interpolations: Vec::<Function>::deserialize(reader)?,
|
||||
};
|
||||
for (num, interpol) in footer.interpolations.iter_mut().enumerate() {
|
||||
interpol.start_pos = CHUNK_SIZE * num as u64;
|
||||
}
|
||||
Ok(footer)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_interpolation_position(doc: u64) -> usize {
|
||||
let index = doc / CHUNK_SIZE;
|
||||
index as usize
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_interpolation_function(doc: u64, interpolations: &[Function]) -> &Function {
|
||||
&interpolations[get_interpolation_position(doc)]
|
||||
}
|
||||
|
||||
impl FastFieldCodecReader for MultiLinearInterpolFastFieldReader {
|
||||
/// Opens a fast field given a file.
|
||||
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
|
||||
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
|
||||
|
||||
let (_data, mut footer) = bytes.split_at(bytes.len() - (4 + footer_len) as usize);
|
||||
let footer = MultiLinearInterpolFooter::deserialize(&mut footer)?;
|
||||
|
||||
Ok(MultiLinearInterpolFastFieldReader { footer })
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
|
||||
let interpolation = get_interpolation_function(doc, &self.footer.interpolations);
|
||||
let doc = doc - interpolation.start_pos;
|
||||
let calculated_value =
|
||||
get_calculated_value(interpolation.value_start_pos, doc, interpolation.slope);
|
||||
let diff = interpolation
|
||||
.bit_unpacker
|
||||
.get(doc, &data[interpolation.data_start_offset as usize..]);
|
||||
(calculated_value + diff) - interpolation.positive_val_offset
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn min_value(&self) -> u64 {
|
||||
self.footer.min_value
|
||||
}
|
||||
#[inline]
|
||||
fn max_value(&self) -> u64 {
|
||||
self.footer.max_value
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_slope(first_val: u64, last_val: u64, num_vals: u64) -> f32 {
|
||||
((last_val as f64 - first_val as f64) / (num_vals as u64 - 1) as f64) as f32
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_calculated_value(first_val: u64, pos: u64, slope: f32) -> u64 {
|
||||
(first_val as i64 + (pos as f32 * slope) as i64) as u64
|
||||
}
|
||||
|
||||
/// Same as LinearInterpolFastFieldSerializer, but working on chunks of CHUNK_SIZE elements.
|
||||
pub struct MultiLinearInterpolFastFieldSerializer {}
|
||||
|
||||
impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
|
||||
const NAME: &'static str = "MultiLinearInterpol";
|
||||
const ID: u8 = 3;
|
||||
/// Creates a new fast field serializer.
|
||||
fn serialize(
|
||||
write: &mut impl Write,
|
||||
fastfield_accessor: &impl FastFieldDataAccess,
|
||||
stats: FastFieldStats,
|
||||
data_iter: impl Iterator<Item = u64>,
|
||||
_data_iter1: impl Iterator<Item = u64>,
|
||||
) -> io::Result<()> {
|
||||
assert!(stats.min_value <= stats.max_value);
|
||||
|
||||
let first_val = fastfield_accessor.get(0);
|
||||
let last_val = fastfield_accessor.get(stats.num_vals as u32 - 1);
|
||||
|
||||
let mut first_function = Function {
|
||||
end_pos: stats.num_vals,
|
||||
value_start_pos: first_val,
|
||||
value_end_pos: last_val,
|
||||
..Default::default()
|
||||
};
|
||||
first_function.calc_slope();
|
||||
let mut interpolations = vec![first_function];
|
||||
|
||||
// Since we potentially apply multiple passes over the data, the data is cached.
|
||||
// Multiple iteration can be expensive (merge with index sorting can add lot of overhead per
|
||||
// iteration)
|
||||
let data = data_iter.collect::<Vec<_>>();
|
||||
|
||||
//// let's split this into chunks of CHUNK_SIZE
|
||||
for data_pos in (0..data.len() as u64).step_by(CHUNK_SIZE as usize).skip(1) {
|
||||
let new_fun = {
|
||||
let current_interpolation = interpolations.last_mut().unwrap();
|
||||
current_interpolation.split(data_pos, data[data_pos as usize])
|
||||
};
|
||||
interpolations.push(new_fun);
|
||||
}
|
||||
// calculate offset and max (-> numbits) for each function
|
||||
for interpolation in &mut interpolations {
|
||||
let mut offset = 0;
|
||||
let mut rel_positive_max = 0;
|
||||
for (pos, actual_value) in data
|
||||
[interpolation.start_pos as usize..interpolation.end_pos as usize]
|
||||
.iter()
|
||||
.cloned()
|
||||
.enumerate()
|
||||
{
|
||||
let calculated_value = get_calculated_value(
|
||||
interpolation.value_start_pos,
|
||||
pos as u64,
|
||||
interpolation.slope,
|
||||
);
|
||||
if calculated_value > actual_value {
|
||||
// negative value we need to apply an offset
|
||||
// we ignore negative values in the max value calculation, because negative values
|
||||
// will be offset to 0
|
||||
offset = offset.max(calculated_value - actual_value);
|
||||
} else {
|
||||
//positive value no offset reuqired
|
||||
rel_positive_max = rel_positive_max.max(actual_value - calculated_value);
|
||||
}
|
||||
}
|
||||
|
||||
interpolation.positive_val_offset = offset;
|
||||
interpolation.num_bits = compute_num_bits(rel_positive_max + offset);
|
||||
}
|
||||
let mut bit_packer = BitPacker::new();
|
||||
|
||||
let write = &mut CountingWriter::wrap(write);
|
||||
for interpolation in &mut interpolations {
|
||||
interpolation.data_start_offset = write.written_bytes();
|
||||
let num_bits = interpolation.num_bits;
|
||||
for (pos, actual_value) in data
|
||||
[interpolation.start_pos as usize..interpolation.end_pos as usize]
|
||||
.iter()
|
||||
.cloned()
|
||||
.enumerate()
|
||||
{
|
||||
let calculated_value = get_calculated_value(
|
||||
interpolation.value_start_pos,
|
||||
pos as u64,
|
||||
interpolation.slope,
|
||||
);
|
||||
let diff = (actual_value + interpolation.positive_val_offset) - calculated_value;
|
||||
bit_packer.write(diff, num_bits, write)?;
|
||||
}
|
||||
bit_packer.flush(write)?;
|
||||
}
|
||||
bit_packer.close(write)?;
|
||||
|
||||
let footer = MultiLinearInterpolFooter {
|
||||
num_vals: stats.num_vals,
|
||||
min_value: stats.min_value,
|
||||
max_value: stats.max_value,
|
||||
interpolations,
|
||||
};
|
||||
footer.serialize(write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_applicable(
|
||||
_fastfield_accessor: &impl FastFieldDataAccess,
|
||||
stats: FastFieldStats,
|
||||
) -> bool {
|
||||
if stats.num_vals < 5_000 {
|
||||
return false;
|
||||
}
|
||||
// On serialization the offset is added to the actual value.
|
||||
// We need to make sure this won't run into overflow calculation issues.
|
||||
// For this we take the maximum theroretical offset and add this to the max value.
|
||||
// If this doesn't overflow the algortihm should be fine
|
||||
let theorethical_maximum_offset = stats.max_value - stats.min_value;
|
||||
if stats
|
||||
.max_value
|
||||
.checked_add(theorethical_maximum_offset)
|
||||
.is_none()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
|
||||
/// estimation for linear interpolation is hard because, you don't know
|
||||
/// where the local maxima are for the deviation of the calculated value and
|
||||
/// the offset is also unknown.
|
||||
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
|
||||
let first_val_in_first_block = fastfield_accessor.get(0);
|
||||
let last_elem_in_first_chunk = CHUNK_SIZE.min(stats.num_vals);
|
||||
let last_val_in_first_block = fastfield_accessor.get(last_elem_in_first_chunk as u32 - 1);
|
||||
let slope = get_slope(
|
||||
first_val_in_first_block,
|
||||
last_val_in_first_block,
|
||||
stats.num_vals,
|
||||
);
|
||||
|
||||
// let's sample at 0%, 5%, 10% .. 95%, 100%, but for the first block only
|
||||
let sample_positions = (0..20)
|
||||
.map(|pos| (last_elem_in_first_chunk as f32 / 100.0 * pos as f32 * 5.0) as usize)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let max_distance = sample_positions
|
||||
.iter()
|
||||
.map(|pos| {
|
||||
let calculated_value =
|
||||
get_calculated_value(first_val_in_first_block, *pos as u64, slope);
|
||||
let actual_value = fastfield_accessor.get(*pos as u32);
|
||||
distance(calculated_value, actual_value)
|
||||
})
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
// Estimate one block and extrapolate the cost to all blocks.
|
||||
// the theory would be that we don't have the actual max_distance, but we are close within 50%
|
||||
// threshold.
|
||||
// It is multiplied by 2 because in a log case scenario the line would be as much above as
|
||||
// below. So the offset would = max_distance
|
||||
//
|
||||
let relative_max_value = (max_distance as f32 * 1.5) * 2.0;
|
||||
|
||||
let num_bits = compute_num_bits(relative_max_value as u64) as u64 * stats.num_vals as u64
|
||||
// function metadata per block
|
||||
+ 29 * (stats.num_vals / CHUNK_SIZE);
|
||||
let num_bits_uncompressed = 64 * stats.num_vals;
|
||||
num_bits as f32 / num_bits_uncompressed as f32
|
||||
}
|
||||
}
|
||||
|
||||
fn distance<T: Sub<Output = T> + Ord>(x: T, y: T) -> T {
|
||||
if x < y {
|
||||
y - x
|
||||
} else {
|
||||
x - y
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tests::get_codec_test_data_sets;
|
||||
|
||||
fn create_and_validate(data: &[u64], name: &str) {
|
||||
crate::tests::create_and_validate::<
|
||||
MultiLinearInterpolFastFieldSerializer,
|
||||
MultiLinearInterpolFastFieldReader,
|
||||
>(&data, name);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_codec_data_sets() {
|
||||
let data_sets = get_codec_test_data_sets();
|
||||
for (mut data, name) in data_sets {
|
||||
create_and_validate(&data, name);
|
||||
data.reverse();
|
||||
create_and_validate(&data, name);
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_simple() {
|
||||
let data = (10..=20_u64).collect::<Vec<_>>();
|
||||
create_and_validate(&data, "simple monotonically");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn border_cases_1() {
|
||||
let data = (0..1024).collect::<Vec<_>>();
|
||||
create_and_validate(&data, "border case");
|
||||
}
|
||||
#[test]
|
||||
fn border_case_2() {
|
||||
let data = (0..1025).collect::<Vec<_>>();
|
||||
create_and_validate(&data, "border case");
|
||||
}
|
||||
#[test]
|
||||
fn rand() {
|
||||
for _ in 0..10 {
|
||||
let mut data = (5_000..20_000)
|
||||
.map(|_| rand::random::<u64>() as u64)
|
||||
.collect::<Vec<_>>();
|
||||
create_and_validate(&data, "random");
|
||||
|
||||
data.reverse();
|
||||
create_and_validate(&data, "random");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,18 +1,15 @@
|
||||
mod bitset;
|
||||
mod composite_file;
|
||||
mod counting_writer;
|
||||
mod serialize;
|
||||
mod vint;
|
||||
|
||||
pub use self::bitset::BitSet;
|
||||
pub(crate) use self::bitset::TinySet;
|
||||
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
|
||||
pub use self::counting_writer::CountingWriter;
|
||||
pub use self::serialize::{BinarySerializable, DeserializeFrom, FixedSize};
|
||||
pub use self::vint::{
|
||||
pub use byteorder::LittleEndian as Endianness;
|
||||
pub use common::CountingWriter;
|
||||
pub use common::{
|
||||
read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt,
|
||||
};
|
||||
pub use byteorder::LittleEndian as Endianness;
|
||||
pub use common::{BinarySerializable, DeserializeFrom, FixedSize};
|
||||
|
||||
/// Segment's max doc must be `< MAX_DOC_LIMIT`.
|
||||
///
|
||||
@@ -103,8 +100,8 @@ pub fn u64_to_f64(val: u64) -> f64 {
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test {
|
||||
|
||||
pub use super::serialize::test::fixed_size_test;
|
||||
use super::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
|
||||
use common::{BinarySerializable, FixedSize};
|
||||
use proptest::prelude::*;
|
||||
use std::f64;
|
||||
use tantivy_bitpacker::compute_num_bits;
|
||||
@@ -118,6 +115,12 @@ pub(crate) mod test {
|
||||
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
|
||||
}
|
||||
|
||||
pub fn fixed_size_test<O: BinarySerializable + FixedSize + Default>() {
|
||||
let mut buffer = Vec::new();
|
||||
O::default().serialize(&mut buffer).unwrap();
|
||||
assert_eq!(buffer.len(), O::SIZE_IN_BYTES);
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) {
|
||||
|
||||
@@ -28,7 +28,9 @@ pub use self::file_slice::{FileHandle, FileSlice};
|
||||
pub use self::owned_bytes::OwnedBytes;
|
||||
pub use self::ram_directory::RamDirectory;
|
||||
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
|
||||
use std::io::{self, BufWriter, Write};
|
||||
pub use common::AntiCallToken;
|
||||
pub use common::TerminatingWrite;
|
||||
use std::io::BufWriter;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Outcome of the Garbage collection
|
||||
@@ -50,47 +52,6 @@ pub use self::mmap_directory::MmapDirectory;
|
||||
|
||||
pub use self::managed_directory::ManagedDirectory;
|
||||
|
||||
/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly
|
||||
///
|
||||
/// The point is that while the type is public, it cannot be built by anyone
|
||||
/// outside of this module.
|
||||
pub struct AntiCallToken(());
|
||||
|
||||
/// Trait used to indicate when no more write need to be done on a writer
|
||||
pub trait TerminatingWrite: Write {
|
||||
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
|
||||
fn terminate(mut self) -> io::Result<()>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
self.terminate_ref(AntiCallToken(()))
|
||||
}
|
||||
|
||||
/// You should implement this function to define custom behavior.
|
||||
/// This function should flush any buffer it may hold.
|
||||
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>;
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite + ?Sized> TerminatingWrite for Box<W> {
|
||||
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
|
||||
self.as_mut().terminate_ref(token)
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
|
||||
fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()?;
|
||||
self.get_mut().terminate_ref(a)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<'a> TerminatingWrite for &'a mut Vec<u8> {
|
||||
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
/// Write object for Directory.
|
||||
///
|
||||
/// `WritePtr` are required to implement both Write
|
||||
|
||||
@@ -46,7 +46,7 @@ impl Drop for VecWriter {
|
||||
fn drop(&mut self) {
|
||||
if !self.is_flushed {
|
||||
panic!(
|
||||
"You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
|
||||
"You forgot to flush {:?} before its writter got Drop. Do not rely on drop. This also occurs when the indexer crashed, so you may want to check the logs for the root cause.",
|
||||
self.path
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::io;
|
||||
|
||||
use crate::fastfield::serializer::FastFieldSerializer;
|
||||
use crate::schema::{Document, Field, Value};
|
||||
use crate::DocId;
|
||||
use crate::{
|
||||
|
||||
@@ -29,12 +29,13 @@ pub use self::delete::DeleteBitSet;
|
||||
pub use self::error::{FastFieldNotAvailableError, Result};
|
||||
pub use self::facet_reader::FacetReader;
|
||||
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
|
||||
pub use self::reader::BitpackedFastFieldReader;
|
||||
pub(crate) use self::reader::BitpackedFastFieldReader;
|
||||
pub use self::reader::DynamicFastFieldReader;
|
||||
pub use self::reader::FastFieldReader;
|
||||
pub use self::readers::FastFieldReaders;
|
||||
pub use self::serializer::CompositeFastFieldSerializer;
|
||||
pub use self::serializer::FastFieldSerializer;
|
||||
pub use self::serializer::FastFieldDataAccess;
|
||||
pub use self::serializer::FastFieldStats;
|
||||
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
|
||||
use crate::schema::Cardinality;
|
||||
use crate::schema::FieldType;
|
||||
@@ -213,15 +214,14 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::common::CompositeFile;
|
||||
use crate::common::HasLen;
|
||||
use crate::directory::{Directory, RamDirectory, WritePtr};
|
||||
use crate::fastfield::BitpackedFastFieldReader;
|
||||
use crate::merge_policy::NoMergePolicy;
|
||||
use crate::schema::Field;
|
||||
use crate::schema::Schema;
|
||||
use crate::schema::FAST;
|
||||
use crate::schema::{Document, IntOptions};
|
||||
use crate::{Index, SegmentId, SegmentReader};
|
||||
use common::HasLen;
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::rngs::StdRng;
|
||||
@@ -239,7 +239,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
pub fn test_fastfield() {
|
||||
let test_fastfield = BitpackedFastFieldReader::<u64>::from(vec![100, 200, 300]);
|
||||
let test_fastfield = DynamicFastFieldReader::<u64>::from(vec![100, 200, 300]);
|
||||
assert_eq!(test_fastfield.get(0), 100);
|
||||
assert_eq!(test_fastfield.get(1), 200);
|
||||
assert_eq!(test_fastfield.get(2), 300);
|
||||
@@ -268,10 +268,10 @@ mod tests {
|
||||
serializer.close().unwrap();
|
||||
}
|
||||
let file = directory.open_read(&path).unwrap();
|
||||
assert_eq!(file.len(), 36 as usize);
|
||||
assert_eq!(file.len(), 37 as usize);
|
||||
let composite_file = CompositeFile::open(&file)?;
|
||||
let file = composite_file.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(file)?;
|
||||
let fast_field_reader = DynamicFastFieldReader::<u64>::open(file)?;
|
||||
assert_eq!(fast_field_reader.get(0), 13u64);
|
||||
assert_eq!(fast_field_reader.get(1), 14u64);
|
||||
assert_eq!(fast_field_reader.get(2), 2u64);
|
||||
@@ -299,11 +299,11 @@ mod tests {
|
||||
serializer.close()?;
|
||||
}
|
||||
let file = directory.open_read(&path)?;
|
||||
assert_eq!(file.len(), 61 as usize);
|
||||
assert_eq!(file.len(), 62 as usize);
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file)?;
|
||||
let data = fast_fields_composite.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
|
||||
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
|
||||
assert_eq!(fast_field_reader.get(0), 4u64);
|
||||
assert_eq!(fast_field_reader.get(1), 14_082_001u64);
|
||||
assert_eq!(fast_field_reader.get(2), 3_052u64);
|
||||
@@ -335,11 +335,11 @@ mod tests {
|
||||
serializer.close().unwrap();
|
||||
}
|
||||
let file = directory.open_read(&path).unwrap();
|
||||
assert_eq!(file.len(), 34 as usize);
|
||||
assert_eq!(file.len(), 35 as usize);
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file).unwrap();
|
||||
let data = fast_fields_composite.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
|
||||
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
|
||||
for doc in 0..10_000 {
|
||||
assert_eq!(fast_field_reader.get(doc), 100_000u64);
|
||||
}
|
||||
@@ -367,11 +367,11 @@ mod tests {
|
||||
serializer.close().unwrap();
|
||||
}
|
||||
let file = directory.open_read(&path).unwrap();
|
||||
assert_eq!(file.len(), 80042 as usize);
|
||||
assert_eq!(file.len(), 80043 as usize);
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file)?;
|
||||
let data = fast_fields_composite.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
|
||||
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
|
||||
assert_eq!(fast_field_reader.get(0), 0u64);
|
||||
for doc in 1..10_001 {
|
||||
assert_eq!(
|
||||
@@ -406,11 +406,12 @@ mod tests {
|
||||
serializer.close().unwrap();
|
||||
}
|
||||
let file = directory.open_read(&path).unwrap();
|
||||
assert_eq!(file.len(), 17709 as usize);
|
||||
//assert_eq!(file.len(), 17710 as usize); //bitpacked size
|
||||
assert_eq!(file.len(), 10175 as usize); // linear interpol size
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file)?;
|
||||
let data = fast_fields_composite.open_read(i64_field).unwrap();
|
||||
let fast_field_reader = BitpackedFastFieldReader::<i64>::open(data)?;
|
||||
let fast_field_reader = DynamicFastFieldReader::<i64>::open(data)?;
|
||||
|
||||
assert_eq!(fast_field_reader.min_value(), -100i64);
|
||||
assert_eq!(fast_field_reader.max_value(), 9_999i64);
|
||||
@@ -450,7 +451,7 @@ mod tests {
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file).unwrap();
|
||||
let data = fast_fields_composite.open_read(i64_field).unwrap();
|
||||
let fast_field_reader = BitpackedFastFieldReader::<i64>::open(data)?;
|
||||
let fast_field_reader = DynamicFastFieldReader::<i64>::open(data)?;
|
||||
assert_eq!(fast_field_reader.get(0u32), 0i64);
|
||||
}
|
||||
Ok(())
|
||||
@@ -483,7 +484,7 @@ mod tests {
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file)?;
|
||||
let data = fast_fields_composite.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
|
||||
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
|
||||
|
||||
let mut a = 0u64;
|
||||
for _ in 0..n {
|
||||
@@ -627,7 +628,7 @@ mod bench {
|
||||
let directory: RamDirectory = RamDirectory::create();
|
||||
{
|
||||
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
|
||||
let mut serializer = FastFieldSerializer::from_write(write).unwrap();
|
||||
let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap();
|
||||
let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA);
|
||||
for &x in &permutation {
|
||||
fast_field_writers.add_document(&doc!(*FIELD=>x));
|
||||
@@ -641,7 +642,7 @@ mod bench {
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file).unwrap();
|
||||
let data = fast_fields_composite.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
|
||||
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data).unwrap();
|
||||
|
||||
b.iter(|| {
|
||||
let n = test::black_box(7000u32);
|
||||
@@ -661,7 +662,7 @@ mod bench {
|
||||
let directory: RamDirectory = RamDirectory::create();
|
||||
{
|
||||
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
|
||||
let mut serializer = FastFieldSerializer::from_write(write).unwrap();
|
||||
let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap();
|
||||
let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA);
|
||||
for &x in &permutation {
|
||||
fast_field_writers.add_document(&doc!(*FIELD=>x));
|
||||
@@ -675,7 +676,7 @@ mod bench {
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file).unwrap();
|
||||
let data = fast_fields_composite.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
|
||||
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data).unwrap();
|
||||
|
||||
b.iter(|| {
|
||||
let n = test::black_box(1000u32);
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::ops::Range;
|
||||
|
||||
use crate::fastfield::{BitpackedFastFieldReader, FastFieldReader, FastValue, MultiValueLength};
|
||||
use crate::fastfield::{
|
||||
BitpackedFastFieldReader, DynamicFastFieldReader, FastFieldReader, FastValue, MultiValueLength,
|
||||
};
|
||||
use crate::DocId;
|
||||
|
||||
/// Reader for a multivalued `u64` fast field.
|
||||
@@ -13,13 +15,13 @@ use crate::DocId;
|
||||
///
|
||||
#[derive(Clone)]
|
||||
pub struct MultiValuedFastFieldReader<Item: FastValue> {
|
||||
idx_reader: BitpackedFastFieldReader<u64>,
|
||||
idx_reader: DynamicFastFieldReader<u64>,
|
||||
vals_reader: BitpackedFastFieldReader<Item>,
|
||||
}
|
||||
|
||||
impl<Item: FastValue> MultiValuedFastFieldReader<Item> {
|
||||
pub(crate) fn open(
|
||||
idx_reader: BitpackedFastFieldReader<u64>,
|
||||
idx_reader: DynamicFastFieldReader<u64>,
|
||||
vals_reader: BitpackedFastFieldReader<Item>,
|
||||
) -> MultiValuedFastFieldReader<Item> {
|
||||
MultiValuedFastFieldReader {
|
||||
@@ -44,6 +46,24 @@ impl<Item: FastValue> MultiValuedFastFieldReader<Item> {
|
||||
self.vals_reader.get_range_u64(range.start, &mut vals[..]);
|
||||
}
|
||||
|
||||
/// Returns the minimum value for this fast field.
|
||||
///
|
||||
/// The min value does not take in account of possible
|
||||
/// deleted document, and should be considered as a lower bound
|
||||
/// of the actual mimimum value.
|
||||
pub fn min_value(&self) -> Item {
|
||||
self.vals_reader.min_value()
|
||||
}
|
||||
|
||||
/// Returns the maximum value for this fast field.
|
||||
///
|
||||
/// The max value does not take in account of possible
|
||||
/// deleted document, and should be considered as an upper bound
|
||||
/// of the actual maximum value.
|
||||
pub fn max_value(&self) -> Item {
|
||||
self.vals_reader.max_value()
|
||||
}
|
||||
|
||||
/// Returns the number of values associated with the document `DocId`.
|
||||
pub fn num_vals(&self, doc: DocId) -> usize {
|
||||
let range = self.range(doc);
|
||||
@@ -69,7 +89,7 @@ impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
|
||||
mod tests {
|
||||
|
||||
use crate::core::Index;
|
||||
use crate::schema::{Facet, Schema, INDEXED};
|
||||
use crate::schema::{Cardinality, Facet, IntOptions, Schema, INDEXED};
|
||||
|
||||
#[test]
|
||||
fn test_multifastfield_reader() {
|
||||
@@ -124,4 +144,32 @@ mod tests {
|
||||
assert_eq!(&vals[..], &[4]);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multifastfield_reader_min_max() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let field_options = IntOptions::default()
|
||||
.set_indexed()
|
||||
.set_fast(Cardinality::MultiValues);
|
||||
let item_field = schema_builder.add_i64_field("items", field_options);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index
|
||||
.writer_for_tests()
|
||||
.expect("Failed to create index writer.");
|
||||
index_writer.add_document(doc!(
|
||||
item_field => 2i64,
|
||||
item_field => 3i64,
|
||||
item_field => -2i64,
|
||||
));
|
||||
index_writer.add_document(doc!(item_field => 6i64, item_field => 3i64));
|
||||
index_writer.add_document(doc!(item_field => 4i64));
|
||||
index_writer.commit().expect("Commit failed");
|
||||
let searcher = index.reader().unwrap().searcher();
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
let field_reader = segment_reader.fast_fields().i64s(item_field).unwrap();
|
||||
|
||||
assert_eq!(field_reader.min_value(), -2);
|
||||
assert_eq!(field_reader.max_value(), 6);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use crate::fastfield::serializer::DynamicFastFieldSerializer;
|
||||
use crate::fastfield::serializer::FastFieldSerializer;
|
||||
use crate::fastfield::serializer::BitpackedFastFieldSerializerLegacy;
|
||||
use crate::fastfield::CompositeFastFieldSerializer;
|
||||
use crate::postings::UnorderedTermId;
|
||||
use crate::schema::{Document, Field};
|
||||
@@ -155,7 +154,7 @@ impl MultiValuedFastFieldWriter {
|
||||
}
|
||||
{
|
||||
// writing the values themselves.
|
||||
let mut value_serializer: DynamicFastFieldSerializer<'_, _>;
|
||||
let mut value_serializer: BitpackedFastFieldSerializerLegacy<'_, _>;
|
||||
match mapping_opt {
|
||||
Some(mapping) => {
|
||||
value_serializer = serializer.new_u64_fast_field_with_idx(
|
||||
|
||||
@@ -8,11 +8,17 @@ use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter};
|
||||
use crate::schema::Schema;
|
||||
use crate::schema::FAST;
|
||||
use crate::DocId;
|
||||
use fastfield_codecs::bitpacked::BitpackedFastFieldReader as BitpackedReader;
|
||||
use fastfield_codecs::bitpacked::BitpackedFastFieldSerializer;
|
||||
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldReader;
|
||||
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
|
||||
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldReader;
|
||||
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
|
||||
use fastfield_codecs::FastFieldCodecReader;
|
||||
use fastfield_codecs::FastFieldCodecSerializer;
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::path::Path;
|
||||
use tantivy_bitpacker::compute_num_bits;
|
||||
use tantivy_bitpacker::BitUnpacker;
|
||||
|
||||
/// FastFieldReader is the trait to access fast field data.
|
||||
pub trait FastFieldReader<Item: FastValue>: Clone {
|
||||
@@ -42,9 +48,9 @@ pub trait FastFieldReader<Item: FastValue>: Clone {
|
||||
|
||||
/// Returns the minimum value for this fast field.
|
||||
///
|
||||
/// The max value does not take in account of possible
|
||||
/// deleted document, and should be considered as an upper bound
|
||||
/// of the actual maximum value.
|
||||
/// The min value does not take in account of possible
|
||||
/// deleted document, and should be considered as a lower bound
|
||||
/// of the actual mimimum value.
|
||||
fn min_value(&self) -> Item;
|
||||
|
||||
/// Returns the maximum value for this fast field.
|
||||
@@ -61,15 +67,48 @@ pub trait FastFieldReader<Item: FastValue>: Clone {
|
||||
///
|
||||
pub enum DynamicFastFieldReader<Item: FastValue> {
|
||||
/// Bitpacked compressed fastfield data.
|
||||
Bitpacked(BitpackedFastFieldReader<Item>),
|
||||
Bitpacked(FastFieldReaderCodecWrapper<Item, BitpackedReader>),
|
||||
/// Linear interpolated values + bitpacked
|
||||
LinearInterpol(FastFieldReaderCodecWrapper<Item, LinearInterpolFastFieldReader>),
|
||||
/// Blockwise linear interpolated values + bitpacked
|
||||
MultiLinearInterpol(FastFieldReaderCodecWrapper<Item, MultiLinearInterpolFastFieldReader>),
|
||||
}
|
||||
|
||||
impl<Item: FastValue> DynamicFastFieldReader<Item> {
|
||||
/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data.
|
||||
pub fn open(file: FileSlice) -> crate::Result<DynamicFastFieldReader<Item>> {
|
||||
Ok(DynamicFastFieldReader::Bitpacked(
|
||||
BitpackedFastFieldReader::open(file)?,
|
||||
))
|
||||
let mut bytes = file.read_bytes()?;
|
||||
let id = bytes.read_u8();
|
||||
|
||||
let reader = match id {
|
||||
BitpackedFastFieldSerializer::ID => {
|
||||
DynamicFastFieldReader::Bitpacked(FastFieldReaderCodecWrapper::<
|
||||
Item,
|
||||
BitpackedReader,
|
||||
>::open_from_bytes(bytes)?)
|
||||
}
|
||||
LinearInterpolFastFieldSerializer::ID => {
|
||||
DynamicFastFieldReader::LinearInterpol(FastFieldReaderCodecWrapper::<
|
||||
Item,
|
||||
LinearInterpolFastFieldReader,
|
||||
>::open_from_bytes(bytes)?)
|
||||
}
|
||||
MultiLinearInterpolFastFieldSerializer::ID => {
|
||||
DynamicFastFieldReader::MultiLinearInterpol(FastFieldReaderCodecWrapper::<
|
||||
Item,
|
||||
MultiLinearInterpolFastFieldReader,
|
||||
>::open_from_bytes(
|
||||
bytes
|
||||
)?)
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"unknown fastfield id {:?}. Data corrupted or using old tantivy version.",
|
||||
id
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(reader)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,57 +116,67 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
|
||||
fn get(&self, doc: DocId) -> Item {
|
||||
match self {
|
||||
Self::Bitpacked(reader) => reader.get(doc),
|
||||
Self::LinearInterpol(reader) => reader.get(doc),
|
||||
Self::MultiLinearInterpol(reader) => reader.get(doc),
|
||||
}
|
||||
}
|
||||
fn get_range(&self, start: DocId, output: &mut [Item]) {
|
||||
match self {
|
||||
Self::Bitpacked(reader) => reader.get_range(start, output),
|
||||
Self::LinearInterpol(reader) => reader.get_range(start, output),
|
||||
Self::MultiLinearInterpol(reader) => reader.get_range(start, output),
|
||||
}
|
||||
}
|
||||
fn min_value(&self) -> Item {
|
||||
match self {
|
||||
Self::Bitpacked(reader) => reader.min_value(),
|
||||
Self::LinearInterpol(reader) => reader.min_value(),
|
||||
Self::MultiLinearInterpol(reader) => reader.min_value(),
|
||||
}
|
||||
}
|
||||
fn max_value(&self) -> Item {
|
||||
match self {
|
||||
Self::Bitpacked(reader) => reader.max_value(),
|
||||
Self::LinearInterpol(reader) => reader.max_value(),
|
||||
Self::MultiLinearInterpol(reader) => reader.max_value(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for accessing a fastfield.
|
||||
/// Wrapper for accessing a fastfield.
|
||||
///
|
||||
/// Holds the data and the codec to the read the data.
|
||||
///
|
||||
/// Depending on the field type, a different
|
||||
/// fast field is required.
|
||||
#[derive(Clone)]
|
||||
pub struct BitpackedFastFieldReader<Item: FastValue> {
|
||||
pub struct FastFieldReaderCodecWrapper<Item: FastValue, CodecReader> {
|
||||
reader: CodecReader,
|
||||
bytes: OwnedBytes,
|
||||
bit_unpacker: BitUnpacker,
|
||||
min_value_u64: u64,
|
||||
max_value_u64: u64,
|
||||
_phantom: PhantomData<Item>,
|
||||
}
|
||||
|
||||
impl<Item: FastValue> BitpackedFastFieldReader<Item> {
|
||||
impl<Item: FastValue, C: FastFieldCodecReader> FastFieldReaderCodecWrapper<Item, C> {
|
||||
/// Opens a fast field given a file.
|
||||
pub fn open(file: FileSlice) -> crate::Result<Self> {
|
||||
let mut bytes = file.read_bytes()?;
|
||||
let min_value = u64::deserialize(&mut bytes)?;
|
||||
let amplitude = u64::deserialize(&mut bytes)?;
|
||||
let max_value = min_value + amplitude;
|
||||
let num_bits = compute_num_bits(amplitude);
|
||||
let bit_unpacker = BitUnpacker::new(num_bits);
|
||||
Ok(BitpackedFastFieldReader {
|
||||
let id = u8::deserialize(&mut bytes)?;
|
||||
assert_eq!(
|
||||
BitpackedFastFieldSerializer::ID,
|
||||
id,
|
||||
"Tried to open fast field as bitpacked encoded (id=1), but got serializer with different id"
|
||||
);
|
||||
Self::open_from_bytes(bytes)
|
||||
}
|
||||
/// Opens a fast field given the bytes.
|
||||
pub fn open_from_bytes(bytes: OwnedBytes) -> crate::Result<Self> {
|
||||
let reader = C::open_from_bytes(bytes.as_slice())?;
|
||||
Ok(FastFieldReaderCodecWrapper {
|
||||
reader,
|
||||
bytes,
|
||||
min_value_u64: min_value,
|
||||
max_value_u64: max_value,
|
||||
bit_unpacker,
|
||||
_phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
pub(crate) fn get_u64(&self, doc: u64) -> Item {
|
||||
Item::from_u64(self.min_value_u64 + self.bit_unpacker.get(doc, &self.bytes))
|
||||
Item::from_u64(self.reader.get_u64(doc, self.bytes.as_slice()))
|
||||
}
|
||||
|
||||
/// Internally `multivalued` also use SingleValue Fast fields.
|
||||
@@ -149,7 +198,9 @@ impl<Item: FastValue> BitpackedFastFieldReader<Item> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item: FastValue> FastFieldReader<Item> for BitpackedFastFieldReader<Item> {
|
||||
impl<Item: FastValue, C: FastFieldCodecReader + Clone> FastFieldReader<Item>
|
||||
for FastFieldReaderCodecWrapper<Item, C>
|
||||
{
|
||||
/// Return the value associated to the given document.
|
||||
///
|
||||
/// This accessor should return as fast as possible.
|
||||
@@ -185,7 +236,7 @@ impl<Item: FastValue> FastFieldReader<Item> for BitpackedFastFieldReader<Item> {
|
||||
/// deleted document, and should be considered as an upper bound
|
||||
/// of the actual maximum value.
|
||||
fn min_value(&self) -> Item {
|
||||
Item::from_u64(self.min_value_u64)
|
||||
Item::from_u64(self.reader.min_value())
|
||||
}
|
||||
|
||||
/// Returns the maximum value for this fast field.
|
||||
@@ -194,12 +245,14 @@ impl<Item: FastValue> FastFieldReader<Item> for BitpackedFastFieldReader<Item> {
|
||||
/// deleted document, and should be considered as an upper bound
|
||||
/// of the actual maximum value.
|
||||
fn max_value(&self) -> Item {
|
||||
Item::from_u64(self.max_value_u64)
|
||||
Item::from_u64(self.reader.max_value())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item: FastValue> From<Vec<Item>> for BitpackedFastFieldReader<Item> {
|
||||
fn from(vals: Vec<Item>) -> BitpackedFastFieldReader<Item> {
|
||||
pub(crate) type BitpackedFastFieldReader<Item> = FastFieldReaderCodecWrapper<Item, BitpackedReader>;
|
||||
|
||||
impl<Item: FastValue> From<Vec<Item>> for DynamicFastFieldReader<Item> {
|
||||
fn from(vals: Vec<Item>) -> DynamicFastFieldReader<Item> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let field = schema_builder.add_u64_field("field", FAST);
|
||||
let schema = schema_builder.build();
|
||||
@@ -231,6 +284,6 @@ impl<Item: FastValue> From<Vec<Item>> for BitpackedFastFieldReader<Item> {
|
||||
let field_file = composite_file
|
||||
.open_read(field)
|
||||
.expect("File component not found");
|
||||
BitpackedFastFieldReader::open(field_file).unwrap()
|
||||
DynamicFastFieldReader::open(field_file).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ use crate::space_usage::PerFieldSpaceUsage;
|
||||
use crate::TantivyError;
|
||||
|
||||
use super::reader::DynamicFastFieldReader;
|
||||
use super::FastFieldReader;
|
||||
|
||||
/// Provides access to all of the BitpackedFastFieldReader.
|
||||
///
|
||||
@@ -112,9 +111,8 @@ impl FastFieldReaders {
|
||||
&self,
|
||||
field: Field,
|
||||
) -> crate::Result<MultiValuedFastFieldReader<TFastValue>> {
|
||||
let fast_field_slice_idx = self.fast_field_data(field, 0)?;
|
||||
let idx_reader = self.typed_fast_field_reader(field)?;
|
||||
let fast_field_slice_vals = self.fast_field_data(field, 1)?;
|
||||
let idx_reader = BitpackedFastFieldReader::open(fast_field_slice_idx)?;
|
||||
let vals_reader: BitpackedFastFieldReader<TFastValue> =
|
||||
BitpackedFastFieldReader::open(fast_field_slice_vals)?;
|
||||
Ok(MultiValuedFastFieldReader::open(idx_reader, vals_reader))
|
||||
@@ -139,7 +137,7 @@ impl FastFieldReaders {
|
||||
/// Returns the `i64` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a i64 fast field, this method returns an Error.
|
||||
pub fn i64(&self, field: Field) -> crate::Result<impl FastFieldReader<i64>> {
|
||||
pub fn i64(&self, field: Field) -> crate::Result<DynamicFastFieldReader<i64>> {
|
||||
self.check_type(field, FastType::I64, Cardinality::SingleValue)?;
|
||||
self.typed_fast_field_reader(field)
|
||||
}
|
||||
@@ -147,7 +145,7 @@ impl FastFieldReaders {
|
||||
/// Returns the `i64` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a i64 fast field, this method returns an Error.
|
||||
pub fn date(&self, field: Field) -> crate::Result<impl FastFieldReader<crate::DateTime>> {
|
||||
pub fn date(&self, field: Field) -> crate::Result<DynamicFastFieldReader<crate::DateTime>> {
|
||||
self.check_type(field, FastType::Date, Cardinality::SingleValue)?;
|
||||
self.typed_fast_field_reader(field)
|
||||
}
|
||||
@@ -155,7 +153,7 @@ impl FastFieldReaders {
|
||||
/// Returns the `f64` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a f64 fast field, this method returns an Error.
|
||||
pub fn f64(&self, field: Field) -> crate::Result<impl FastFieldReader<f64>> {
|
||||
pub fn f64(&self, field: Field) -> crate::Result<DynamicFastFieldReader<f64>> {
|
||||
self.check_type(field, FastType::F64, Cardinality::SingleValue)?;
|
||||
self.typed_fast_field_reader(field)
|
||||
}
|
||||
|
||||
@@ -1,256 +0,0 @@
|
||||
use crate::common::BinarySerializable;
|
||||
use crate::common::CompositeWrite;
|
||||
use crate::common::CountingWriter;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::schema::Field;
|
||||
use std::io::{self, Write};
|
||||
use tantivy_bitpacker::compute_num_bits;
|
||||
use tantivy_bitpacker::BitPacker;
|
||||
|
||||
/// `CompositeFastFieldSerializer` is in charge of serializing
|
||||
/// fastfields on disk.
|
||||
///
|
||||
/// Fast fields have differnt encodings like bit-packing.
|
||||
///
|
||||
/// `FastFieldWriter`s are in charge of pushing the data to
|
||||
/// the serializer.
|
||||
/// The serializer expects to receive the following calls.
|
||||
///
|
||||
/// * `new_u64_fast_field(...)`
|
||||
/// * `add_val(...)`
|
||||
/// * `add_val(...)`
|
||||
/// * `add_val(...)`
|
||||
/// * ...
|
||||
/// * `close_field()`
|
||||
/// * `new_u64_fast_field(...)`
|
||||
/// * `add_val(...)`
|
||||
/// * ...
|
||||
/// * `close_field()`
|
||||
/// * `close()`
|
||||
pub struct CompositeFastFieldSerializer {
|
||||
composite_write: CompositeWrite<WritePtr>,
|
||||
}
|
||||
|
||||
impl CompositeFastFieldSerializer {
|
||||
/// Constructor
|
||||
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
|
||||
// just making room for the pointer to header.
|
||||
let composite_write = CompositeWrite::wrap(write);
|
||||
Ok(CompositeFastFieldSerializer { composite_write })
|
||||
}
|
||||
|
||||
/// Start serializing a new u64 fast field
|
||||
pub fn new_u64_fast_field(
|
||||
&mut self,
|
||||
field: Field,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
) -> io::Result<DynamicFastFieldSerializer<'_, CountingWriter<WritePtr>>> {
|
||||
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
|
||||
}
|
||||
|
||||
/// Start serializing a new u64 fast field
|
||||
pub fn new_u64_fast_field_with_idx(
|
||||
&mut self,
|
||||
field: Field,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
idx: usize,
|
||||
) -> io::Result<DynamicFastFieldSerializer<'_, CountingWriter<WritePtr>>> {
|
||||
let field_write = self.composite_write.for_field_with_idx(field, idx);
|
||||
DynamicFastFieldSerializer::open(field_write, min_value, max_value)
|
||||
}
|
||||
|
||||
/// Start serializing a new [u8] fast field
|
||||
pub fn new_bytes_fast_field_with_idx(
|
||||
&mut self,
|
||||
field: Field,
|
||||
idx: usize,
|
||||
) -> FastBytesFieldSerializer<'_, CountingWriter<WritePtr>> {
|
||||
let field_write = self.composite_write.for_field_with_idx(field, idx);
|
||||
FastBytesFieldSerializer { write: field_write }
|
||||
}
|
||||
|
||||
/// Closes the serializer
|
||||
///
|
||||
/// After this call the data must be persistently save on disk.
|
||||
pub fn close(self) -> io::Result<()> {
|
||||
self.composite_write.close()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EstimationStats {
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
}
|
||||
/// The FastFieldSerializer trait is the common interface
|
||||
/// implemented by every fastfield serializer variant.
|
||||
///
|
||||
/// `DynamicFastFieldSerializer` is the enum wrapping all variants.
|
||||
/// It is used to create an serializer instance.
|
||||
pub trait FastFieldSerializer {
|
||||
/// add value to serializer
|
||||
fn add_val(&mut self, val: u64) -> io::Result<()>;
|
||||
/// finish serializing a field.
|
||||
fn close_field(self) -> io::Result<()>;
|
||||
}
|
||||
|
||||
/// The FastFieldSerializerEstimate trait is required on all variants
|
||||
/// of fast field compressions, to decide which one to choose.
|
||||
pub trait FastFieldSerializerEstimate {
|
||||
/// returns an estimate of the compression ratio.
|
||||
fn estimate(
|
||||
/*fastfield_accessor: impl FastFieldReader<u64>,*/ stats: EstimationStats,
|
||||
) -> (f32, &'static str);
|
||||
/// the unique name of the compressor
|
||||
fn name() -> &'static str;
|
||||
}
|
||||
|
||||
pub enum DynamicFastFieldSerializer<'a, W: Write> {
|
||||
Bitpacked(BitpackedFastFieldSerializer<'a, W>),
|
||||
}
|
||||
|
||||
impl<'a, W: Write> DynamicFastFieldSerializer<'a, W> {
|
||||
/// Creates a new fast field serializer.
|
||||
///
|
||||
/// The serializer in fact encode the values by bitpacking
|
||||
/// `(val - min_value)`.
|
||||
///
|
||||
/// It requires a `min_value` and a `max_value` to compute
|
||||
/// compute the minimum number of bits required to encode
|
||||
/// values.
|
||||
pub fn open(
|
||||
write: &'a mut W,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
) -> io::Result<DynamicFastFieldSerializer<'a, W>> {
|
||||
let stats = EstimationStats {
|
||||
min_value,
|
||||
max_value,
|
||||
};
|
||||
let (_ratio, name) = (
|
||||
BitpackedFastFieldSerializer::<Vec<u8>>::estimate(stats),
|
||||
BitpackedFastFieldSerializer::<Vec<u8>>::name(),
|
||||
);
|
||||
Self::open_from_name(write, min_value, max_value, name)
|
||||
}
|
||||
|
||||
/// Creates a new fast field serializer.
|
||||
///
|
||||
/// The serializer in fact encode the values by bitpacking
|
||||
/// `(val - min_value)`.
|
||||
///
|
||||
/// It requires a `min_value` and a `max_value` to compute
|
||||
/// compute the minimum number of bits required to encode
|
||||
/// values.
|
||||
pub fn open_from_name(
|
||||
write: &'a mut W,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
name: &str,
|
||||
) -> io::Result<DynamicFastFieldSerializer<'a, W>> {
|
||||
// Weirdly the W generic on BitpackedFastFieldSerializer needs to be set,
|
||||
// although name() doesn't use it
|
||||
let variant = if name == BitpackedFastFieldSerializer::<Vec<u8>>::name() {
|
||||
DynamicFastFieldSerializer::Bitpacked(BitpackedFastFieldSerializer::open(
|
||||
write, min_value, max_value,
|
||||
)?)
|
||||
} else {
|
||||
panic!("unknown fastfield serializer {}", name);
|
||||
};
|
||||
|
||||
Ok(variant)
|
||||
}
|
||||
}
|
||||
impl<'a, W: Write> FastFieldSerializer for DynamicFastFieldSerializer<'a, W> {
|
||||
fn add_val(&mut self, val: u64) -> io::Result<()> {
|
||||
match self {
|
||||
Self::Bitpacked(serializer) => serializer.add_val(val),
|
||||
}
|
||||
}
|
||||
fn close_field(self) -> io::Result<()> {
|
||||
match self {
|
||||
Self::Bitpacked(serializer) => serializer.close_field(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BitpackedFastFieldSerializer<'a, W: Write> {
|
||||
bit_packer: BitPacker,
|
||||
write: &'a mut W,
|
||||
min_value: u64,
|
||||
num_bits: u8,
|
||||
}
|
||||
|
||||
impl<'a, W: Write> BitpackedFastFieldSerializer<'a, W> {
|
||||
/// Creates a new fast field serializer.
|
||||
///
|
||||
/// The serializer in fact encode the values by bitpacking
|
||||
/// `(val - min_value)`.
|
||||
///
|
||||
/// It requires a `min_value` and a `max_value` to compute
|
||||
/// compute the minimum number of bits required to encode
|
||||
/// values.
|
||||
fn open(
|
||||
write: &'a mut W,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
) -> io::Result<BitpackedFastFieldSerializer<'a, W>> {
|
||||
assert!(min_value <= max_value);
|
||||
min_value.serialize(write)?;
|
||||
let amplitude = max_value - min_value;
|
||||
amplitude.serialize(write)?;
|
||||
let num_bits = compute_num_bits(amplitude);
|
||||
let bit_packer = BitPacker::new();
|
||||
Ok(BitpackedFastFieldSerializer {
|
||||
bit_packer,
|
||||
write,
|
||||
min_value,
|
||||
num_bits,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, W: 'a + Write> FastFieldSerializer for BitpackedFastFieldSerializer<'a, W> {
|
||||
/// Pushes a new value to the currently open u64 fast field.
|
||||
fn add_val(&mut self, val: u64) -> io::Result<()> {
|
||||
let val_to_write: u64 = val - self.min_value;
|
||||
self.bit_packer
|
||||
.write(val_to_write, self.num_bits, &mut self.write)?;
|
||||
Ok(())
|
||||
}
|
||||
fn close_field(mut self) -> io::Result<()> {
|
||||
self.bit_packer.close(&mut self.write)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, W: 'a + Write> FastFieldSerializerEstimate for BitpackedFastFieldSerializer<'a, W> {
|
||||
fn estimate(
|
||||
/*_fastfield_accessor: impl FastFieldReader<u64>, */ stats: EstimationStats,
|
||||
) -> (f32, &'static str) {
|
||||
let amplitude = stats.max_value - stats.min_value;
|
||||
let num_bits = compute_num_bits(amplitude);
|
||||
let num_bits_uncompressed = 64;
|
||||
let ratio = num_bits as f32 / num_bits_uncompressed as f32;
|
||||
let name = Self::name();
|
||||
(ratio, name)
|
||||
}
|
||||
fn name() -> &'static str {
|
||||
"Bitpacked"
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FastBytesFieldSerializer<'a, W: Write> {
|
||||
write: &'a mut W,
|
||||
}
|
||||
|
||||
impl<'a, W: Write> FastBytesFieldSerializer<'a, W> {
|
||||
pub fn write_all(&mut self, vals: &[u8]) -> io::Result<()> {
|
||||
self.write.write_all(vals)
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) -> io::Result<()> {
|
||||
self.write.flush()
|
||||
}
|
||||
}
|
||||
203
src/fastfield/serializer/mod.rs
Normal file
203
src/fastfield/serializer/mod.rs
Normal file
@@ -0,0 +1,203 @@
|
||||
use crate::common::BinarySerializable;
|
||||
use crate::common::CompositeWrite;
|
||||
use crate::common::CountingWriter;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::schema::Field;
|
||||
pub use fastfield_codecs::bitpacked::BitpackedFastFieldSerializer;
|
||||
pub use fastfield_codecs::bitpacked::BitpackedFastFieldSerializerLegacy;
|
||||
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
|
||||
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
|
||||
pub use fastfield_codecs::FastFieldCodecSerializer;
|
||||
pub use fastfield_codecs::FastFieldDataAccess;
|
||||
pub use fastfield_codecs::FastFieldStats;
|
||||
use std::io::{self, Write};
|
||||
|
||||
/// `CompositeFastFieldSerializer` is in charge of serializing
|
||||
/// fastfields on disk.
|
||||
///
|
||||
/// Fast fields have different encodings like bit-packing.
|
||||
///
|
||||
/// `FastFieldWriter`s are in charge of pushing the data to
|
||||
/// the serializer.
|
||||
/// The serializer expects to receive the following calls.
|
||||
///
|
||||
/// * `new_u64_fast_field(...)`
|
||||
/// * `add_val(...)`
|
||||
/// * `add_val(...)`
|
||||
/// * `add_val(...)`
|
||||
/// * ...
|
||||
/// * `close_field()`
|
||||
/// * `new_u64_fast_field(...)`
|
||||
/// * `add_val(...)`
|
||||
/// * ...
|
||||
/// * `close_field()`
|
||||
/// * `close()`
|
||||
pub struct CompositeFastFieldSerializer {
|
||||
composite_write: CompositeWrite<WritePtr>,
|
||||
}
|
||||
|
||||
// use this, when this is merged and stabilized explicit_generic_args_with_impl_trait
|
||||
// https://github.com/rust-lang/rust/pull/86176
|
||||
fn codec_estimation<T: FastFieldCodecSerializer, A: FastFieldDataAccess>(
|
||||
stats: FastFieldStats,
|
||||
fastfield_accessor: &A,
|
||||
estimations: &mut Vec<(f32, &str, u8)>,
|
||||
) {
|
||||
if !T::is_applicable(fastfield_accessor, stats.clone()) {
|
||||
return;
|
||||
}
|
||||
let (ratio, name, id) = (
|
||||
T::estimate(fastfield_accessor, stats.clone()),
|
||||
T::NAME,
|
||||
T::ID,
|
||||
);
|
||||
estimations.push((ratio, name, id));
|
||||
}
|
||||
|
||||
impl CompositeFastFieldSerializer {
|
||||
/// Constructor
|
||||
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
|
||||
// just making room for the pointer to header.
|
||||
let composite_write = CompositeWrite::wrap(write);
|
||||
Ok(CompositeFastFieldSerializer { composite_write })
|
||||
}
|
||||
|
||||
/// Serialize data into a new u64 fast field. The best compression codec will be chosen automatically.
|
||||
pub fn create_auto_detect_u64_fast_field(
|
||||
&mut self,
|
||||
field: Field,
|
||||
stats: FastFieldStats,
|
||||
fastfield_accessor: impl FastFieldDataAccess,
|
||||
data_iter_1: impl Iterator<Item = u64>,
|
||||
data_iter_2: impl Iterator<Item = u64>,
|
||||
) -> io::Result<()> {
|
||||
let field_write = self.composite_write.for_field_with_idx(field, 0);
|
||||
|
||||
let mut estimations = vec![];
|
||||
|
||||
codec_estimation::<BitpackedFastFieldSerializer, _>(
|
||||
stats.clone(),
|
||||
&fastfield_accessor,
|
||||
&mut estimations,
|
||||
);
|
||||
codec_estimation::<LinearInterpolFastFieldSerializer, _>(
|
||||
stats.clone(),
|
||||
&fastfield_accessor,
|
||||
&mut estimations,
|
||||
);
|
||||
codec_estimation::<MultiLinearInterpolFastFieldSerializer, _>(
|
||||
stats.clone(),
|
||||
&fastfield_accessor,
|
||||
&mut estimations,
|
||||
);
|
||||
if let Some(broken_estimation) = estimations
|
||||
.iter()
|
||||
.find(|estimation| estimation.0 == f32::NAN)
|
||||
{
|
||||
warn!(
|
||||
"broken estimation for fast field codec {}",
|
||||
broken_estimation.1
|
||||
);
|
||||
}
|
||||
// removing nan values for codecs with broken calculations, and max values which disables codecs
|
||||
estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
|
||||
estimations.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
|
||||
let (_ratio, name, id) = estimations[0];
|
||||
debug!(
|
||||
"choosing fast field codec {} for field_id {:?}",
|
||||
name, field
|
||||
); // todo print actual field name
|
||||
id.serialize(field_write)?;
|
||||
match name {
|
||||
BitpackedFastFieldSerializer::NAME => {
|
||||
BitpackedFastFieldSerializer::serialize(
|
||||
field_write,
|
||||
&fastfield_accessor,
|
||||
stats,
|
||||
data_iter_1,
|
||||
data_iter_2,
|
||||
)?;
|
||||
}
|
||||
LinearInterpolFastFieldSerializer::NAME => {
|
||||
LinearInterpolFastFieldSerializer::serialize(
|
||||
field_write,
|
||||
&fastfield_accessor,
|
||||
stats,
|
||||
data_iter_1,
|
||||
data_iter_2,
|
||||
)?;
|
||||
}
|
||||
MultiLinearInterpolFastFieldSerializer::NAME => {
|
||||
MultiLinearInterpolFastFieldSerializer::serialize(
|
||||
field_write,
|
||||
&fastfield_accessor,
|
||||
stats,
|
||||
data_iter_1,
|
||||
data_iter_2,
|
||||
)?;
|
||||
}
|
||||
_ => {
|
||||
panic!("unknown fastfield serializer {}", name)
|
||||
}
|
||||
};
|
||||
field_write.flush()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start serializing a new u64 fast field
|
||||
pub fn new_u64_fast_field(
|
||||
&mut self,
|
||||
field: Field,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
) -> io::Result<BitpackedFastFieldSerializerLegacy<'_, CountingWriter<WritePtr>>> {
|
||||
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
|
||||
}
|
||||
|
||||
/// Start serializing a new u64 fast field
|
||||
pub fn new_u64_fast_field_with_idx(
|
||||
&mut self,
|
||||
field: Field,
|
||||
min_value: u64,
|
||||
max_value: u64,
|
||||
idx: usize,
|
||||
) -> io::Result<BitpackedFastFieldSerializerLegacy<'_, CountingWriter<WritePtr>>> {
|
||||
let field_write = self.composite_write.for_field_with_idx(field, idx);
|
||||
// Prepend codec id to field data for compatibility with DynamicFastFieldReader.
|
||||
let id = BitpackedFastFieldSerializer::ID;
|
||||
id.serialize(field_write)?;
|
||||
BitpackedFastFieldSerializerLegacy::open(field_write, min_value, max_value)
|
||||
}
|
||||
|
||||
/// Start serializing a new [u8] fast field
|
||||
pub fn new_bytes_fast_field_with_idx(
|
||||
&mut self,
|
||||
field: Field,
|
||||
idx: usize,
|
||||
) -> FastBytesFieldSerializer<'_, CountingWriter<WritePtr>> {
|
||||
let field_write = self.composite_write.for_field_with_idx(field, idx);
|
||||
FastBytesFieldSerializer { write: field_write }
|
||||
}
|
||||
|
||||
/// Closes the serializer
|
||||
///
|
||||
/// After this call the data must be persistently save on disk.
|
||||
pub fn close(self) -> io::Result<()> {
|
||||
self.composite_write.close()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FastBytesFieldSerializer<'a, W: Write> {
|
||||
write: &'a mut W,
|
||||
}
|
||||
|
||||
impl<'a, W: Write> FastBytesFieldSerializer<'a, W> {
|
||||
pub fn write_all(&mut self, vals: &[u8]) -> io::Result<()> {
|
||||
self.write.write_all(vals)
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) -> io::Result<()> {
|
||||
self.write.flush()
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,13 @@
|
||||
use super::multivalued::MultiValuedFastFieldWriter;
|
||||
use super::serializer::FastFieldStats;
|
||||
use super::FastFieldDataAccess;
|
||||
use crate::common;
|
||||
use crate::fastfield::serializer::FastFieldSerializer;
|
||||
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
|
||||
use crate::indexer::doc_id_mapping::DocIdMapping;
|
||||
use crate::postings::UnorderedTermId;
|
||||
use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema};
|
||||
use crate::termdict::TermOrdinal;
|
||||
use crate::DocId;
|
||||
use fnv::FnvHashMap;
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
@@ -265,9 +267,9 @@ impl IntFastFieldWriter {
|
||||
self.add_val(val);
|
||||
}
|
||||
|
||||
/// Extract the stored data
|
||||
pub(crate) fn get_data(&self) -> Vec<u64> {
|
||||
self.vals.iter().collect::<Vec<u64>>()
|
||||
/// get iterator over the data
|
||||
pub(crate) fn iter(&self) -> impl Iterator<Item = u64> + '_ {
|
||||
self.vals.iter()
|
||||
}
|
||||
|
||||
/// Push the fast fields value to the `FastFieldWriter`.
|
||||
@@ -281,17 +283,58 @@ impl IntFastFieldWriter {
|
||||
} else {
|
||||
(self.val_min, self.val_max)
|
||||
};
|
||||
let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?;
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
for doc_id in doc_id_map.iter_old_doc_ids() {
|
||||
single_field_serializer.add_val(self.vals.get(*doc_id as usize))?;
|
||||
}
|
||||
} else {
|
||||
for val in self.vals.iter() {
|
||||
single_field_serializer.add_val(val)?;
|
||||
}
|
||||
let fastfield_accessor = WriterFastFieldAccessProvider {
|
||||
doc_id_map,
|
||||
vals: &self.vals,
|
||||
};
|
||||
let stats = FastFieldStats {
|
||||
min_value: min,
|
||||
max_value: max,
|
||||
num_vals: self.val_count as u64,
|
||||
};
|
||||
|
||||
single_field_serializer.close_field()
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
let iter = doc_id_map
|
||||
.iter_old_doc_ids()
|
||||
.map(|doc_id| self.vals.get(*doc_id as usize));
|
||||
serializer.create_auto_detect_u64_fast_field(
|
||||
self.field,
|
||||
stats,
|
||||
fastfield_accessor,
|
||||
iter.clone(),
|
||||
iter,
|
||||
)?;
|
||||
} else {
|
||||
serializer.create_auto_detect_u64_fast_field(
|
||||
self.field,
|
||||
stats,
|
||||
fastfield_accessor,
|
||||
self.vals.iter(),
|
||||
self.vals.iter(),
|
||||
)?;
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct WriterFastFieldAccessProvider<'map, 'bitp> {
|
||||
doc_id_map: Option<&'map DocIdMapping>,
|
||||
vals: &'bitp BlockedBitpacker,
|
||||
}
|
||||
impl<'map, 'bitp> FastFieldDataAccess for WriterFastFieldAccessProvider<'map, 'bitp> {
|
||||
/// Return the value associated to the given document.
|
||||
///
|
||||
/// This accessor should return as fast as possible.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// May panic if `doc` is greater than the segment
|
||||
fn get(&self, doc: DocId) -> u64 {
|
||||
if let Some(doc_id_map) = self.doc_id_map {
|
||||
self.vals.get(doc_id_map.get_old_doc_id(doc) as usize) // consider extra FastFieldReader wrapper for non doc_id_map
|
||||
} else {
|
||||
self.vals.get(doc as usize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,9 +60,8 @@ pub(crate) fn get_doc_id_mapping_from_field(
|
||||
})?;
|
||||
|
||||
// create new doc_id to old doc_id index (used in fast_field_writers)
|
||||
let data = fast_field.get_data();
|
||||
let mut doc_id_and_data = data
|
||||
.into_iter()
|
||||
let mut doc_id_and_data = fast_field
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|el| (el.0 as DocId, el.1))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -3,8 +3,9 @@ use crate::error::DataCorruption;
|
||||
use crate::fastfield::CompositeFastFieldSerializer;
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
use crate::fastfield::DynamicFastFieldReader;
|
||||
use crate::fastfield::FastFieldDataAccess;
|
||||
use crate::fastfield::FastFieldReader;
|
||||
use crate::fastfield::FastFieldSerializer;
|
||||
use crate::fastfield::FastFieldStats;
|
||||
use crate::fastfield::MultiValuedFastFieldReader;
|
||||
use crate::fieldnorm::FieldNormsSerializer;
|
||||
use crate::fieldnorm::FieldNormsWriter;
|
||||
@@ -344,21 +345,38 @@ impl IndexMerger {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
if let Some(doc_id_mapping) = doc_id_mapping {
|
||||
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
|
||||
(
|
||||
doc_id,
|
||||
&fast_field_readers[reader_with_ordinal.ordinal as usize],
|
||||
)
|
||||
});
|
||||
// add values in order of the new doc_ids
|
||||
let mut fast_single_field_serializer =
|
||||
fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?;
|
||||
for (doc_id, field_reader) in sorted_doc_ids {
|
||||
let val = field_reader.get(*doc_id);
|
||||
fast_single_field_serializer.add_val(val)?;
|
||||
#[derive(Clone)]
|
||||
struct SortedDocidFieldAccessProvider<'a> {
|
||||
doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
|
||||
fast_field_readers: &'a Vec<DynamicFastFieldReader<u64>>,
|
||||
}
|
||||
impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> {
|
||||
fn get(&self, doc: DocId) -> u64 {
|
||||
let (doc_id, reader_with_ordinal) = self.doc_id_mapping[doc as usize];
|
||||
self.fast_field_readers[reader_with_ordinal.ordinal as usize].get(doc_id)
|
||||
}
|
||||
}
|
||||
let stats = FastFieldStats {
|
||||
min_value,
|
||||
max_value,
|
||||
num_vals: doc_id_mapping.len() as u64,
|
||||
};
|
||||
let fastfield_accessor = SortedDocidFieldAccessProvider {
|
||||
doc_id_mapping,
|
||||
fast_field_readers: &fast_field_readers,
|
||||
};
|
||||
let iter = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
|
||||
let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize];
|
||||
fast_field_reader.get(*doc_id)
|
||||
});
|
||||
fast_field_serializer.create_auto_detect_u64_fast_field(
|
||||
field,
|
||||
stats,
|
||||
fastfield_accessor,
|
||||
iter.clone(),
|
||||
iter,
|
||||
)?;
|
||||
|
||||
fast_single_field_serializer.close_field()?;
|
||||
Ok(())
|
||||
} else {
|
||||
let u64_readers = self.readers.iter()
|
||||
@@ -496,19 +514,21 @@ impl IndexMerger {
|
||||
// Important: reader_and_field_accessor needs
|
||||
// to have the same order as self.readers since ReaderWithOrdinal
|
||||
// is used to index the reader_and_field_accessors vec.
|
||||
fn write_1_n_fast_field_idx_generic(
|
||||
fn write_1_n_fast_field_idx_generic<T: MultiValueLength>(
|
||||
field: Field,
|
||||
fast_field_serializer: &mut CompositeFastFieldSerializer,
|
||||
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
|
||||
reader_and_field_accessors: &[(&SegmentReader, impl MultiValueLength)],
|
||||
reader_and_field_accessors: &[(&SegmentReader, T)],
|
||||
) -> crate::Result<()> {
|
||||
let mut total_num_vals = 0u64;
|
||||
// In the first pass, we compute the total number of vals.
|
||||
//
|
||||
// This is required by the bitpacker, as it needs to know
|
||||
// what should be the bit length use for bitpacking.
|
||||
let mut idx_num_vals = 0;
|
||||
for (reader, u64s_reader) in reader_and_field_accessors.iter() {
|
||||
if let Some(delete_bitset) = reader.delete_bitset() {
|
||||
idx_num_vals += reader.max_doc() as u64 - delete_bitset.len() as u64;
|
||||
for doc in 0u32..reader.max_doc() {
|
||||
if delete_bitset.is_alive(doc) {
|
||||
let num_vals = u64s_reader.get_len(doc) as u64;
|
||||
@@ -516,38 +536,60 @@ impl IndexMerger {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
idx_num_vals += reader.max_doc() as u64;
|
||||
total_num_vals += u64s_reader.get_total_len();
|
||||
}
|
||||
}
|
||||
|
||||
let stats = FastFieldStats {
|
||||
max_value: total_num_vals,
|
||||
num_vals: idx_num_vals,
|
||||
min_value: 0,
|
||||
};
|
||||
// We can now create our `idx` serializer, and in a second pass,
|
||||
// can effectively push the different indexes.
|
||||
if let Some(doc_id_mapping) = doc_id_mapping {
|
||||
let mut serialize_idx =
|
||||
fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?;
|
||||
// copying into a temp vec is not ideal, but the fast field codec api requires random
|
||||
// access, which is used in the estimation. It's possible to 1. calculate random
|
||||
// acccess on the fly or 2. change the codec api to make random access optional, but
|
||||
// they both have also major drawbacks.
|
||||
|
||||
let mut offsets = vec![];
|
||||
let mut offset = 0;
|
||||
for (doc_id, reader) in doc_id_mapping {
|
||||
let reader = &reader_and_field_accessors[reader.ordinal as usize].1;
|
||||
serialize_idx.add_val(offset)?;
|
||||
offsets.push(offset);
|
||||
offset += reader.get_len(*doc_id) as u64;
|
||||
}
|
||||
serialize_idx.add_val(offset as u64)?;
|
||||
offsets.push(offset);
|
||||
|
||||
serialize_idx.close_field()?;
|
||||
fast_field_serializer.create_auto_detect_u64_fast_field(
|
||||
field,
|
||||
stats,
|
||||
&offsets,
|
||||
offsets.iter().cloned(),
|
||||
offsets.iter().cloned(),
|
||||
)?;
|
||||
} else {
|
||||
let mut serialize_idx =
|
||||
fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?;
|
||||
let mut idx = 0;
|
||||
let mut offsets = vec![];
|
||||
let mut offset = 0;
|
||||
for (segment_reader, u64s_reader) in reader_and_field_accessors.iter() {
|
||||
for doc in segment_reader.doc_ids_alive() {
|
||||
serialize_idx.add_val(idx)?;
|
||||
idx += u64s_reader.get_len(doc) as u64;
|
||||
offsets.push(offset);
|
||||
offset += u64s_reader.get_len(doc) as u64;
|
||||
}
|
||||
}
|
||||
serialize_idx.add_val(idx)?;
|
||||
serialize_idx.close_field()?;
|
||||
offsets.push(offset);
|
||||
|
||||
fast_field_serializer.create_auto_detect_u64_fast_field(
|
||||
field,
|
||||
stats,
|
||||
&offsets,
|
||||
offsets.iter().cloned(),
|
||||
offsets.iter().cloned(),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn write_multi_value_fast_field_idx(
|
||||
|
||||
@@ -362,6 +362,7 @@ mod bench_sorted_index_merge {
|
||||
|
||||
use crate::core::Index;
|
||||
//use cratedoc_id, readerdoc_id_mappinglet vals = reader.fate::schema;
|
||||
use crate::fastfield::DynamicFastFieldReader;
|
||||
use crate::fastfield::FastFieldReader;
|
||||
use crate::indexer::merger::IndexMerger;
|
||||
use crate::schema::Cardinality;
|
||||
@@ -422,7 +423,7 @@ mod bench_sorted_index_merge {
|
||||
b.iter(|| {
|
||||
|
||||
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader)|{
|
||||
let u64_reader: FastFieldReader<u64> = reader.reader
|
||||
let u64_reader: DynamicFastFieldReader<u64> = reader.reader
|
||||
.fast_fields()
|
||||
.typed_fast_field_reader(field)
|
||||
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
|
||||
|
||||
@@ -354,18 +354,18 @@ mod bench {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_all_docs_compression_numbits() {
|
||||
for expected_num_bits in 0u8.. {
|
||||
let mut data = [0u32; 128];
|
||||
if expected_num_bits > 0 {
|
||||
data[0] = (1u64 << (expected_num_bits as usize) - 1) as u32;
|
||||
}
|
||||
let mut encoder = BlockEncoder::new();
|
||||
let (num_bits, compressed) = encoder.compress_block_unsorted(&data);
|
||||
assert_eq!(compressed.len(), compressed_block_size(num_bits));
|
||||
}
|
||||
}
|
||||
//#[test]
|
||||
//fn test_all_docs_compression_numbits() {
|
||||
//for expected_num_bits in 0u8.. {
|
||||
//let mut data = [0u32; 128];
|
||||
//if expected_num_bits > 0 {
|
||||
//data[0] = (1u64 << (expected_num_bits as usize) - 1) as u32;
|
||||
//}
|
||||
//let mut encoder = BlockEncoder::new();
|
||||
//let (num_bits, compressed) = encoder.compress_block_unsorted(&data);
|
||||
//assert_eq!(compressed.len(), compressed_block_size(num_bits));
|
||||
//}
|
||||
//}
|
||||
|
||||
const NUM_INTS_BENCH_VINT: usize = 10;
|
||||
|
||||
|
||||
@@ -23,6 +23,15 @@ pub struct FieldEntry {
|
||||
}
|
||||
|
||||
impl FieldEntry {
|
||||
/// Creates a new field entry given a name and a field type
|
||||
pub fn new(field_name: String, field_type: FieldType) -> FieldEntry {
|
||||
assert!(is_valid_field_name(&field_name));
|
||||
FieldEntry {
|
||||
name: field_name,
|
||||
field_type,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new u64 field entry in the schema, given
|
||||
/// a name, and some options.
|
||||
pub fn new_text(field_name: String, text_options: TextOptions) -> FieldEntry {
|
||||
|
||||
@@ -133,12 +133,22 @@ pub mod tests {
|
||||
format!("Doc {}", i)
|
||||
);
|
||||
}
|
||||
|
||||
for (_, doc) in store.iter(Some(&delete_bitset)).enumerate() {
|
||||
let doc = doc?;
|
||||
let title_content = doc.get_first(field_title).unwrap().text().unwrap();
|
||||
if !title_content.starts_with("Doc ") {
|
||||
panic!("unexpected title_content {}", title_content);
|
||||
}
|
||||
|
||||
let id = title_content
|
||||
.strip_prefix("Doc ")
|
||||
.unwrap()
|
||||
.parse::<u32>()
|
||||
.unwrap();
|
||||
if delete_bitset.is_deleted(id) {
|
||||
panic!("unexpected deleted document {}", id);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -171,11 +171,6 @@ impl StoreReader {
|
||||
.filter_map(move |doc_id| {
|
||||
// filter_map is only used to resolve lifetime issues between the two closures on
|
||||
// the outer variables
|
||||
let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
|
||||
if !alive {
|
||||
// we keep the number of skipped documents to move forward in the map block
|
||||
num_skipped += 1;
|
||||
}
|
||||
|
||||
// check move to next checkpoint
|
||||
if doc_id >= curr_checkpoint.as_ref().unwrap().doc_range.end {
|
||||
@@ -187,6 +182,7 @@ impl StoreReader {
|
||||
num_skipped = 0;
|
||||
}
|
||||
|
||||
let alive = delete_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
|
||||
if alive {
|
||||
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
|
||||
// the map block will move over the num_skipped, so we reset to 0
|
||||
@@ -194,6 +190,8 @@ impl StoreReader {
|
||||
reset_block_pos = false;
|
||||
ret
|
||||
} else {
|
||||
// we keep the number of skipped documents to move forward in the map block
|
||||
num_skipped += 1;
|
||||
None
|
||||
}
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user