Merge pull request #1418 from quickwit-oss/gcd_encoding

apply gcd on fastfield as preprocessing
This commit is contained in:
PSeitz
2022-07-29 02:00:14 -07:00
committed by GitHub
12 changed files with 589 additions and 93 deletions

View File

@@ -60,6 +60,7 @@ pretty_assertions = "1.2.1"
serde_cbor = { version = "0.11.2", optional = true }
async-trait = "0.1.53"
arc-swap = "1.5.0"
gcd = "2.1.0"
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"

View File

@@ -107,7 +107,7 @@ impl FastFieldCodecSerializer for BitpackedFastFieldSerializer {
/// values.
fn serialize(
write: &mut impl Write,
_fastfield_accessor: &impl FastFieldDataAccess,
_fastfield_accessor: &dyn FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
_data_iter1: impl Iterator<Item = u64>,

View File

@@ -42,7 +42,7 @@ pub trait FastFieldCodecSerializer {
/// The iterators should be preferred over using fastfield_accessor for performance reasons.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &impl FastFieldDataAccess,
fastfield_accessor: &dyn FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
data_iter1: impl Iterator<Item = u64>,

View File

@@ -111,7 +111,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
/// Creates a new fast field serializer.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &impl FastFieldDataAccess,
fastfield_accessor: &dyn FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
data_iter1: impl Iterator<Item = u64>,

View File

@@ -195,7 +195,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
/// Creates a new fast field serializer.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &impl FastFieldDataAccess,
fastfield_accessor: &dyn FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
_data_iter1: impl Iterator<Item = u64>,

224
src/fastfield/gcd.rs Normal file
View File

@@ -0,0 +1,224 @@
use std::io::{self, Write};
use common::BinarySerializable;
use fastdivide::DividerU64;
use fastfield_codecs::FastFieldCodecReader;
use gcd::Gcd;
pub const GCD_DEFAULT: u64 = 1;
pub const GCD_CODEC_ID: u8 = 4;
/// Wrapper for accessing a fastfield.
///
/// Holds the data and the codec to the read the data.
#[derive(Clone)]
pub struct GCDFastFieldCodec<CodecReader> {
gcd: u64,
min_value: u64,
reader: CodecReader,
}
impl<C: FastFieldCodecReader + Clone> FastFieldCodecReader for GCDFastFieldCodec<C> {
/// Opens a fast field given the bytes.
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
let (header, mut footer) = bytes.split_at(bytes.len() - 16);
let gcd = u64::deserialize(&mut footer)?;
let min_value = u64::deserialize(&mut footer)?;
let reader = C::open_from_bytes(header)?;
Ok(GCDFastFieldCodec {
gcd,
min_value,
reader,
})
}
#[inline]
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
let mut data = self.reader.get_u64(doc, data);
data *= self.gcd;
data += self.min_value;
data
}
fn min_value(&self) -> u64 {
self.min_value + self.reader.min_value() * self.gcd
}
fn max_value(&self) -> u64 {
self.min_value + self.reader.max_value() * self.gcd
}
}
pub fn write_gcd_header<W: Write>(field_write: &mut W, min_value: u64, gcd: u64) -> io::Result<()> {
gcd.serialize(field_write)?;
min_value.serialize(field_write)?;
Ok(())
}
// Find GCD for iterator of numbers
pub fn find_gcd(numbers: impl Iterator<Item = u64>) -> Option<u64> {
let mut numbers = numbers.filter(|n| *n != 0);
let mut gcd = numbers.next()?;
if gcd == 1 {
return Some(1);
}
let mut gcd_divider = DividerU64::divide_by(gcd);
for val in numbers {
let remainder = val - (gcd_divider.divide(val)) * gcd;
if remainder == 0 {
continue;
}
gcd = gcd.gcd(val);
if gcd == 1 {
return Some(1);
}
gcd_divider = DividerU64::divide_by(gcd);
}
Some(gcd)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::path::Path;
use common::HasLen;
use crate::directory::{CompositeFile, RamDirectory, WritePtr};
use crate::fastfield::serializer::FastFieldCodecEnableCheck;
use crate::fastfield::tests::{FIELD, FIELDI64, SCHEMA, SCHEMAI64};
use crate::fastfield::{
find_gcd, CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldCodecName,
FastFieldReader, FastFieldsWriter, ALL_CODECS,
};
use crate::schema::Schema;
use crate::Directory;
fn get_index(
docs: &[crate::Document],
schema: &Schema,
codec_enable_checker: FastFieldCodecEnableCheck,
) -> crate::Result<RamDirectory> {
let directory: RamDirectory = RamDirectory::create();
{
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
let mut serializer =
CompositeFastFieldSerializer::from_write_with_codec(write, codec_enable_checker)
.unwrap();
let mut fast_field_writers = FastFieldsWriter::from_schema(schema);
for doc in docs {
fast_field_writers.add_document(doc);
}
fast_field_writers
.serialize(&mut serializer, &HashMap::new(), None)
.unwrap();
serializer.close().unwrap();
}
Ok(directory)
}
fn test_fastfield_gcd_i64_with_codec(
codec_name: FastFieldCodecName,
num_vals: usize,
) -> crate::Result<()> {
let path = Path::new("test");
let mut docs = vec![];
for i in 1..=num_vals {
let val = i as i64 * 1000i64;
docs.push(doc!(*FIELDI64=>val));
}
let directory = get_index(&docs, &SCHEMAI64, codec_name.clone().into())?;
let file = directory.open_read(path).unwrap();
// assert_eq!(file.len(), 118);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<i64>::open(file)?;
assert_eq!(fast_field_reader.get(0), 1000i64);
assert_eq!(fast_field_reader.get(1), 2000i64);
assert_eq!(fast_field_reader.get(2), 3000i64);
assert_eq!(fast_field_reader.max_value(), num_vals as i64 * 1000);
assert_eq!(fast_field_reader.min_value(), 1000i64);
let file = directory.open_read(path).unwrap();
// Can't apply gcd
let path = Path::new("test");
docs.pop();
docs.push(doc!(*FIELDI64=>2001i64));
let directory = get_index(&docs, &SCHEMAI64, codec_name.into())?;
let file2 = directory.open_read(path).unwrap();
assert!(file2.len() > file.len());
Ok(())
}
#[test]
fn test_fastfield_gcd_i64() -> crate::Result<()> {
for codec_name in ALL_CODECS {
test_fastfield_gcd_i64_with_codec(codec_name.clone(), 5005)?;
}
Ok(())
}
fn test_fastfield_gcd_u64_with_codec(
codec_name: FastFieldCodecName,
num_vals: usize,
) -> crate::Result<()> {
let path = Path::new("test");
let mut docs = vec![];
for i in 1..=num_vals {
let val = i as u64 * 1000u64;
docs.push(doc!(*FIELD=>val));
}
let directory = get_index(&docs, &SCHEMA, codec_name.clone().into())?;
let file = directory.open_read(path).unwrap();
// assert_eq!(file.len(), 118);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(file)?;
assert_eq!(fast_field_reader.get(0), 1000u64);
assert_eq!(fast_field_reader.get(1), 2000u64);
assert_eq!(fast_field_reader.get(2), 3000u64);
assert_eq!(fast_field_reader.max_value(), num_vals as u64 * 1000);
assert_eq!(fast_field_reader.min_value(), 1000u64);
let file = directory.open_read(path).unwrap();
// Can't apply gcd
let path = Path::new("test");
docs.pop();
docs.push(doc!(*FIELDI64=>2001u64));
let directory = get_index(&docs, &SCHEMA, codec_name.into())?;
let file2 = directory.open_read(path).unwrap();
assert!(file2.len() > file.len());
Ok(())
}
#[test]
fn test_fastfield_gcd_u64() -> crate::Result<()> {
for codec_name in ALL_CODECS {
test_fastfield_gcd_u64_with_codec(codec_name.clone(), 5005)?;
}
Ok(())
}
#[test]
pub fn test_fastfield2() {
let test_fastfield = DynamicFastFieldReader::<u64>::from(vec![100, 200, 300]);
assert_eq!(test_fastfield.get(0), 100);
assert_eq!(test_fastfield.get(1), 200);
assert_eq!(test_fastfield.get(2), 300);
}
#[test]
fn find_gcd_test() {
assert_eq!(find_gcd([0].into_iter()), None);
assert_eq!(find_gcd([0, 10].into_iter()), Some(10));
assert_eq!(find_gcd([10, 0].into_iter()), Some(10));
assert_eq!(find_gcd([].into_iter()), None);
assert_eq!(find_gcd([15, 30, 5, 10].into_iter()), Some(5));
assert_eq!(find_gcd([15, 16, 10].into_iter()), Some(1));
assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), Some(5));
}
}

View File

@@ -24,6 +24,7 @@ pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveB
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub(crate) use self::gcd::{find_gcd, GCDFastFieldCodec, GCD_CODEC_ID, GCD_DEFAULT};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::reader::{DynamicFastFieldReader, FastFieldReader};
pub use self::readers::FastFieldReaders;
@@ -37,12 +38,25 @@ mod alive_bitset;
mod bytes;
mod error;
mod facet_reader;
mod gcd;
mod multivalued;
mod reader;
mod readers;
mod serializer;
mod writer;
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)]
pub(crate) enum FastFieldCodecName {
Bitpacked,
LinearInterpol,
BlockwiseLinearInterpol,
}
pub(crate) const ALL_CODECS: &[FastFieldCodecName; 3] = &[
FastFieldCodecName::Bitpacked,
FastFieldCodecName::LinearInterpol,
FastFieldCodecName::BlockwiseLinearInterpol,
];
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
/// for a doc_id
pub trait MultiValueLength {
@@ -276,7 +290,14 @@ mod tests {
schema_builder.build()
});
pub static SCHEMAI64: Lazy<Schema> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
schema_builder.add_i64_field("field", FAST);
schema_builder.build()
});
pub static FIELD: Lazy<Field> = Lazy::new(|| SCHEMA.get_field("field").unwrap());
pub static FIELDI64: Lazy<Field> = Lazy::new(|| SCHEMAI64.get_field("field").unwrap());
#[test]
pub fn test_fastfield() {
@@ -425,7 +446,7 @@ mod tests {
}
#[test]
fn test_signed_intfastfield() -> crate::Result<()> {
fn test_signed_intfastfield_normal() -> crate::Result<()> {
let path = Path::new("test");
let directory: RamDirectory = RamDirectory::create();
let mut schema_builder = Schema::builder();
@@ -505,10 +526,15 @@ mod tests {
permutation
}
#[test]
fn test_intfastfield_permutation() -> crate::Result<()> {
// Warning: this generates the same permutation at each call
pub fn generate_permutation_gcd() -> Vec<u64> {
let mut permutation: Vec<u64> = (1u64..100_000u64).map(|el| el * 1000).collect();
permutation.shuffle(&mut StdRng::from_seed([1u8; 32]));
permutation
}
fn test_intfastfield_permutation_with_data(permutation: Vec<u64>) -> crate::Result<()> {
let path = Path::new("test");
let permutation = generate_permutation();
let n = permutation.len();
let directory = RamDirectory::create();
{
@@ -527,15 +553,27 @@ mod tests {
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
let mut a = 0u64;
for _ in 0..n {
for a in 0..n {
assert_eq!(fast_field_reader.get(a as u32), permutation[a as usize]);
a = fast_field_reader.get(a as u32);
}
}
Ok(())
}
#[test]
fn test_intfastfield_permutation_gcd() -> crate::Result<()> {
let permutation = generate_permutation_gcd();
test_intfastfield_permutation_with_data(permutation)?;
Ok(())
}
#[test]
fn test_intfastfield_permutation() -> crate::Result<()> {
let permutation = generate_permutation();
test_intfastfield_permutation_with_data(permutation)?;
Ok(())
}
#[test]
fn test_merge_missing_date_fast_field() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
@@ -951,6 +989,7 @@ mod bench {
use super::tests::{generate_permutation, FIELD, SCHEMA};
use super::*;
use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
use crate::fastfield::tests::generate_permutation_gcd;
use crate::fastfield::FastFieldReader;
#[bench]
@@ -1037,10 +1076,42 @@ mod bench {
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data).unwrap();
b.iter(|| {
let n = test::black_box(1000u32);
let mut a = 0u32;
for _ in 0u32..n {
a = fast_field_reader.get(a) as u32;
for i in 0u32..permutation.len() as u32 {
a = fast_field_reader.get(i) as u32;
}
a
});
}
}
#[bench]
fn bench_intfastfield_fflookup_gcd(b: &mut Bencher) {
let path = Path::new("test");
let permutation = generate_permutation_gcd();
let directory: RamDirectory = RamDirectory::create();
{
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap();
let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA);
for &x in &permutation {
fast_field_writers.add_document(&doc!(*FIELD=>x));
}
fast_field_writers
.serialize(&mut serializer, &HashMap::new(), None)
.unwrap();
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data).unwrap();
b.iter(|| {
let mut a = 0u32;
for i in 0u32..permutation.len() as u32 {
a = fast_field_reader.get(i) as u32;
}
a
});

View File

@@ -346,6 +346,13 @@ mod tests {
assert!(test_multivalued_no_panic(&ops[..]).is_ok());
}
}
#[test]
fn test_multivalued_proptest_gcd() {
use IndexingOp::*;
let ops = [AddDoc { id: 9 }, AddDoc { id: 9 }, Merge];
assert!(test_multivalued_no_panic(&ops[..]).is_ok());
}
#[test]
fn test_multivalued_proptest_off_by_one_bug_1151() {

View File

@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::marker::PhantomData;
use std::path::Path;
use common::BinarySerializable;
use fastfield_codecs::bitpacked::{
BitpackedFastFieldReader as BitpackedReader, BitpackedFastFieldSerializer,
};
@@ -14,7 +13,7 @@ use fastfield_codecs::multilinearinterpol::{
};
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecSerializer};
use super::FastValue;
use super::{FastValue, GCDFastFieldCodec, GCD_CODEC_ID};
use crate::directory::{CompositeFile, Directory, FileSlice, OwnedBytes, RamDirectory, WritePtr};
use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter};
use crate::schema::{Schema, FAST};
@@ -71,15 +70,26 @@ pub enum DynamicFastFieldReader<Item: FastValue> {
LinearInterpol(FastFieldReaderCodecWrapper<Item, LinearInterpolFastFieldReader>),
/// Blockwise linear interpolated values + bitpacked
MultiLinearInterpol(FastFieldReaderCodecWrapper<Item, MultiLinearInterpolFastFieldReader>),
/// GCD and Bitpacked compressed fastfield data.
BitpackedGCD(FastFieldReaderCodecWrapper<Item, GCDFastFieldCodec<BitpackedReader>>),
/// GCD and Linear interpolated values + bitpacked
LinearInterpolGCD(
FastFieldReaderCodecWrapper<Item, GCDFastFieldCodec<LinearInterpolFastFieldReader>>,
),
/// GCD and Blockwise linear interpolated values + bitpacked
MultiLinearInterpolGCD(
FastFieldReaderCodecWrapper<Item, GCDFastFieldCodec<MultiLinearInterpolFastFieldReader>>,
),
}
impl<Item: FastValue> DynamicFastFieldReader<Item> {
/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data.
pub fn open(file: FileSlice) -> crate::Result<DynamicFastFieldReader<Item>> {
let mut bytes = file.read_bytes()?;
let id = bytes.read_u8();
let reader = match id {
pub fn open_from_id(
mut bytes: OwnedBytes,
codec_id: u8,
) -> crate::Result<DynamicFastFieldReader<Item>> {
let reader = match codec_id {
BitpackedFastFieldSerializer::ID => {
DynamicFastFieldReader::Bitpacked(FastFieldReaderCodecWrapper::<
Item,
@@ -100,15 +110,59 @@ impl<Item: FastValue> DynamicFastFieldReader<Item> {
bytes
)?)
}
_ if codec_id == GCD_CODEC_ID => {
let codec_id = bytes.read_u8();
match codec_id {
BitpackedFastFieldSerializer::ID => {
DynamicFastFieldReader::BitpackedGCD(FastFieldReaderCodecWrapper::<
Item,
GCDFastFieldCodec<BitpackedReader>,
>::open_from_bytes(
bytes
)?)
}
LinearInterpolFastFieldSerializer::ID => {
DynamicFastFieldReader::LinearInterpolGCD(FastFieldReaderCodecWrapper::<
Item,
GCDFastFieldCodec<LinearInterpolFastFieldReader>,
>::open_from_bytes(
bytes
)?)
}
MultiLinearInterpolFastFieldSerializer::ID => {
DynamicFastFieldReader::MultiLinearInterpolGCD(
FastFieldReaderCodecWrapper::<
Item,
GCDFastFieldCodec<MultiLinearInterpolFastFieldReader>,
>::open_from_bytes(bytes)?,
)
}
_ => {
panic!(
"unknown fastfield codec id {:?}. Data corrupted or using old tantivy \
version.",
codec_id
)
}
}
}
_ => {
panic!(
"unknown fastfield id {:?}. Data corrupted or using old tantivy version.",
id
"unknown fastfield codec id {:?}. Data corrupted or using old tantivy version.",
codec_id
)
}
};
Ok(reader)
}
/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data.
pub fn open(file: FileSlice) -> crate::Result<DynamicFastFieldReader<Item>> {
let mut bytes = file.read_bytes()?;
let codec_id = bytes.read_u8();
Self::open_from_id(bytes, codec_id)
}
}
impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
@@ -118,6 +172,9 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
Self::Bitpacked(reader) => reader.get(doc),
Self::LinearInterpol(reader) => reader.get(doc),
Self::MultiLinearInterpol(reader) => reader.get(doc),
Self::BitpackedGCD(reader) => reader.get(doc),
Self::LinearInterpolGCD(reader) => reader.get(doc),
Self::MultiLinearInterpolGCD(reader) => reader.get(doc),
}
}
#[inline]
@@ -126,6 +183,9 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
Self::Bitpacked(reader) => reader.get_range(start, output),
Self::LinearInterpol(reader) => reader.get_range(start, output),
Self::MultiLinearInterpol(reader) => reader.get_range(start, output),
Self::BitpackedGCD(reader) => reader.get_range(start, output),
Self::LinearInterpolGCD(reader) => reader.get_range(start, output),
Self::MultiLinearInterpolGCD(reader) => reader.get_range(start, output),
}
}
fn min_value(&self) -> Item {
@@ -133,6 +193,9 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
Self::Bitpacked(reader) => reader.min_value(),
Self::LinearInterpol(reader) => reader.min_value(),
Self::MultiLinearInterpol(reader) => reader.min_value(),
Self::BitpackedGCD(reader) => reader.min_value(),
Self::LinearInterpolGCD(reader) => reader.min_value(),
Self::MultiLinearInterpolGCD(reader) => reader.min_value(),
}
}
fn max_value(&self) -> Item {
@@ -140,6 +203,9 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
Self::Bitpacked(reader) => reader.max_value(),
Self::LinearInterpol(reader) => reader.max_value(),
Self::MultiLinearInterpol(reader) => reader.max_value(),
Self::BitpackedGCD(reader) => reader.max_value(),
Self::LinearInterpolGCD(reader) => reader.max_value(),
Self::MultiLinearInterpolGCD(reader) => reader.max_value(),
}
}
}
@@ -158,10 +224,10 @@ impl<Item: FastValue, C: FastFieldCodecReader> FastFieldReaderCodecWrapper<Item,
/// Opens a fast field given a file.
pub fn open(file: FileSlice) -> crate::Result<Self> {
let mut bytes = file.read_bytes()?;
let id = u8::deserialize(&mut bytes)?;
let codec_id = bytes.read_u8();
assert_eq!(
BitpackedFastFieldSerializer::ID,
id,
codec_id,
"Tried to open fast field as bitpacked encoded (id=1), but got serializer with \
different id"
);
@@ -178,7 +244,8 @@ impl<Item: FastValue, C: FastFieldCodecReader> FastFieldReaderCodecWrapper<Item,
}
#[inline]
pub(crate) fn get_u64(&self, doc: u64) -> Item {
Item::from_u64(self.reader.get_u64(doc, self.bytes.as_slice()))
let data = self.reader.get_u64(doc, self.bytes.as_slice());
Item::from_u64(data)
}
/// Internally `multivalued` also use SingleValue Fast fields.

View File

@@ -8,7 +8,10 @@ use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
pub use fastfield_codecs::{FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats};
use super::{find_gcd, FastFieldCodecName, ALL_CODECS, GCD_DEFAULT};
use crate::directory::{CompositeWrite, WritePtr};
use crate::fastfield::gcd::write_gcd_header;
use crate::fastfield::GCD_CODEC_ID;
use crate::schema::Field;
/// `CompositeFastFieldSerializer` is in charge of serializing
@@ -33,6 +36,30 @@ use crate::schema::Field;
/// * `close()`
pub struct CompositeFastFieldSerializer {
composite_write: CompositeWrite<WritePtr>,
codec_enable_checker: FastFieldCodecEnableCheck,
}
#[derive(Debug, Clone)]
pub struct FastFieldCodecEnableCheck {
enabled_codecs: Vec<FastFieldCodecName>,
}
impl FastFieldCodecEnableCheck {
fn allow_all() -> Self {
FastFieldCodecEnableCheck {
enabled_codecs: ALL_CODECS.to_vec(),
}
}
fn is_enabled(&self, codec_name: FastFieldCodecName) -> bool {
self.enabled_codecs.contains(&codec_name)
}
}
impl From<FastFieldCodecName> for FastFieldCodecEnableCheck {
fn from(codec_name: FastFieldCodecName) -> Self {
FastFieldCodecEnableCheck {
enabled_codecs: vec![codec_name],
}
}
}
// use this, when this is merged and stabilized explicit_generic_args_with_impl_trait
@@ -52,60 +79,154 @@ fn codec_estimation<T: FastFieldCodecSerializer, A: FastFieldDataAccess>(
impl CompositeFastFieldSerializer {
/// Constructor
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
Self::from_write_with_codec(write, FastFieldCodecEnableCheck::allow_all())
}
/// Constructor
pub fn from_write_with_codec(
write: WritePtr,
codec_enable_checker: FastFieldCodecEnableCheck,
) -> io::Result<CompositeFastFieldSerializer> {
// just making room for the pointer to header.
let composite_write = CompositeWrite::wrap(write);
Ok(CompositeFastFieldSerializer { composite_write })
Ok(CompositeFastFieldSerializer {
composite_write,
codec_enable_checker,
})
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field(
pub fn create_auto_detect_u64_fast_field<F, I>(
&mut self,
field: Field,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
data_iter_1: impl Iterator<Item = u64>,
data_iter_2: impl Iterator<Item = u64>,
) -> io::Result<()> {
iter_gen: F,
) -> io::Result<()>
where
F: Fn() -> I,
I: Iterator<Item = u64>,
{
self.create_auto_detect_u64_fast_field_with_idx(
field,
stats,
fastfield_accessor,
data_iter_1,
data_iter_2,
iter_gen,
0,
)
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx(
pub fn write_header<W: Write>(field_write: &mut W, codec_id: u8) -> io::Result<()> {
codec_id.serialize(field_write)?;
Ok(())
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx<F, I>(
&mut self,
field: Field,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
data_iter_1: impl Iterator<Item = u64>,
data_iter_2: impl Iterator<Item = u64>,
iter_gen: F,
idx: usize,
) -> io::Result<()> {
) -> io::Result<()>
where
F: Fn() -> I,
I: Iterator<Item = u64>,
{
let field_write = self.composite_write.for_field_with_idx(field, idx);
let gcd = find_gcd(iter_gen().map(|val| val - stats.min_value)).unwrap_or(GCD_DEFAULT);
if gcd == 1 {
return Self::create_auto_detect_u64_fast_field_with_idx_gcd(
self.codec_enable_checker.clone(),
field,
field_write,
stats,
fastfield_accessor,
iter_gen(),
iter_gen(),
);
}
Self::write_header(field_write, GCD_CODEC_ID)?;
struct GCDWrappedFFAccess<T: FastFieldDataAccess> {
fastfield_accessor: T,
min_value: u64,
gcd: u64,
}
impl<T: FastFieldDataAccess> FastFieldDataAccess for GCDWrappedFFAccess<T> {
fn get_val(&self, position: u64) -> u64 {
(self.fastfield_accessor.get_val(position) - self.min_value) / self.gcd
}
}
let fastfield_accessor = GCDWrappedFFAccess {
fastfield_accessor,
min_value: stats.min_value,
gcd,
};
let min_value = stats.min_value;
let stats = FastFieldStats {
min_value: 0,
max_value: (stats.max_value - stats.min_value) / gcd,
num_vals: stats.num_vals,
};
let iter1 = iter_gen().map(|val| (val - min_value) / gcd);
let iter2 = iter_gen().map(|val| (val - min_value) / gcd);
Self::create_auto_detect_u64_fast_field_with_idx_gcd(
self.codec_enable_checker.clone(),
field,
field_write,
stats,
fastfield_accessor,
iter1,
iter2,
)?;
write_gcd_header(field_write, min_value, gcd)?;
Ok(())
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx_gcd<W: Write>(
codec_enable_checker: FastFieldCodecEnableCheck,
field: Field,
field_write: &mut CountingWriter<W>,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
iter1: impl Iterator<Item = u64>,
iter2: impl Iterator<Item = u64>,
) -> io::Result<()> {
let mut estimations = vec![];
codec_estimation::<BitpackedFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
codec_estimation::<LinearInterpolFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
codec_estimation::<MultiLinearInterpolFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
if codec_enable_checker.is_enabled(FastFieldCodecName::Bitpacked) {
codec_estimation::<BitpackedFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
}
if codec_enable_checker.is_enabled(FastFieldCodecName::LinearInterpol) {
codec_estimation::<LinearInterpolFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
}
if codec_enable_checker.is_enabled(FastFieldCodecName::BlockwiseLinearInterpol) {
codec_estimation::<MultiLinearInterpolFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
}
if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan())
{
warn!(
@@ -122,15 +243,16 @@ impl CompositeFastFieldSerializer {
"choosing fast field codec {} for field_id {:?}",
name, field
); // todo print actual field name
id.serialize(field_write)?;
Self::write_header(field_write, id)?;
match name {
BitpackedFastFieldSerializer::NAME => {
BitpackedFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
data_iter_1,
data_iter_2,
iter1,
iter2,
)?;
}
LinearInterpolFastFieldSerializer::NAME => {
@@ -138,8 +260,8 @@ impl CompositeFastFieldSerializer {
field_write,
&fastfield_accessor,
stats,
data_iter_1,
data_iter_2,
iter1,
iter2,
)?;
}
MultiLinearInterpolFastFieldSerializer::NAME => {
@@ -147,19 +269,29 @@ impl CompositeFastFieldSerializer {
field_write,
&fastfield_accessor,
stats,
data_iter_1,
data_iter_2,
iter1,
iter2,
)?;
}
_ => {
panic!("unknown fastfield serializer {}", name)
}
};
}
field_write.flush()?;
Ok(())
}
/// Start serializing a new u64 fast field
pub fn serialize_into(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedFastFieldSerializerLegacy<'_, CountingWriter<WritePtr>>> {
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field(
&mut self,

View File

@@ -370,23 +370,25 @@ impl IntFastFieldWriter {
};
if let Some(doc_id_map) = doc_id_map {
let iter = doc_id_map
.iter_old_doc_ids()
.map(|doc_id| self.vals.get(doc_id as usize));
let iter_gen = || {
doc_id_map
.iter_old_doc_ids()
.map(|doc_id| self.vals.get(doc_id as usize))
};
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
iter.clone(),
iter,
iter_gen,
)?;
} else {
let iter_gen = || self.vals.iter();
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
self.vals.iter(),
self.vals.iter(),
iter_gen,
)?;
};
Ok(())

View File

@@ -385,20 +385,17 @@ impl IndexMerger {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
};
let iter1 = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
fast_field_reader.get(*doc_id)
});
let iter2 = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
fast_field_reader.get(*doc_id)
});
let iter_gen = || {
doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
fast_field_reader.get(*doc_id)
})
};
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
fastfield_accessor,
iter1,
iter2,
iter_gen,
)?;
Ok(())
@@ -560,12 +557,12 @@ impl IndexMerger {
}
offsets.push(offset);
let iter_gen = || offsets.iter().cloned();
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
&offsets[..],
offsets.iter().cloned(),
offsets.iter().cloned(),
iter_gen,
)?;
Ok(offsets)
}
@@ -768,24 +765,19 @@ impl IndexMerger {
fast_field_readers: &ff_readers,
offsets,
};
let iter1 = doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| {
let ff_reader = &ff_readers[*reader_ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
});
let iter2 = doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| {
let ff_reader = &ff_readers[*reader_ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
});
let iter_gen = || {
doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| {
let ff_reader = &ff_readers[*reader_ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
})
};
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
field,
stats,
fastfield_accessor,
iter1,
iter2,
iter_gen,
1,
)?;