Compare commits

...

5 Commits

Author SHA1 Message Date
Paul Masurel
9aefa349ca blop 2022-08-22 11:04:39 +02:00
Paul Masurel
b9a87d6dc6 Refactor Further 2022-08-21 13:04:12 +02:00
Paul Masurel
0ec2ebd791 Experimental refactor 2022-08-21 11:40:08 +02:00
Paul Masurel
6602786db8 Moved GCD fast field codec. Partial. 2022-08-20 20:08:07 +02:00
Paul Masurel
c71169b6e0 Fastfield refactoring. 2022-08-20 19:16:31 +02:00
31 changed files with 1055 additions and 1263 deletions

View File

@@ -60,7 +60,6 @@ pretty_assertions = "1.2.1"
serde_cbor = { version = "0.11.2", optional = true }
async-trait = "0.1.53"
arc-swap = "1.5.0"
gcd = "2.1.0"
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"

View File

@@ -14,6 +14,7 @@ pub struct BlockedBitpacker {
buffer: Vec<u64>,
offset_and_bits: Vec<BlockedBitpackerEntryMetaData>,
}
impl Default for BlockedBitpacker {
fn default() -> Self {
BlockedBitpacker::new()
@@ -60,12 +61,11 @@ fn metadata_test() {
impl BlockedBitpacker {
pub fn new() -> Self {
let mut compressed_blocks = vec![];
compressed_blocks.resize(8, 0);
let compressed_blocks = vec![0u8; 8];
Self {
compressed_blocks,
buffer: vec![],
offset_and_bits: vec![],
buffer: Vec::new(),
offset_and_bits: Vec::new(),
}
}

View File

@@ -10,7 +10,7 @@
// ---
// Importing tantivy...
use tantivy::collector::{Collector, SegmentCollector};
use tantivy::fastfield::{DynamicFastFieldReader, FastFieldReader};
use tantivy::fastfield::{FastFieldReader, FastFieldReaderImpl};
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, FAST, INDEXED, TEXT};
use tantivy::{doc, Index, Score, SegmentReader};
@@ -95,7 +95,7 @@ impl Collector for StatsCollector {
}
struct StatsSegmentCollector {
fast_field_reader: DynamicFastFieldReader<u64>,
fast_field_reader: FastFieldReaderImpl<u64>,
stats: Stats,
}

View File

@@ -11,8 +11,10 @@ description = "Fast field codecs used by tantivy"
[dependencies]
common = { version = "0.3", path = "../common/", package = "tantivy-common" }
tantivy-bitpacker = { version="0.2", path = "../bitpacker/" }
ownedbytes = { version = "0.3.0", path = "../ownedbytes" }
prettytable-rs = {version="0.9.0", optional= true}
rand = {version="0.8.3", optional= true}
fastdivide = "0.4"
[dev-dependencies]
more-asserts = "0.3.0"

View File

@@ -4,12 +4,10 @@ extern crate test;
#[cfg(test)]
mod tests {
use fastfield_codecs::bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer};
use fastfield_codecs::linearinterpol::{
LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer,
};
use fastfield_codecs::bitpacked::{BitpackedFastFieldCodec, BitpackedFastFieldReader};
use fastfield_codecs::linearinterpol::{LinearInterpolCodec, LinearInterpolFastFieldReader};
use fastfield_codecs::multilinearinterpol::{
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
MultiLinearInterpolFastFieldCodec, MultiLinearInterpolFastFieldReader,
};
use fastfield_codecs::*;
@@ -29,10 +27,7 @@ mod tests {
fn value_iter() -> impl Iterator<Item = u64> {
0..20_000
}
fn bench_get<S: FastFieldCodecSerializer, R: FastFieldCodecReader>(
b: &mut Bencher,
data: &[u64],
) {
fn bench_get<S: FastFieldCodec, R: FastFieldCodecReader>(b: &mut Bencher, data: &[u64]) {
let mut bytes = vec![];
S::serialize(
&mut bytes,
@@ -49,7 +44,7 @@ mod tests {
}
});
}
fn bench_create<S: FastFieldCodecSerializer>(b: &mut Bencher, data: &[u64]) {
fn bench_create<S: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
let mut bytes = vec![];
b.iter(|| {
S::serialize(
@@ -67,32 +62,32 @@ mod tests {
#[bench]
fn bench_fastfield_bitpack_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<BitpackedFastFieldSerializer>(b, &data);
bench_create::<BitpackedFastFieldCodec>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<LinearInterpolFastFieldSerializer>(b, &data);
bench_create::<LinearInterpolCodec>(b, &data);
}
#[bench]
fn bench_fastfield_multilinearinterpol_create(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_create::<MultiLinearInterpolFastFieldSerializer>(b, &data);
bench_create::<MultiLinearInterpolFastFieldCodec>(b, &data);
}
#[bench]
fn bench_fastfield_bitpack_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>(b, &data);
bench_get::<BitpackedFastFieldCodec, BitpackedFastFieldReader>(b, &data);
}
#[bench]
fn bench_fastfield_linearinterpol_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<LinearInterpolFastFieldSerializer, LinearInterpolFastFieldReader>(b, &data);
bench_get::<LinearInterpolCodec, LinearInterpolFastFieldReader>(b, &data);
}
#[bench]
fn bench_fastfield_multilinearinterpol_get(b: &mut Bencher) {
let data: Vec<_> = get_data();
bench_get::<MultiLinearInterpolFastFieldSerializer, MultiLinearInterpolFastFieldReader>(
bench_get::<MultiLinearInterpolFastFieldCodec, MultiLinearInterpolFastFieldReader>(
b, &data,
);
}

View File

@@ -1,37 +1,25 @@
use std::io::{self, Write};
use common::BinarySerializable;
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::{FastFieldCodecReader, FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats};
use crate::{FastFieldCodec, FastFieldCodecReader, FastFieldStats};
/// Depending on the field type, a different
/// fast field is required.
#[derive(Clone)]
pub struct BitpackedFastFieldReader {
data: OwnedBytes,
bit_unpacker: BitUnpacker,
pub min_value_u64: u64,
pub max_value_u64: u64,
}
impl FastFieldCodecReader for BitpackedFastFieldReader {
/// Opens a fast field given a file.
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
let (_data, mut footer) = bytes.split_at(bytes.len() - 16);
let min_value = u64::deserialize(&mut footer)?;
let amplitude = u64::deserialize(&mut footer)?;
let max_value = min_value + amplitude;
let num_bits = compute_num_bits(amplitude);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(BitpackedFastFieldReader {
min_value_u64: min_value,
max_value_u64: max_value,
bit_unpacker,
})
}
#[inline]
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
self.min_value_u64 + self.bit_unpacker.get(doc, data)
fn get_u64(&self, doc: u64) -> u64 {
self.min_value_u64 + self.bit_unpacker.get(doc, &self.data)
}
#[inline]
fn min_value(&self) -> u64 {
@@ -92,11 +80,30 @@ impl<'a, W: Write> BitpackedFastFieldSerializerLegacy<'a, W> {
}
}
pub struct BitpackedFastFieldSerializer {}
pub struct BitpackedFastFieldCodec;
impl FastFieldCodecSerializer for BitpackedFastFieldSerializer {
impl FastFieldCodec for BitpackedFastFieldCodec {
const NAME: &'static str = "Bitpacked";
const ID: u8 = 1;
type Reader = BitpackedFastFieldReader;
/// Opens a fast field given a file.
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
let footer_offset = bytes.len() - 16;
let (data, mut footer) = bytes.split(footer_offset);
let min_value = u64::deserialize(&mut footer)?;
let amplitude = u64::deserialize(&mut footer)?;
let max_value = min_value + amplitude;
let num_bits = compute_num_bits(amplitude);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(BitpackedFastFieldReader {
data,
min_value_u64: min_value,
max_value_u64: max_value,
bit_unpacker,
})
}
/// Serializes data with the BitpackedFastFieldSerializer.
///
/// The serializer in fact encode the values by bitpacking
@@ -106,29 +113,25 @@ impl FastFieldCodecSerializer for BitpackedFastFieldSerializer {
/// compute the minimum number of bits required to encode
/// values.
fn serialize(
write: &mut impl Write,
_fastfield_accessor: &dyn FastFieldDataAccess,
&self,
write: &mut impl io::Write,
vals: &[u64],
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
_data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
let mut serializer =
BitpackedFastFieldSerializerLegacy::open(write, stats.min_value, stats.max_value)?;
for val in data_iter {
for &val in vals {
serializer.add_val(val)?;
}
serializer.close_field()?;
Ok(())
}
fn is_applicable(
_fastfield_accessor: &impl FastFieldDataAccess,
_stats: FastFieldStats,
) -> bool {
fn is_applicable(_vals: &[u64], _stats: FastFieldStats) -> bool {
true
}
fn estimate(_fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
fn estimate(_vals: &[u64], stats: FastFieldStats) -> f32 {
let amplitude = stats.max_value - stats.min_value;
let num_bits = compute_num_bits(amplitude);
let num_bits_uncompressed = 64;
@@ -142,9 +145,7 @@ mod tests {
use crate::tests::get_codec_test_data_sets;
fn create_and_validate(data: &[u64], name: &str) {
crate::tests::create_and_validate::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>(
data, name,
);
crate::tests::create_and_validate(&BitpackedFastFieldCodec, data, name);
}
#[test]

View File

@@ -0,0 +1,254 @@
// Copyright (C) 2022 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
use std::io;
use std::num::NonZeroU64;
use std::sync::Arc;
use common::BinarySerializable;
use fastdivide::DividerU64;
use ownedbytes::OwnedBytes;
use crate::bitpacked::BitpackedFastFieldCodec;
use crate::gcd::{find_gcd, GCDFastFieldCodecReader, GCDParams};
use crate::linearinterpol::LinearInterpolCodec;
use crate::multilinearinterpol::MultiLinearInterpolFastFieldCodec;
use crate::{FastFieldCodec, FastFieldCodecReader, FastFieldStats};
/// Meta-codec that, per column, picks the concrete codec estimated to
/// compress the data best, and prefixes the payload with that codec's code.
///
/// If the values (shifted by their minimum) share a common divisor > 1, they
/// are first divided by their GCD and the chosen codec is wrapped in the GCD
/// codec (`CodecType::Gcd` + `GCDParams` are written before the inner codec).
pub struct DynamicFastFieldCodec;

impl FastFieldCodec for DynamicFastFieldCodec {
    const NAME: &'static str = "dynamic";

    type Reader = DynamicFastFieldReader;

    /// The dynamic codec is always applicable: it can always fall back to
    /// plain bitpacking.
    fn is_applicable(_vals: &[u64], _stats: crate::FastFieldStats) -> bool {
        true
    }

    fn estimate(_vals: &[u64], _stats: crate::FastFieldStats) -> f32 {
        0f32
    }

    /// Serializes `vals`, writing the selected codec's code (and, for the GCD
    /// wrapper, its parameters) ahead of the codec payload.
    fn serialize(
        &self,
        wrt: &mut impl io::Write,
        vals: &[u64],
        stats: crate::FastFieldStats,
    ) -> io::Result<()> {
        // GCD of the values shifted by the minimum. `1` means "no useful
        // common divisor"; `NonZeroU64::new(1)` is trivially `Some`, so no
        // `unsafe` is needed here.
        let gcd: NonZeroU64 = find_gcd(vals.iter().copied().map(|val| val - stats.min_value))
            .unwrap_or_else(|| NonZeroU64::new(1).expect("1 is non-zero"));
        if gcd.get() > 1 {
            let gcd_divider = DividerU64::divide_by(gcd.get());
            let scaled_vals: Vec<u64> = vals
                .iter()
                .copied()
                .map(|val| gcd_divider.divide(val - stats.min_value))
                .collect();
            <CodecType as BinarySerializable>::serialize(&CodecType::Gcd, wrt)?;
            let gcd_params = GCDParams {
                min_value: stats.min_value,
                gcd,
            };
            gcd_params.serialize(wrt)?;
            // The inner codec encodes the *scaled* values, so both codec
            // selection and serialization must use the stats of the scaled
            // values, not the stats of the original data.
            let scaled_stats = FastFieldStats::compute(&scaled_vals);
            let codec_type = choose_codec(scaled_stats, &scaled_vals);
            <CodecType as BinarySerializable>::serialize(&codec_type, wrt)?;
            codec_type.serialize(wrt, &scaled_vals, scaled_stats)?;
        } else {
            let codec_type = choose_codec(stats, vals);
            <CodecType as BinarySerializable>::serialize(&codec_type, wrt)?;
            codec_type.serialize(wrt, vals, stats)?;
        }
        Ok(())
    }

    /// Reads the leading codec code and opens the matching reader over the
    /// remaining bytes.
    ///
    /// # Errors
    /// Fails with `InvalidData` on an unknown codec code, or if a GCD codec
    /// claims to wrap another GCD codec.
    fn open_from_bytes(mut bytes: OwnedBytes) -> io::Result<Self::Reader> {
        let codec_code = bytes.read_u8();
        let codec_type = CodecType::from_code(codec_code).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unknown codec code `{codec_code}`"),
            )
        })?;
        let fast_field_reader: Arc<dyn FastFieldCodecReader> = match codec_type {
            CodecType::Bitpacked => Arc::new(BitpackedFastFieldCodec::open_from_bytes(bytes)?),
            CodecType::LinearInterpol => Arc::new(LinearInterpolCodec::open_from_bytes(bytes)?),
            CodecType::MultiLinearInterpol => {
                Arc::new(MultiLinearInterpolFastFieldCodec::open_from_bytes(bytes)?)
            }
            CodecType::Gcd => {
                let gcd_params = GCDParams::deserialize(&mut bytes)?;
                let inner_codec_type = <CodecType as BinarySerializable>::deserialize(&mut bytes)?;
                match inner_codec_type {
                    CodecType::Bitpacked => Arc::new(GCDFastFieldCodecReader {
                        params: gcd_params,
                        reader: BitpackedFastFieldCodec::open_from_bytes(bytes)?,
                    }),
                    CodecType::LinearInterpol => Arc::new(GCDFastFieldCodecReader {
                        params: gcd_params,
                        reader: LinearInterpolCodec::open_from_bytes(bytes)?,
                    }),
                    CodecType::MultiLinearInterpol => Arc::new(GCDFastFieldCodecReader {
                        params: gcd_params,
                        reader: MultiLinearInterpolFastFieldCodec::open_from_bytes(bytes)?,
                    }),
                    CodecType::Gcd => {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "A GCD codec may not wrap another GCD codec.",
                        ));
                    }
                }
            }
        };
        Ok(DynamicFastFieldReader(fast_field_reader))
    }
}
#[derive(Clone)]
/// `DynamicFastFieldReader` wraps the concrete codec reader behind a trait
/// object so that one reader type can serve any of the encodings chosen by
/// `DynamicFastFieldCodec`. Cloning is cheap: only the inner `Arc` is cloned.
pub struct DynamicFastFieldReader(Arc<dyn FastFieldCodecReader>);
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
/// Identifier of a fast field codec, stored as the first byte of the
/// serialized column (see `DynamicFastFieldCodec::serialize`).
///
/// The numeric discriminants are part of the on-disk format; they must
/// never be changed or reused.
pub enum CodecType {
    Bitpacked = 0,
    LinearInterpol = 1,
    MultiLinearInterpol = 2,
    Gcd = 3,
}
impl BinarySerializable for CodecType {
    /// Writes this codec as its single-byte on-disk code.
    fn serialize<W: io::Write>(&self, wrt: &mut W) -> io::Result<()> {
        let code = self.to_code();
        wrt.write_all(std::slice::from_ref(&code))?;
        Ok(())
    }

    /// Reads one byte and maps it back to a `CodecType`, failing with
    /// `InvalidData` on an unknown code.
    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
        let code = u8::deserialize(reader)?;
        match CodecType::from_code(code) {
            Some(codec_type) => Ok(codec_type),
            None => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid codec type code {}", code),
            )),
        }
    }
}
impl CodecType {
    /// Decodes an on-disk codec code; returns `None` for an unknown code.
    pub fn from_code(code: u8) -> Option<Self> {
        match code {
            0 => Some(CodecType::Bitpacked),
            1 => Some(CodecType::LinearInterpol),
            2 => Some(CodecType::MultiLinearInterpol),
            3 => Some(CodecType::Gcd),
            _ => None,
        }
    }

    /// Returns the single-byte on-disk code of this codec.
    pub fn to_code(self) -> u8 {
        self as u8
    }

    /// Pushes `(estimated compression ratio, codec)` for this codec onto
    /// `estimations`, provided the codec is applicable to the data and its
    /// estimate is a finite number.
    fn codec_estimation(
        &self,
        stats: FastFieldStats,
        vals: &[u64],
        estimations: &mut Vec<(f32, CodecType)>,
    ) {
        let estimate_opt: Option<f32> = match self {
            CodecType::Bitpacked => codec_estimation::<BitpackedFastFieldCodec>(stats, vals),
            CodecType::LinearInterpol => codec_estimation::<LinearInterpolCodec>(stats, vals),
            CodecType::MultiLinearInterpol => {
                codec_estimation::<MultiLinearInterpolFastFieldCodec>(stats, vals)
            }
            // GCD is a wrapper codec; it never competes directly.
            CodecType::Gcd => None,
        };
        if let Some(estimate) = estimate_opt {
            // `is_finite()` already excludes NaN, so a separate
            // `!estimate.is_nan()` check is redundant.
            if estimate.is_finite() {
                estimations.push((estimate, *self));
            }
        }
    }

    /// Serializes `fastfield_accessor` using the concrete codec.
    ///
    /// # Panics
    /// Panics on `CodecType::Gcd`: the GCD wrapper is handled by
    /// `DynamicFastFieldCodec::serialize` and must never reach this method.
    fn serialize(
        &self,
        wrt: &mut impl io::Write,
        fastfield_accessor: &[u64],
        stats: FastFieldStats,
    ) -> io::Result<()> {
        match self {
            CodecType::Bitpacked => {
                BitpackedFastFieldCodec.serialize(wrt, fastfield_accessor, stats)?;
            }
            CodecType::LinearInterpol => {
                LinearInterpolCodec.serialize(wrt, fastfield_accessor, stats)?;
            }
            CodecType::MultiLinearInterpol => {
                MultiLinearInterpolFastFieldCodec.serialize(wrt, fastfield_accessor, stats)?;
            }
            CodecType::Gcd => {
                panic!("GCD should never be called that way.");
            }
        }
        Ok(())
    }
}
/// Pure delegation: `DynamicFastFieldReader` forwards every call to the
/// wrapped, dynamically-selected codec reader.
impl FastFieldCodecReader for DynamicFastFieldReader {
    fn get_u64(&self, doc: u64) -> u64 {
        self.0.get_u64(doc)
    }
    fn min_value(&self) -> u64 {
        self.0.min_value()
    }
    fn max_value(&self) -> u64 {
        self.0.max_value()
    }
}
/// Returns the estimated compression ratio of codec `T` for `vals`
/// (baseline: uncompressed 64-bit values), or `None` if `T` is not
/// applicable to this data.
fn codec_estimation<T: FastFieldCodec>(stats: FastFieldStats, vals: &[u64]) -> Option<f32> {
    // `FastFieldStats` is `Copy`, so the former `stats.clone()` was redundant.
    if !T::is_applicable(vals, stats) {
        return None;
    }
    Some(T::estimate(vals, stats))
}
/// Codecs participating in automatic selection. `Gcd` is a wrapper codec
/// and therefore not a candidate here.
const CODEC_TYPES: [CodecType; 3] = [
    CodecType::Bitpacked,
    CodecType::LinearInterpol,
    CodecType::MultiLinearInterpol,
];

/// Picks the candidate codec with the lowest estimated compression ratio.
///
/// `Bitpacked` is always applicable and produces a finite estimate, so the
/// candidate list is never empty.
fn choose_codec(stats: FastFieldStats, vals: &[u64]) -> CodecType {
    let mut estimations = Vec::new();
    for codec_type in &CODEC_TYPES {
        codec_type.codec_estimation(stats, vals, &mut estimations);
    }
    // Only the minimum is needed, so a full sort is unnecessary. Estimates
    // are filtered to be finite, hence `partial_cmp` cannot return `None`.
    estimations
        .into_iter()
        .min_by(|left, right| {
            left.0
                .partial_cmp(&right.0)
                .expect("estimates are finite, comparison cannot fail")
        })
        .map(|(_ratio, codec_type)| codec_type)
        .expect("at least the bitpacked codec is always applicable")
}

247
fastfield_codecs/src/gcd.rs Normal file
View File

@@ -0,0 +1,247 @@
use std::io::{self, Write};
use std::num::NonZeroU64;
use common::BinarySerializable;
use fastdivide::DividerU64;
use crate::FastFieldCodecReader;
/// Wrapper for accessing a fastfield whose values were divided by a common
/// divisor (GCD) before encoding.
///
/// Holds the GCD parameters and the inner codec reader used to read the
/// scaled data.
#[derive(Clone)]
pub struct GCDFastFieldCodecReader<CodecReader> {
    // Minimum value and GCD used to rescale values read from `reader`.
    pub params: GCDParams,
    // Reader over the scaled values; see `get_u64` for the rescaling formula.
    pub reader: CodecReader,
}
impl<C: FastFieldCodecReader> FastFieldCodecReader for GCDFastFieldCodecReader<C> {
    /// Rescales the stored value: `min_value + gcd * inner_value`.
    #[inline]
    fn get_u64(&self, doc: u64) -> u64 {
        self.params.min_value + self.params.gcd.get() * self.reader.get_u64(doc)
    }
    // The same affine rescaling applies to the min/max bounds, since the
    // mapping is monotonically increasing.
    fn min_value(&self) -> u64 {
        self.params.min_value + self.params.gcd.get() * self.reader.min_value()
    }
    fn max_value(&self) -> u64 {
        self.params.min_value + self.params.gcd.get() * self.reader.max_value()
    }
}
#[derive(Debug, Copy, Clone)]
/// Parameters of the GCD wrapper codec.
///
/// The inner codec stores `(original - min_value) / gcd`; reading restores
/// `min_value + gcd * stored` (see `GCDFastFieldCodecReader::get_u64`).
pub struct GCDParams {
    pub min_value: u64,
    pub gcd: NonZeroU64,
}
impl BinarySerializable for GCDParams {
    // NOTE: the field order (gcd first, then min_value) is part of the
    // on-disk format and must stay in sync with `deserialize` below.
    fn serialize<W: Write>(&self, wrt: &mut W) -> io::Result<()> {
        self.gcd.get().serialize(wrt)?;
        self.min_value.serialize(wrt)?;
        Ok(())
    }
    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
        // A stored GCD of 0 can never be produced by `serialize` (the field
        // is a NonZeroU64), so treat it as data corruption.
        let gcd = NonZeroU64::new(u64::deserialize(reader)?)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "GCD=0 is invalid."))?;
        let min_value = u64::deserialize(reader)?;
        Ok(GCDParams { min_value, gcd })
    }
}
/// Computes the greatest common divisor of `left` and `right` via Euclid's
/// algorithm. By convention `compute_gcd(n, 0) == n`, so `compute_gcd(0, 0)`
/// is `0`.
fn compute_gcd(mut left: u64, mut right: u64) -> u64 {
    loop {
        if right == 0 {
            return left;
        }
        let remainder = left % right;
        left = right;
        right = remainder;
    }
}
/// Computes the GCD over an iterator of numbers, ignoring zeros.
///
/// Returns `None` when the iterator yields no non-zero number.
pub fn find_gcd(numbers: impl Iterator<Item = u64>) -> Option<NonZeroU64> {
    let mut nonzero_numbers = numbers.filter(|&n| n != 0);
    let mut current_gcd = nonzero_numbers.next()?;
    if current_gcd == 1 {
        return NonZeroU64::new(1);
    }
    let mut gcd_divider = DividerU64::divide_by(current_gcd);
    for val in nonzero_numbers {
        // Fast path: when `val` is an exact multiple of the running gcd the
        // gcd cannot change. The fastdivide divider makes this check cheaper
        // than a plain hardware division by a variable.
        let is_multiple = val - gcd_divider.divide(val) * current_gcd == 0;
        if is_multiple {
            continue;
        }
        current_gcd = compute_gcd(current_gcd, val);
        if current_gcd == 1 {
            // 1 divides everything; no point scanning further.
            return NonZeroU64::new(1);
        }
        gcd_divider = DividerU64::divide_by(current_gcd);
    }
    NonZeroU64::new(current_gcd)
}
#[cfg(test)]
mod tests {
// TODO Move test
//
// use std::collections::HashMap;
// use std::path::Path;
//
// use crate::directory::{CompositeFile, RamDirectory, WritePtr};
// use crate::fastfield::serializer::FastFieldCodecEnableCheck;
// use crate::fastfield::tests::{FIELD, FIELDI64, SCHEMA, SCHEMAI64};
// use super::{
// find_gcd, CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldCodecName,
// FastFieldReader, FastFieldsWriter, ALL_CODECS,
// };
// use crate::schema::Schema;
// use crate::Directory;
//
// fn get_index(
// docs: &[crate::Document],
// schema: &Schema,
// codec_enable_checker: FastFieldCodecEnableCheck,
// ) -> crate::Result<RamDirectory> {
// let directory: RamDirectory = RamDirectory::create();
// {
// let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
// let mut serializer =
// CompositeFastFieldSerializer::from_write_with_codec(write, codec_enable_checker)
// .unwrap();
// let mut fast_field_writers = FastFieldsWriter::from_schema(schema);
// for doc in docs {
// fast_field_writers.add_document(doc);
// }
// fast_field_writers
// .serialize(&mut serializer, &HashMap::new(), None)
// .unwrap();
// serializer.close().unwrap();
// }
// Ok(directory)
// }
//
// fn test_fastfield_gcd_i64_with_codec(
// codec_name: FastFieldCodecName,
// num_vals: usize,
// ) -> crate::Result<()> {
// let path = Path::new("test");
// let mut docs = vec![];
// for i in 1..=num_vals {
// let val = i as i64 * 1000i64;
// docs.push(doc!(*FIELDI64=>val));
// }
// let directory = get_index(&docs, &SCHEMAI64, codec_name.clone().into())?;
// let file = directory.open_read(path).unwrap();
// assert_eq!(file.len(), 118);
// let composite_file = CompositeFile::open(&file)?;
// let file = composite_file.open_read(*FIELD).unwrap();
// let fast_field_reader = DynamicFastFieldReader::<i64>::open(file)?;
// assert_eq!(fast_field_reader.get(0), 1000i64);
// assert_eq!(fast_field_reader.get(1), 2000i64);
// assert_eq!(fast_field_reader.get(2), 3000i64);
// assert_eq!(fast_field_reader.max_value(), num_vals as i64 * 1000);
// assert_eq!(fast_field_reader.min_value(), 1000i64);
// let file = directory.open_read(path).unwrap();
//
// Can't apply gcd
// let path = Path::new("test");
// docs.pop();
// docs.push(doc!(*FIELDI64=>2001i64));
// let directory = get_index(&docs, &SCHEMAI64, codec_name.into())?;
// let file2 = directory.open_read(path).unwrap();
// assert!(file2.len() > file.len());
//
// Ok(())
// }
//
// #[test]
// fn test_fastfield_gcd_i64() -> crate::Result<()> {
// for codec_name in ALL_CODECS {
// test_fastfield_gcd_i64_with_codec(codec_name.clone(), 5005)?;
// }
// Ok(())
// }
//
// fn test_fastfield_gcd_u64_with_codec(
// codec_name: FastFieldCodecName,
// num_vals: usize,
// ) -> crate::Result<()> {
// let path = Path::new("test");
// let mut docs = vec![];
// for i in 1..=num_vals {
// let val = i as u64 * 1000u64;
// docs.push(doc!(*FIELD=>val));
// }
// let directory = get_index(&docs, &SCHEMA, codec_name.clone().into())?;
// let file = directory.open_read(path).unwrap();
// assert_eq!(file.len(), 118);
// let composite_file = CompositeFile::open(&file)?;
// let file = composite_file.open_read(*FIELD).unwrap();
// let fast_field_reader = DynamicFastFieldReader::<u64>::open(file)?;
// assert_eq!(fast_field_reader.get(0), 1000u64);
// assert_eq!(fast_field_reader.get(1), 2000u64);
// assert_eq!(fast_field_reader.get(2), 3000u64);
// assert_eq!(fast_field_reader.max_value(), num_vals as u64 * 1000);
// assert_eq!(fast_field_reader.min_value(), 1000u64);
// let file = directory.open_read(path).unwrap();
//
// Can't apply gcd
// let path = Path::new("test");
// docs.pop();
// docs.push(doc!(*FIELDI64=>2001u64));
// let directory = get_index(&docs, &SCHEMA, codec_name.into())?;
// let file2 = directory.open_read(path).unwrap();
// assert!(file2.len() > file.len());
//
// Ok(())
// }
//
// #[test]
// fn test_fastfield_gcd_u64() -> crate::Result<()> {
// for codec_name in ALL_CODECS {
// test_fastfield_gcd_u64_with_codec(codec_name.clone(), 5005)?;
// }
// Ok(())
// }
//
// #[test]
// pub fn test_fastfield2() {
// let test_fastfield = DynamicFastFieldReader::<u64>::from(vec![100, 200, 300]);
// assert_eq!(test_fastfield.get(0), 100);
// assert_eq!(test_fastfield.get(1), 200);
// assert_eq!(test_fastfield.get(2), 300);
// }
use std::num::NonZeroU64;
use crate::gcd::{compute_gcd, find_gcd};
#[test]
fn test_compute_gcd() {
    // Zero operands: gcd(n, 0) == gcd(0, n) == n.
    assert_eq!(compute_gcd(0, 0), 0);
    assert_eq!(compute_gcd(4, 0), 4);
    assert_eq!(compute_gcd(0, 4), 4);
    // 1 is absorbing: gcd(1, n) == 1.
    assert_eq!(compute_gcd(1, 4), 1);
    assert_eq!(compute_gcd(4, 1), 1);
    // General cases, including argument order symmetry and gcd(n, n) == n.
    assert_eq!(compute_gcd(4, 2), 2);
    assert_eq!(compute_gcd(10, 25), 5);
    assert_eq!(compute_gcd(25, 10), 5);
    assert_eq!(compute_gcd(25, 25), 25);
}
#[test]
fn find_gcd_test() {
    // Zeros are ignored; an input of only zeros (or no input) yields None.
    assert_eq!(find_gcd([0].into_iter()), None);
    assert_eq!(find_gcd([0, 10].into_iter()), NonZeroU64::new(10));
    assert_eq!(find_gcd([10, 0].into_iter()), NonZeroU64::new(10));
    assert_eq!(find_gcd([].into_iter()), None);
    assert_eq!(find_gcd([15, 30, 5, 10].into_iter()), NonZeroU64::new(5));
    // Co-prime values collapse the gcd to 1.
    assert_eq!(find_gcd([15, 16, 10].into_iter()), NonZeroU64::new(1));
    assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), NonZeroU64::new(5));
    assert_eq!(find_gcd([0, 0].into_iter()), None);
}
}

View File

@@ -3,94 +3,95 @@
extern crate more_asserts;
use std::io;
use std::io::Write;
use ownedbytes::OwnedBytes;
pub mod bitpacked;
pub mod dynamic;
pub mod gcd;
pub mod linearinterpol;
pub mod multilinearinterpol;
pub trait FastFieldCodecReader: Sized {
// Unify with FastFieldReader
pub trait FastFieldCodecReader {
/// reads the metadata and returns the CodecReader
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self>;
fn get_u64(&self, doc: u64, data: &[u8]) -> u64;
fn get_u64(&self, doc: u64) -> u64;
fn min_value(&self) -> u64;
fn max_value(&self) -> u64;
}
/// The FastFieldSerializerEstimate trait is required on all variants
/// of fast field compressions, to decide which one to choose.
pub trait FastFieldCodecSerializer {
/// A codex needs to provide a unique name and id, which is
/// used for debugging and de/serialization.
pub trait FastFieldCodec {
/// A codec needs to provide a unique name used for debugging.
const NAME: &'static str;
const ID: u8;
type Reader: FastFieldCodecReader;
/// Check if the Codec is able to compress the data
fn is_applicable(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> bool;
fn is_applicable(vals: &[u64], stats: FastFieldStats) -> bool;
/// Returns an estimate of the compression ratio.
/// The baseline is uncompressed 64bit data.
///
/// It could make sense to also return a value representing
/// computational complexity.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32;
fn estimate(vals: &[u64], stats: FastFieldStats) -> f32;
/// Serializes the data using the serializer into write.
/// There are multiple iterators, in case the codec needs to read the data multiple times.
/// The iterators should be preferred over using fastfield_accessor for performance reasons.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
&self,
write: &mut impl io::Write,
vals: &[u64],
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()>;
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader>;
}
/// FastFieldDataAccess is the trait to access fast field data during serialization and estimation.
pub trait FastFieldDataAccess {
/// Return the value associated to the given position.
///
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance
/// reasons.
///
/// # Panics
///
/// May panic if `position` is greater than the index.
fn get_val(&self, position: u64) -> u64;
}
#[derive(Debug, Clone)]
/// Statistics are used in codec detection and stored in the fast field footer.
#[derive(Clone, Copy, Default, Debug)]
pub struct FastFieldStats {
pub min_value: u64,
pub max_value: u64,
pub num_vals: u64,
}
impl<'a> FastFieldDataAccess for &'a [u64] {
fn get_val(&self, position: u64) -> u64 {
self[position as usize]
impl FastFieldStats {
pub fn compute(vals: &[u64]) -> Self {
if vals.is_empty() {
return FastFieldStats::default();
}
let first_val = vals[0];
let mut fast_field_stats = FastFieldStats {
min_value: first_val,
max_value: first_val,
num_vals: 1,
};
for &val in &vals[1..] {
fast_field_stats.record(val);
}
fast_field_stats
}
}
impl FastFieldDataAccess for Vec<u64> {
fn get_val(&self, position: u64) -> u64 {
self[position as usize]
pub fn record(&mut self, val: u64) {
self.num_vals += 1;
self.min_value = self.min_value.min(val);
self.max_value = self.max_value.max(val);
}
}
#[cfg(test)]
mod tests {
use crate::bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer};
use crate::linearinterpol::{LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer};
use crate::multilinearinterpol::{
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
};
use crate::bitpacked::BitpackedFastFieldCodec;
use crate::linearinterpol::LinearInterpolCodec;
use crate::multilinearinterpol::MultiLinearInterpolFastFieldCodec;
pub fn create_and_validate<S: FastFieldCodecSerializer, R: FastFieldCodecReader>(
pub fn create_and_validate<S: FastFieldCodec>(
codec: &S,
data: &[u64],
name: &str,
) -> (f32, f32) {
@@ -98,19 +99,16 @@ mod tests {
return (f32::MAX, 0.0);
}
let estimation = S::estimate(&data, crate::tests::stats_from_vec(data));
let mut out = vec![];
S::serialize(
&mut out,
&data,
crate::tests::stats_from_vec(data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
let mut out: Vec<u8> = Vec::new();
codec
.serialize(&mut out, &data, crate::tests::stats_from_vec(data))
.unwrap();
let reader = R::open_from_bytes(&out).unwrap();
let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0);
let reader = S::open_from_bytes(OwnedBytes::new(out)).unwrap();
for (doc, orig_val) in data.iter().enumerate() {
let val = reader.get_u64(doc as u64, &out);
let val = reader.get_u64(doc as u64);
if val != *orig_val {
panic!(
"val {:?} does not match orig_val {:?}, in data set {}, data {:?}",
@@ -118,7 +116,6 @@ mod tests {
);
}
}
let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0);
(estimation, actual_compression)
}
pub fn get_codec_test_data_sets() -> Vec<(Vec<u64>, &'static str)> {
@@ -137,11 +134,10 @@ mod tests {
data_and_names
}
fn test_codec<S: FastFieldCodecSerializer, R: FastFieldCodecReader>() {
let codec_name = S::NAME;
fn test_codec<C: FastFieldCodec>(codec: &C) {
let codec_name = C::NAME;
for (data, data_set_name) in get_codec_test_data_sets() {
let (estimate, actual) =
crate::tests::create_and_validate::<S, R>(&data, data_set_name);
let (estimate, actual) = crate::tests::create_and_validate(codec, &data, data_set_name);
let result = if estimate == f32::MAX {
"Disabled".to_string()
} else {
@@ -155,15 +151,15 @@ mod tests {
}
#[test]
fn test_codec_bitpacking() {
test_codec::<BitpackedFastFieldSerializer, BitpackedFastFieldReader>();
test_codec(&BitpackedFastFieldCodec);
}
#[test]
fn test_codec_interpolation() {
test_codec::<LinearInterpolFastFieldSerializer, LinearInterpolFastFieldReader>();
test_codec(&LinearInterpolCodec);
}
#[test]
fn test_codec_multi_interpolation() {
test_codec::<MultiLinearInterpolFastFieldSerializer, MultiLinearInterpolFastFieldReader>();
test_codec(&MultiLinearInterpolFastFieldCodec);
}
use super::*;
@@ -182,16 +178,15 @@ mod tests {
let data = (10..=20000_u64).collect::<Vec<_>>();
let linear_interpol_estimation =
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
LinearInterpolCodec::estimate(&data, stats_from_vec(&data));
assert_le!(linear_interpol_estimation, 0.01);
let multi_linear_interpol_estimation =
MultiLinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
MultiLinearInterpolFastFieldCodec::estimate(&&data[..], stats_from_vec(&data));
assert_le!(multi_linear_interpol_estimation, 0.2);
assert_le!(linear_interpol_estimation, multi_linear_interpol_estimation);
let bitpacked_estimation =
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
let bitpacked_estimation = BitpackedFastFieldCodec::estimate(&data, stats_from_vec(&data));
assert_le!(linear_interpol_estimation, bitpacked_estimation);
}
#[test]
@@ -199,11 +194,10 @@ mod tests {
let data = vec![200, 10, 10, 10, 10, 1000, 20];
let linear_interpol_estimation =
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
LinearInterpolCodec::estimate(&data, stats_from_vec(&data));
assert_le!(linear_interpol_estimation, 0.32);
let bitpacked_estimation =
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
let bitpacked_estimation = BitpackedFastFieldCodec::estimate(&data, stats_from_vec(&data));
assert_le!(bitpacked_estimation, linear_interpol_estimation);
}
#[test]
@@ -214,11 +208,10 @@ mod tests {
// in this case the linear interpolation cannot in fact be worse than bitpacking,
// but the estimator adds some threshold, which leads to estimated worse behavior
let linear_interpol_estimation =
LinearInterpolFastFieldSerializer::estimate(&data, stats_from_vec(&data));
LinearInterpolCodec::estimate(&data, stats_from_vec(&data));
assert_le!(linear_interpol_estimation, 0.35);
let bitpacked_estimation =
BitpackedFastFieldSerializer::estimate(&data, stats_from_vec(&data));
let bitpacked_estimation = BitpackedFastFieldCodec::estimate(&data, stats_from_vec(&data));
assert_le!(bitpacked_estimation, 0.32);
assert_le!(bitpacked_estimation, linear_interpol_estimation);
}

View File

@@ -2,14 +2,16 @@ use std::io::{self, Read, Write};
use std::ops::Sub;
use common::{BinarySerializable, FixedSize};
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::{FastFieldCodecReader, FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats};
use crate::{FastFieldCodec, FastFieldCodecReader, FastFieldStats};
/// Depending on the field type, a different
/// fast field is required.
#[derive(Clone)]
pub struct LinearInterpolFastFieldReader {
data: OwnedBytes,
bit_unpacker: BitUnpacker,
pub footer: LinearInterpolFooter,
pub slope: f32,
@@ -56,24 +58,10 @@ impl FixedSize for LinearInterpolFooter {
}
impl FastFieldCodecReader for LinearInterpolFastFieldReader {
/// Opens a fast field given a file.
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
let (_data, mut footer) = bytes.split_at(bytes.len() - LinearInterpolFooter::SIZE_IN_BYTES);
let footer = LinearInterpolFooter::deserialize(&mut footer)?;
let slope = get_slope(footer.first_val, footer.last_val, footer.num_vals);
let num_bits = compute_num_bits(footer.relative_max_value);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(LinearInterpolFastFieldReader {
bit_unpacker,
footer,
slope,
})
}
#[inline]
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
fn get_u64(&self, doc: u64) -> u64 {
let calculated_value = get_calculated_value(self.footer.first_val, doc, self.slope);
(calculated_value + self.bit_unpacker.get(doc, data)) - self.footer.offset
(calculated_value + self.bit_unpacker.get(doc, &self.data)) - self.footer.offset
}
#[inline]
@@ -88,7 +76,7 @@ impl FastFieldCodecReader for LinearInterpolFastFieldReader {
/// Fastfield serializer, which tries to guess values by linear interpolation
/// and stores the difference bitpacked.
pub struct LinearInterpolFastFieldSerializer {}
pub struct LinearInterpolCodec;
#[inline]
fn get_slope(first_val: u64, last_val: u64, num_vals: u64) -> f32 {
@@ -105,26 +93,44 @@ fn get_calculated_value(first_val: u64, pos: u64, slope: f32) -> u64 {
first_val + (pos as f32 * slope) as u64
}
impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
impl FastFieldCodec for LinearInterpolCodec {
const NAME: &'static str = "LinearInterpol";
const ID: u8 = 2;
type Reader = LinearInterpolFastFieldReader;
/// Opens a fast field given a file.
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
let footer_offset = bytes.len() - LinearInterpolFooter::SIZE_IN_BYTES;
let (data, mut footer) = bytes.split(footer_offset);
let footer = LinearInterpolFooter::deserialize(&mut footer)?;
let slope = get_slope(footer.first_val, footer.last_val, footer.num_vals);
let num_bits = compute_num_bits(footer.relative_max_value);
let bit_unpacker = BitUnpacker::new(num_bits);
Ok(LinearInterpolFastFieldReader {
data,
bit_unpacker,
footer,
slope,
})
}
/// Creates a new fast field serializer.
fn serialize(
&self,
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
vals: &[u64],
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
assert!(stats.min_value <= stats.max_value);
let first_val = fastfield_accessor.get_val(0);
let last_val = fastfield_accessor.get_val(stats.num_vals as u64 - 1);
let first_val = vals[0];
let last_val = vals[vals.len() - 1];
let slope = get_slope(first_val, last_val, stats.num_vals);
// calculate offset to ensure all values are positive
let mut offset = 0;
let mut rel_positive_max = 0;
for (pos, actual_value) in data_iter1.enumerate() {
for (pos, actual_value) in vals.iter().copied().enumerate() {
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
if calculated_value > actual_value {
// negative value we need to apply an offset
@@ -142,7 +148,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
let num_bits = compute_num_bits(relative_max_value);
let mut bit_packer = BitPacker::new();
for (pos, val) in data_iter.enumerate() {
for (pos, val) in vals.iter().copied().enumerate() {
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
let diff = (val + offset) - calculated_value;
bit_packer.write(diff, num_bits, write)?;
@@ -161,10 +167,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
footer.serialize(write)?;
Ok(())
}
fn is_applicable(
_fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
) -> bool {
fn is_applicable(_vals: &[u64], stats: FastFieldStats) -> bool {
if stats.num_vals < 3 {
return false; // disable compressor for this case
}
@@ -185,22 +188,22 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
/// estimation for linear interpolation is hard because, you don't know
/// where the local maxima for the deviation of the calculated value are and
/// the offset to shift all values to >=0 is also unknown.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
let first_val = fastfield_accessor.get_val(0);
let last_val = fastfield_accessor.get_val(stats.num_vals as u64 - 1);
fn estimate(vals: &[u64], stats: FastFieldStats) -> f32 {
let first_val = vals[0];
let last_val = vals[vals.len() - 1];
let slope = get_slope(first_val, last_val, stats.num_vals);
// let's sample at 0%, 5%, 10% .. 95%, 100%
let num_vals = stats.num_vals as f32 / 100.0;
let sample_positions = (0..20)
let sample_positions: Vec<usize> = (0..20)
.map(|pos| (num_vals * pos as f32 * 5.0) as usize)
.collect::<Vec<_>>();
let max_distance = sample_positions
.iter()
.into_iter()
.map(|pos| {
let calculated_value = get_calculated_value(first_val, *pos as u64, slope);
let actual_value = fastfield_accessor.get_val(*pos as u64);
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
let actual_value = vals[pos];
distance(calculated_value, actual_value)
})
.max()
@@ -235,10 +238,7 @@ mod tests {
use crate::tests::get_codec_test_data_sets;
fn create_and_validate(data: &[u64], name: &str) -> (f32, f32) {
crate::tests::create_and_validate::<
LinearInterpolFastFieldSerializer,
LinearInterpolFastFieldReader,
>(data, name)
crate::tests::create_and_validate(&LinearInterpolCodec, data, name)
}
#[test]

View File

@@ -1,8 +1,9 @@
#[macro_use]
extern crate prettytable;
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
use fastfield_codecs::{FastFieldCodecSerializer, FastFieldStats};
// use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
// use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
use fastfield_codecs::bitpacked::BitpackedFastFieldCodec;
use fastfield_codecs::{FastFieldCodec, FastFieldStats};
use prettytable::{Cell, Row, Table};
fn main() {
@@ -12,14 +13,12 @@ fn main() {
table.add_row(row!["", "Compression Ratio", "Compression Estimation"]);
for (data, data_set_name) in get_codec_test_data_sets() {
let mut results = vec![];
let res = serialize_with_codec::<LinearInterpolFastFieldSerializer>(&data);
results.push(res);
let res = serialize_with_codec::<MultiLinearInterpolFastFieldSerializer>(&data);
results.push(res);
let res = serialize_with_codec::<fastfield_codecs::bitpacked::BitpackedFastFieldSerializer>(
&data,
);
let mut results = Vec::new();
// let res = serialize_with_codec::<LinearInterpolFastFieldSerializer>(&data);
// results.push(res);
// let res = serialize_with_codec::<MultiLinearInterpolFastFieldSerializer>(&data);
// results.push(res);
let res = serialize_with_codec(&BitpackedFastFieldCodec, &data);
results.push(res);
// let best_estimation_codec = results
@@ -91,7 +90,8 @@ pub fn get_codec_test_data_sets() -> Vec<(Vec<u64>, &'static str)> {
data_and_names
}
pub fn serialize_with_codec<S: FastFieldCodecSerializer>(
pub fn serialize_with_codec<S: FastFieldCodec>(
codec: &S,
data: &[u64],
) -> (bool, f32, f32, &'static str) {
let is_applicable = S::is_applicable(&data, stats_from_vec(data));
@@ -100,14 +100,9 @@ pub fn serialize_with_codec<S: FastFieldCodecSerializer>(
}
let estimation = S::estimate(&data, stats_from_vec(data));
let mut out = vec![];
S::serialize(
&mut out,
&data,
stats_from_vec(data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
codec
.serialize(&mut out, &data, stats_from_vec(data))
.unwrap();
let actual_compression = out.len() as f32 / (data.len() * 8) as f32;
(true, estimation, actual_compression, S::NAME)

View File

@@ -14,16 +14,18 @@ use std::io::{self, Read, Write};
use std::ops::Sub;
use common::{BinarySerializable, CountingWriter, DeserializeFrom};
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::{FastFieldCodecReader, FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats};
use crate::{FastFieldCodec, FastFieldCodecReader, FastFieldStats};
const CHUNK_SIZE: u64 = 512;
const CHUNK_SIZE: usize = 512;
/// Depending on the field type, a different
/// fast field is required.
#[derive(Clone)]
pub struct MultiLinearInterpolFastFieldReader {
data: OwnedBytes,
pub footer: MultiLinearInterpolFooter,
}
@@ -126,43 +128,27 @@ impl BinarySerializable for MultiLinearInterpolFooter {
interpolations: Vec::<Function>::deserialize(reader)?,
};
for (num, interpol) in footer.interpolations.iter_mut().enumerate() {
interpol.start_pos = CHUNK_SIZE * num as u64;
interpol.start_pos = (CHUNK_SIZE * num) as u64;
}
Ok(footer)
}
}
#[inline]
fn get_interpolation_position(doc: u64) -> usize {
let index = doc / CHUNK_SIZE;
index as usize
}
#[inline]
fn get_interpolation_function(doc: u64, interpolations: &[Function]) -> &Function {
&interpolations[get_interpolation_position(doc)]
&interpolations[doc as usize / CHUNK_SIZE]
}
impl FastFieldCodecReader for MultiLinearInterpolFastFieldReader {
/// Opens a fast field given a file.
fn open_from_bytes(bytes: &[u8]) -> io::Result<Self> {
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
let (_data, mut footer) = bytes.split_at(bytes.len() - (4 + footer_len) as usize);
let footer = MultiLinearInterpolFooter::deserialize(&mut footer)?;
Ok(MultiLinearInterpolFastFieldReader { footer })
}
#[inline]
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
fn get_u64(&self, doc: u64) -> u64 {
let interpolation = get_interpolation_function(doc, &self.footer.interpolations);
let doc = doc - interpolation.start_pos;
let calculated_value =
get_calculated_value(interpolation.value_start_pos, doc, interpolation.slope);
let diff = interpolation
.bit_unpacker
.get(doc, &data[interpolation.data_start_offset as usize..]);
.get(doc, &self.data[interpolation.data_start_offset as usize..]);
(calculated_value + diff) - interpolation.positive_val_offset
}
@@ -187,23 +173,33 @@ fn get_calculated_value(first_val: u64, pos: u64, slope: f32) -> u64 {
}
/// Same as LinearInterpolFastFieldSerializer, but working on chunks of CHUNK_SIZE elements.
pub struct MultiLinearInterpolFastFieldSerializer {}
pub struct MultiLinearInterpolFastFieldCodec;
impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
impl FastFieldCodec for MultiLinearInterpolFastFieldCodec {
const NAME: &'static str = "MultiLinearInterpol";
const ID: u8 = 3;
type Reader = MultiLinearInterpolFastFieldReader;
/// Opens a fast field given a file.
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
let footer_offset = bytes.len() - 4 - footer_len as usize;
let (data, mut footer) = bytes.split(footer_offset);
let footer = MultiLinearInterpolFooter::deserialize(&mut footer)?;
Ok(MultiLinearInterpolFastFieldReader { data, footer })
}
/// Creates a new fast field serializer.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
&self,
write: &mut impl io::Write,
vals: &[u64],
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
_data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
assert!(stats.min_value <= stats.max_value);
let first_val = fastfield_accessor.get_val(0);
let last_val = fastfield_accessor.get_val(stats.num_vals as u64 - 1);
let first_val = vals[0];
let last_val = vals[vals.len() - 1];
let mut first_function = Function {
end_pos: stats.num_vals,
@@ -214,16 +210,11 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
first_function.calc_slope();
let mut interpolations = vec![first_function];
// Since we potentially apply multiple passes over the data, the data is cached.
// Multiple iteration can be expensive (merge with index sorting can add lot of overhead per
// iteration)
let data = data_iter.collect::<Vec<_>>();
//// let's split this into chunks of CHUNK_SIZE
for data_pos in (0..data.len() as u64).step_by(CHUNK_SIZE as usize).skip(1) {
for vals_pos in (0..vals.len()).step_by(CHUNK_SIZE).skip(1) {
let new_fun = {
let current_interpolation = interpolations.last_mut().unwrap();
current_interpolation.split(data_pos, data[data_pos as usize])
current_interpolation.split(vals_pos as u64, vals[vals_pos])
};
interpolations.push(new_fun);
}
@@ -231,7 +222,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
for interpolation in &mut interpolations {
let mut offset = 0;
let mut rel_positive_max = 0;
for (pos, actual_value) in data
for (pos, actual_value) in vals
[interpolation.start_pos as usize..interpolation.end_pos as usize]
.iter()
.cloned()
@@ -262,7 +253,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
for interpolation in &mut interpolations {
interpolation.data_start_offset = write.written_bytes();
let num_bits = interpolation.num_bits;
for (pos, actual_value) in data
for (pos, actual_value) in vals
[interpolation.start_pos as usize..interpolation.end_pos as usize]
.iter()
.cloned()
@@ -290,10 +281,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
Ok(())
}
fn is_applicable(
_fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
) -> bool {
fn is_applicable(_vals: &[u64], stats: FastFieldStats) -> bool {
if stats.num_vals < 5_000 {
return false;
}
@@ -314,11 +302,11 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
/// estimation for linear interpolation is hard because, you don't know
/// where the local maxima are for the deviation of the calculated value and
/// the offset is also unknown.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32 {
let first_val_in_first_block = fastfield_accessor.get_val(0);
let last_elem_in_first_chunk = CHUNK_SIZE.min(stats.num_vals);
let last_val_in_first_block =
fastfield_accessor.get_val(last_elem_in_first_chunk as u64 - 1);
fn estimate(vals: &[u64], stats: FastFieldStats) -> f32 {
// TODO simplify now that we have a vals array.
let first_val_in_first_block = vals[0];
let last_elem_in_first_chunk = CHUNK_SIZE.min(vals.len());
let last_val_in_first_block = vals[last_elem_in_first_chunk - 1];
let slope = get_slope(
first_val_in_first_block,
last_val_in_first_block,
@@ -332,10 +320,11 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
let max_distance = sample_positions
.iter()
.copied()
.map(|pos| {
let calculated_value =
get_calculated_value(first_val_in_first_block, *pos as u64, slope);
let actual_value = fastfield_accessor.get_val(*pos as u64);
get_calculated_value(first_val_in_first_block, pos as u64, slope);
let actual_value = vals[pos];
distance(calculated_value, actual_value)
})
.max()
@@ -351,7 +340,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
let num_bits = compute_num_bits(relative_max_value as u64) as u64 * stats.num_vals as u64
// function metadata per block
+ 29 * (stats.num_vals / CHUNK_SIZE);
+ 29 * (stats.num_vals / CHUNK_SIZE as u64);
let num_bits_uncompressed = 64 * stats.num_vals;
num_bits as f32 / num_bits_uncompressed as f32
}
@@ -371,10 +360,7 @@ mod tests {
use crate::tests::get_codec_test_data_sets;
fn create_and_validate(data: &[u64], name: &str) -> (f32, f32) {
crate::tests::create_and_validate::<
MultiLinearInterpolFastFieldSerializer,
MultiLinearInterpolFastFieldReader,
>(data, name)
crate::tests::create_and_validate(&MultiLinearInterpolFastFieldCodec, data, name)
}
#[test]

View File

@@ -10,7 +10,7 @@ use super::metric::{AverageAggregation, StatsAggregation};
use super::segment_agg_result::BucketCount;
use super::VecWithNames;
use crate::fastfield::{
type_and_cardinality, DynamicFastFieldReader, FastType, MultiValuedFastFieldReader,
type_and_cardinality, FastFieldReaderImpl, FastType, MultiValuedFastFieldReader,
};
use crate::schema::{Cardinality, Type};
use crate::{InvertedIndexReader, SegmentReader, TantivyError};
@@ -37,10 +37,10 @@ impl AggregationsWithAccessor {
#[derive(Clone)]
pub(crate) enum FastFieldAccessor {
Multi(MultiValuedFastFieldReader<u64>),
Single(DynamicFastFieldReader<u64>),
Single(FastFieldReaderImpl<u64>),
}
impl FastFieldAccessor {
pub fn as_single(&self) -> Option<&DynamicFastFieldReader<u64>> {
pub fn as_single(&self) -> Option<&FastFieldReaderImpl<u64>> {
match self {
FastFieldAccessor::Multi(_) => None,
FastFieldAccessor::Single(reader) => Some(reader),
@@ -118,7 +118,7 @@ impl BucketAggregationWithAccessor {
pub struct MetricAggregationWithAccessor {
pub metric: MetricAggregation,
pub field_type: Type,
pub accessor: DynamicFastFieldReader<u64>,
pub accessor: FastFieldReaderImpl<u64>,
}
impl MetricAggregationWithAccessor {

View File

@@ -14,7 +14,7 @@ use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
};
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl};
use crate::schema::Type;
use crate::{DocId, TantivyError};
@@ -263,7 +263,7 @@ impl SegmentHistogramCollector {
req: &HistogramAggregation,
sub_aggregation: &AggregationsWithAccessor,
field_type: Type,
accessor: &DynamicFastFieldReader<u64>,
accessor: &FastFieldReaderImpl<u64>,
) -> crate::Result<Self> {
req.validate()?;
let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);

View File

@@ -3,7 +3,7 @@ use std::fmt::Debug;
use serde::{Deserialize, Serialize};
use crate::aggregation::f64_from_fastfield_u64;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl};
use crate::schema::Type;
use crate::DocId;
@@ -43,7 +43,7 @@ pub(crate) struct SegmentAverageCollector {
}
impl Debug for SegmentAverageCollector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("AverageCollector")
.field("data", &self.data)
.finish()
@@ -57,7 +57,7 @@ impl SegmentAverageCollector {
data: Default::default(),
}
}
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) {
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &FastFieldReaderImpl<u64>) {
let mut iter = doc.chunks_exact(4);
for docs in iter.by_ref() {
let val1 = field.get(docs[0]);

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use crate::aggregation::f64_from_fastfield_u64;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl};
use crate::schema::Type;
use crate::{DocId, TantivyError};
@@ -163,7 +163,7 @@ impl SegmentStatsCollector {
stats: IntermediateStats::default(),
}
}
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) {
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &FastFieldReaderImpl<u64>) {
let mut iter = doc.chunks_exact(4);
for docs in iter.by_ref() {
let val1 = field.get(docs[0]);

View File

@@ -12,7 +12,7 @@
use std::marker::PhantomData;
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue};
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl, FastValue};
use crate::schema::Field;
use crate::{Score, SegmentReader, TantivyError};
@@ -158,7 +158,7 @@ where
TPredicate: 'static,
TPredicateValue: FastValue,
{
fast_field_reader: DynamicFastFieldReader<TPredicateValue>,
fast_field_reader: FastFieldReaderImpl<TPredicateValue>,
segment_collector: TSegmentCollector,
predicate: TPredicate,
t_predicate_value: PhantomData<TPredicateValue>,

View File

@@ -1,7 +1,7 @@
use fastdivide::DividerU64;
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue};
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl, FastValue};
use crate::schema::{Field, Type};
use crate::{DocId, Score};
@@ -84,7 +84,7 @@ impl HistogramComputer {
}
pub struct SegmentHistogramCollector {
histogram_computer: HistogramComputer,
ff_reader: DynamicFastFieldReader<u64>,
ff_reader: FastFieldReaderImpl<u64>,
}
impl SegmentCollector for SegmentHistogramCollector {

View File

@@ -1,7 +1,7 @@
use super::*;
use crate::collector::{Count, FilterCollector, TopDocs};
use crate::core::SegmentReader;
use crate::fastfield::{BytesFastFieldReader, DynamicFastFieldReader, FastFieldReader};
use crate::fastfield::{BytesFastFieldReader, FastFieldReader, FastFieldReaderImpl};
use crate::query::{AllQuery, QueryParser};
use crate::schema::{Field, Schema, FAST, TEXT};
use crate::time::format_description::well_known::Rfc3339;
@@ -156,7 +156,7 @@ pub struct FastFieldTestCollector {
pub struct FastFieldSegmentCollector {
vals: Vec<u64>,
reader: DynamicFastFieldReader<u64>,
reader: FastFieldReaderImpl<u64>,
}
impl FastFieldTestCollector {

View File

@@ -9,7 +9,7 @@ use crate::collector::tweak_score_top_collector::TweakedScoreTopCollector;
use crate::collector::{
CustomScorer, CustomSegmentScorer, ScoreSegmentTweaker, ScoreTweaker, SegmentCollector,
};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue};
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl, FastValue};
use crate::query::Weight;
use crate::schema::Field;
use crate::{DocAddress, DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};
@@ -129,7 +129,7 @@ impl fmt::Debug for TopDocs {
}
struct ScorerByFastFieldReader {
ff_reader: DynamicFastFieldReader<u64>,
ff_reader: FastFieldReaderImpl<u64>,
}
impl CustomSegmentScorer<u64> for ScorerByFastFieldReader {

View File

@@ -1,5 +1,5 @@
use crate::directory::{FileSlice, OwnedBytes};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, MultiValueLength};
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl, MultiValueLength};
use crate::DocId;
/// Reader for byte array fast fields
@@ -14,13 +14,13 @@ use crate::DocId;
/// and the start index for the next document, and keeping the bytes in between.
#[derive(Clone)]
pub struct BytesFastFieldReader {
idx_reader: DynamicFastFieldReader<u64>,
idx_reader: FastFieldReaderImpl<u64>,
values: OwnedBytes,
}
impl BytesFastFieldReader {
pub(crate) fn open(
idx_reader: DynamicFastFieldReader<u64>,
idx_reader: FastFieldReaderImpl<u64>,
values_file: FileSlice,
) -> crate::Result<BytesFastFieldReader> {
let values = values_file.read_bytes()?;

View File

@@ -1,224 +0,0 @@
use std::io::{self, Write};
use common::BinarySerializable;
use fastdivide::DividerU64;
use fastfield_codecs::FastFieldCodecReader;
use gcd::Gcd;
pub const GCD_DEFAULT: u64 = 1;
pub const GCD_CODEC_ID: u8 = 4;
/// Wrapper for accessing a fastfield.
///
/// Holds the data and the codec to the read the data.
#[derive(Clone)]
pub struct GCDFastFieldCodec<CodecReader> {
gcd: u64,
min_value: u64,
reader: CodecReader,
}
impl<C: FastFieldCodecReader + Clone> FastFieldCodecReader for GCDFastFieldCodec<C> {
/// Opens a fast field given the bytes.
fn open_from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
let (header, mut footer) = bytes.split_at(bytes.len() - 16);
let gcd = u64::deserialize(&mut footer)?;
let min_value = u64::deserialize(&mut footer)?;
let reader = C::open_from_bytes(header)?;
Ok(GCDFastFieldCodec {
gcd,
min_value,
reader,
})
}
#[inline]
fn get_u64(&self, doc: u64, data: &[u8]) -> u64 {
let mut data = self.reader.get_u64(doc, data);
data *= self.gcd;
data += self.min_value;
data
}
fn min_value(&self) -> u64 {
self.min_value + self.reader.min_value() * self.gcd
}
fn max_value(&self) -> u64 {
self.min_value + self.reader.max_value() * self.gcd
}
}
pub fn write_gcd_header<W: Write>(field_write: &mut W, min_value: u64, gcd: u64) -> io::Result<()> {
gcd.serialize(field_write)?;
min_value.serialize(field_write)?;
Ok(())
}
// Find GCD for iterator of numbers
pub fn find_gcd(numbers: impl Iterator<Item = u64>) -> Option<u64> {
let mut numbers = numbers.filter(|n| *n != 0);
let mut gcd = numbers.next()?;
if gcd == 1 {
return Some(1);
}
let mut gcd_divider = DividerU64::divide_by(gcd);
for val in numbers {
let remainder = val - (gcd_divider.divide(val)) * gcd;
if remainder == 0 {
continue;
}
gcd = gcd.gcd(val);
if gcd == 1 {
return Some(1);
}
gcd_divider = DividerU64::divide_by(gcd);
}
Some(gcd)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::path::Path;
use common::HasLen;
use crate::directory::{CompositeFile, RamDirectory, WritePtr};
use crate::fastfield::serializer::FastFieldCodecEnableCheck;
use crate::fastfield::tests::{FIELD, FIELDI64, SCHEMA, SCHEMAI64};
use crate::fastfield::{
find_gcd, CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldCodecName,
FastFieldReader, FastFieldsWriter, ALL_CODECS,
};
use crate::schema::Schema;
use crate::Directory;
fn get_index(
docs: &[crate::Document],
schema: &Schema,
codec_enable_checker: FastFieldCodecEnableCheck,
) -> crate::Result<RamDirectory> {
let directory: RamDirectory = RamDirectory::create();
{
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
let mut serializer =
CompositeFastFieldSerializer::from_write_with_codec(write, codec_enable_checker)
.unwrap();
let mut fast_field_writers = FastFieldsWriter::from_schema(schema);
for doc in docs {
fast_field_writers.add_document(doc);
}
fast_field_writers
.serialize(&mut serializer, &HashMap::new(), None)
.unwrap();
serializer.close().unwrap();
}
Ok(directory)
}
fn test_fastfield_gcd_i64_with_codec(
codec_name: FastFieldCodecName,
num_vals: usize,
) -> crate::Result<()> {
let path = Path::new("test");
let mut docs = vec![];
for i in 1..=num_vals {
let val = i as i64 * 1000i64;
docs.push(doc!(*FIELDI64=>val));
}
let directory = get_index(&docs, &SCHEMAI64, codec_name.clone().into())?;
let file = directory.open_read(path).unwrap();
// assert_eq!(file.len(), 118);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<i64>::open(file)?;
assert_eq!(fast_field_reader.get(0), 1000i64);
assert_eq!(fast_field_reader.get(1), 2000i64);
assert_eq!(fast_field_reader.get(2), 3000i64);
assert_eq!(fast_field_reader.max_value(), num_vals as i64 * 1000);
assert_eq!(fast_field_reader.min_value(), 1000i64);
let file = directory.open_read(path).unwrap();
// Can't apply gcd
let path = Path::new("test");
docs.pop();
docs.push(doc!(*FIELDI64=>2001i64));
let directory = get_index(&docs, &SCHEMAI64, codec_name.into())?;
let file2 = directory.open_read(path).unwrap();
assert!(file2.len() > file.len());
Ok(())
}
#[test]
fn test_fastfield_gcd_i64() -> crate::Result<()> {
for codec_name in ALL_CODECS {
test_fastfield_gcd_i64_with_codec(codec_name.clone(), 5005)?;
}
Ok(())
}
fn test_fastfield_gcd_u64_with_codec(
codec_name: FastFieldCodecName,
num_vals: usize,
) -> crate::Result<()> {
let path = Path::new("test");
let mut docs = vec![];
for i in 1..=num_vals {
let val = i as u64 * 1000u64;
docs.push(doc!(*FIELD=>val));
}
let directory = get_index(&docs, &SCHEMA, codec_name.clone().into())?;
let file = directory.open_read(path).unwrap();
// assert_eq!(file.len(), 118);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(file)?;
assert_eq!(fast_field_reader.get(0), 1000u64);
assert_eq!(fast_field_reader.get(1), 2000u64);
assert_eq!(fast_field_reader.get(2), 3000u64);
assert_eq!(fast_field_reader.max_value(), num_vals as u64 * 1000);
assert_eq!(fast_field_reader.min_value(), 1000u64);
let file = directory.open_read(path).unwrap();
// Can't apply gcd
let path = Path::new("test");
docs.pop();
docs.push(doc!(*FIELDI64=>2001u64));
let directory = get_index(&docs, &SCHEMA, codec_name.into())?;
let file2 = directory.open_read(path).unwrap();
assert!(file2.len() > file.len());
Ok(())
}
#[test]
fn test_fastfield_gcd_u64() -> crate::Result<()> {
for codec_name in ALL_CODECS {
test_fastfield_gcd_u64_with_codec(codec_name.clone(), 5005)?;
}
Ok(())
}
#[test]
pub fn test_fastfield2() {
let test_fastfield = DynamicFastFieldReader::<u64>::from(vec![100, 200, 300]);
assert_eq!(test_fastfield.get(0), 100);
assert_eq!(test_fastfield.get(1), 200);
assert_eq!(test_fastfield.get(2), 300);
}
#[test]
fn find_gcd_test() {
assert_eq!(find_gcd([0].into_iter()), None);
assert_eq!(find_gcd([0, 10].into_iter()), Some(10));
assert_eq!(find_gcd([10, 0].into_iter()), Some(10));
assert_eq!(find_gcd([].into_iter()), None);
assert_eq!(find_gcd([15, 30, 5, 10].into_iter()), Some(5));
assert_eq!(find_gcd([15, 16, 10].into_iter()), Some(1));
assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), Some(5));
}
}

View File

@@ -20,16 +20,18 @@
//!
//! Read access performance is comparable to that of an array lookup.
use fastfield_codecs::dynamic::DynamicFastFieldCodec;
pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet};
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub(crate) use self::gcd::{find_gcd, GCDFastFieldCodec, GCD_CODEC_ID, GCD_DEFAULT};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::reader::{DynamicFastFieldReader, FastFieldReader};
pub use self::reader::FastFieldReader;
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
pub use self::serializer::{CompositeFastFieldSerializer, FastFieldDataAccess, FastFieldStats};
pub use self::serializer::{CompositeFastFieldSerializer, FastFieldStats};
pub use self::wrapper::FastFieldReaderWrapper;
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
use crate::schema::{Cardinality, FieldType, Type, Value};
use crate::{DateTime, DocId};
@@ -38,25 +40,13 @@ mod alive_bitset;
mod bytes;
mod error;
mod facet_reader;
mod gcd;
mod multivalued;
mod reader;
mod readers;
mod serializer;
mod wrapper;
mod writer;
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)]
pub(crate) enum FastFieldCodecName {
Bitpacked,
LinearInterpol,
BlockwiseLinearInterpol,
}
pub(crate) const ALL_CODECS: &[FastFieldCodecName; 3] = &[
FastFieldCodecName::Bitpacked,
FastFieldCodecName::LinearInterpol,
FastFieldCodecName::BlockwiseLinearInterpol,
];
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
/// for a doc_id
pub trait MultiValueLength {
@@ -126,6 +116,9 @@ impl FastValue for u64 {
}
}
// TODO rename
pub type FastFieldReaderImpl<V> = FastFieldReaderWrapper<V, DynamicFastFieldCodec>;
impl FastValue for i64 {
fn from_u64(val: u64) -> Self {
common::u64_to_i64(val)
@@ -290,18 +283,11 @@ mod tests {
schema_builder.build()
});
pub static SCHEMAI64: Lazy<Schema> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
schema_builder.add_i64_field("field", FAST);
schema_builder.build()
});
pub static FIELD: Lazy<Field> = Lazy::new(|| SCHEMA.get_field("field").unwrap());
pub static FIELDI64: Lazy<Field> = Lazy::new(|| SCHEMAI64.get_field("field").unwrap());
#[test]
pub fn test_fastfield() {
let test_fastfield = DynamicFastFieldReader::<u64>::from(vec![100, 200, 300]);
let test_fastfield = FastFieldReaderImpl::<u64>::from(&[100, 200, 300]);
assert_eq!(test_fastfield.get(0), 100);
assert_eq!(test_fastfield.get(1), 200);
assert_eq!(test_fastfield.get(2), 300);
@@ -333,7 +319,7 @@ mod tests {
assert_eq!(file.len(), 37);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(file)?;
let fast_field_reader = FastFieldReaderImpl::<u64>::open(file)?;
assert_eq!(fast_field_reader.get(0), 13u64);
assert_eq!(fast_field_reader.get(1), 14u64);
assert_eq!(fast_field_reader.get(2), 2u64);
@@ -365,7 +351,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
let fast_field_reader = FastFieldReaderImpl::<u64>::open(data)?;
assert_eq!(fast_field_reader.get(0), 4u64);
assert_eq!(fast_field_reader.get(1), 14_082_001u64);
assert_eq!(fast_field_reader.get(2), 3_052u64);
@@ -401,7 +387,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
let fast_field_reader = FastFieldReaderImpl::<u64>::open(data)?;
for doc in 0..10_000 {
assert_eq!(fast_field_reader.get(doc), 100_000u64);
}
@@ -433,7 +419,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
let fast_field_reader = FastFieldReaderImpl::<u64>::open(data)?;
assert_eq!(fast_field_reader.get(0), 0u64);
for doc in 1..10_001 {
assert_eq!(
@@ -473,7 +459,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(i64_field).unwrap();
let fast_field_reader = DynamicFastFieldReader::<i64>::open(data)?;
let fast_field_reader = FastFieldReaderImpl::<i64>::open(data)?;
assert_eq!(fast_field_reader.min_value(), -100i64);
assert_eq!(fast_field_reader.max_value(), 9_999i64);
@@ -513,7 +499,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(i64_field).unwrap();
let fast_field_reader = DynamicFastFieldReader::<i64>::open(data)?;
let fast_field_reader = FastFieldReaderImpl::<i64>::open(data)?;
assert_eq!(fast_field_reader.get(0u32), 0i64);
}
Ok(())
@@ -551,7 +537,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
let fast_field_reader = FastFieldReaderImpl::<u64>::open(data)?;
for a in 0..n {
assert_eq!(fast_field_reader.get(a as u32), permutation[a as usize]);
@@ -868,7 +854,7 @@ mod tests {
#[test]
pub fn test_fastfield_bool() {
let test_fastfield = DynamicFastFieldReader::<bool>::from(vec![true, false, true, false]);
let test_fastfield = FastFieldReaderImpl::<bool>::from(&[true, false, true, false]);
assert_eq!(test_fastfield.get(0), true);
assert_eq!(test_fastfield.get(1), false);
assert_eq!(test_fastfield.get(2), true);
@@ -902,7 +888,7 @@ mod tests {
assert_eq!(file.len(), 36);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(field).unwrap();
let fast_field_reader = DynamicFastFieldReader::<bool>::open(file)?;
let fast_field_reader = FastFieldReaderImpl::<bool>::open(file)?;
assert_eq!(fast_field_reader.get(0), true);
assert_eq!(fast_field_reader.get(1), false);
assert_eq!(fast_field_reader.get(2), true);
@@ -938,7 +924,7 @@ mod tests {
assert_eq!(file.len(), 48);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(field).unwrap();
let fast_field_reader = DynamicFastFieldReader::<bool>::open(file)?;
let fast_field_reader = FastFieldReaderImpl::<bool>::open(file)?;
for i in 0..25 {
assert_eq!(fast_field_reader.get(i * 2), true);
assert_eq!(fast_field_reader.get(i * 2 + 1), false);
@@ -972,7 +958,7 @@ mod tests {
assert_eq!(file.len(), 35);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(field).unwrap();
let fast_field_reader = DynamicFastFieldReader::<bool>::open(file)?;
let fast_field_reader = FastFieldReaderImpl::<bool>::open(file)?;
assert_eq!(fast_field_reader.get(0), false);
Ok(())

View File

@@ -346,26 +346,32 @@ mod tests {
assert!(test_multivalued_no_panic(&ops[..]).is_ok());
}
}
#[test]
fn test_proptest_merge_multivalued_bug() {
    // Regression input originally found by proptest: index two docs, then merge.
    let ops = [
        IndexingOp::AddDoc { id: 7 },
        IndexingOp::AddDoc { id: 4 },
        IndexingOp::Merge,
    ];
    assert!(test_multivalued_no_panic(&ops).is_ok());
}
#[test]
fn test_multivalued_proptest_gcd() {
use IndexingOp::*;
let ops = [AddDoc { id: 9 }, AddDoc { id: 9 }, Merge];
assert!(test_multivalued_no_panic(&ops[..]).is_ok());
let ops = &[AddDoc { id: 9 }, AddDoc { id: 9 }, Merge];
assert!(test_multivalued_no_panic(ops).is_ok());
}
#[test]
fn test_multivalued_proptest_off_by_one_bug_1151() {
use IndexingOp::*;
let ops = [
let ops = &[
AddDoc { id: 3 },
AddDoc { id: 1 },
AddDoc { id: 3 },
Commit,
Merge,
];
assert!(test_multivalued_no_panic(&ops[..]).is_ok());
assert!(test_multivalued_no_panic(ops).is_ok());
}
#[test]

View File

@@ -1,6 +1,6 @@
use std::ops::Range;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue, MultiValueLength};
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl, FastValue, MultiValueLength};
use crate::DocId;
/// Reader for a multivalued `u64` fast field.
@@ -12,14 +12,14 @@ use crate::DocId;
/// The `idx_reader` associated, for each document, the index of its first value.
#[derive(Clone)]
pub struct MultiValuedFastFieldReader<Item: FastValue> {
idx_reader: DynamicFastFieldReader<u64>,
vals_reader: DynamicFastFieldReader<Item>,
idx_reader: FastFieldReaderImpl<u64>,
vals_reader: FastFieldReaderImpl<Item>,
}
impl<Item: FastValue> MultiValuedFastFieldReader<Item> {
pub(crate) fn open(
idx_reader: DynamicFastFieldReader<u64>,
vals_reader: DynamicFastFieldReader<Item>,
idx_reader: FastFieldReaderImpl<u64>,
vals_reader: FastFieldReaderImpl<Item>,
) -> MultiValuedFastFieldReader<Item> {
MultiValuedFastFieldReader {
idx_reader,

View File

@@ -1,26 +1,8 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::path::Path;
use fastfield_codecs::bitpacked::{
BitpackedFastFieldReader as BitpackedReader, BitpackedFastFieldSerializer,
};
use fastfield_codecs::linearinterpol::{
LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer,
};
use fastfield_codecs::multilinearinterpol::{
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
};
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecSerializer};
use super::{FastValue, GCDFastFieldCodec, GCD_CODEC_ID};
use crate::directory::{CompositeFile, Directory, FileSlice, OwnedBytes, RamDirectory, WritePtr};
use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter};
use crate::schema::{Schema, FAST};
use super::FastValue;
use crate::DocId;
/// FastFieldReader is the trait to access fast field data.
pub trait FastFieldReader<Item: FastValue>: Clone {
pub trait FastFieldReader<Item: FastValue> {
/// Return the value associated to the given document.
///
/// This accessor should return as fast as possible.
@@ -59,298 +41,3 @@ pub trait FastFieldReader<Item: FastValue>: Clone {
/// of the actual maximum value.
fn max_value(&self) -> Item;
}
#[derive(Clone)]
/// DynamicFastFieldReader wraps different readers to access
/// the various encoded fastfield data
///
/// Each variant pairs a codec reader with, optionally, a GCD
/// (greatest-common-divisor) decoding layer.
pub enum DynamicFastFieldReader<Item: FastValue> {
    /// Bitpacked compressed fastfield data.
    Bitpacked(FastFieldReaderCodecWrapper<Item, BitpackedReader>),
    /// Linear interpolated values + bitpacked
    LinearInterpol(FastFieldReaderCodecWrapper<Item, LinearInterpolFastFieldReader>),
    /// Blockwise linear interpolated values + bitpacked
    MultiLinearInterpol(FastFieldReaderCodecWrapper<Item, MultiLinearInterpolFastFieldReader>),
    /// GCD and Bitpacked compressed fastfield data.
    BitpackedGCD(FastFieldReaderCodecWrapper<Item, GCDFastFieldCodec<BitpackedReader>>),
    /// GCD and Linear interpolated values + bitpacked
    LinearInterpolGCD(
        FastFieldReaderCodecWrapper<Item, GCDFastFieldCodec<LinearInterpolFastFieldReader>>,
    ),
    /// GCD and Blockwise linear interpolated values + bitpacked
    MultiLinearInterpolGCD(
        FastFieldReaderCodecWrapper<Item, GCDFastFieldCodec<MultiLinearInterpolFastFieldReader>>,
    ),
}
impl<Item: FastValue> DynamicFastFieldReader<Item> {
    /// Returns the correct reader wrapped in the `DynamicFastFieldReader` enum for the data.
    ///
    /// `codec_id` selects the decoder. For the GCD codec, a second id byte
    /// identifying the wrapped inner codec is read from the start of `bytes`.
    ///
    /// # Panics
    ///
    /// Panics on an unknown codec id (corrupted data or an index written by an
    /// older tantivy version).
    pub fn open_from_id(
        mut bytes: OwnedBytes,
        codec_id: u8,
    ) -> crate::Result<DynamicFastFieldReader<Item>> {
        let reader = match codec_id {
            BitpackedFastFieldSerializer::ID => {
                DynamicFastFieldReader::Bitpacked(FastFieldReaderCodecWrapper::<
                    Item,
                    BitpackedReader,
                >::open_from_bytes(bytes)?)
            }
            LinearInterpolFastFieldSerializer::ID => {
                DynamicFastFieldReader::LinearInterpol(FastFieldReaderCodecWrapper::<
                    Item,
                    LinearInterpolFastFieldReader,
                >::open_from_bytes(bytes)?)
            }
            MultiLinearInterpolFastFieldSerializer::ID => {
                DynamicFastFieldReader::MultiLinearInterpol(FastFieldReaderCodecWrapper::<
                    Item,
                    MultiLinearInterpolFastFieldReader,
                >::open_from_bytes(
                    bytes
                )?)
            }
            _ if codec_id == GCD_CODEC_ID => {
                // The GCD codec wraps another codec: the next byte says which one.
                let codec_id = bytes.read_u8();
                match codec_id {
                    BitpackedFastFieldSerializer::ID => {
                        DynamicFastFieldReader::BitpackedGCD(FastFieldReaderCodecWrapper::<
                            Item,
                            GCDFastFieldCodec<BitpackedReader>,
                        >::open_from_bytes(
                            bytes
                        )?)
                    }
                    LinearInterpolFastFieldSerializer::ID => {
                        DynamicFastFieldReader::LinearInterpolGCD(FastFieldReaderCodecWrapper::<
                            Item,
                            GCDFastFieldCodec<LinearInterpolFastFieldReader>,
                        >::open_from_bytes(
                            bytes
                        )?)
                    }
                    MultiLinearInterpolFastFieldSerializer::ID => {
                        DynamicFastFieldReader::MultiLinearInterpolGCD(
                            FastFieldReaderCodecWrapper::<
                                Item,
                                GCDFastFieldCodec<MultiLinearInterpolFastFieldReader>,
                            >::open_from_bytes(bytes)?,
                        )
                    }
                    _ => {
                        panic!(
                            "unknown fastfield codec id {:?}. Data corrupted or using old tantivy \
                             version.",
                            codec_id
                        )
                    }
                }
            }
            _ => {
                panic!(
                    "unknown fastfield codec id {:?}. Data corrupted or using old tantivy version.",
                    codec_id
                )
            }
        };
        Ok(reader)
    }

    /// Returns the correct reader wrapped in the `DynamicFastFieldReader` enum for the data.
    pub fn open(file: FileSlice) -> crate::Result<DynamicFastFieldReader<Item>> {
        let mut bytes = file.read_bytes()?;
        // The first byte of the serialized fast field is the codec id.
        let codec_id = bytes.read_u8();
        Self::open_from_id(bytes, codec_id)
    }
}
}
impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
    /// Dispatches `get` to the concrete codec reader.
    #[inline]
    fn get(&self, doc: DocId) -> Item {
        match self {
            Self::Bitpacked(reader) => reader.get(doc),
            Self::LinearInterpol(reader) => reader.get(doc),
            Self::MultiLinearInterpol(reader) => reader.get(doc),
            Self::BitpackedGCD(reader) => reader.get(doc),
            Self::LinearInterpolGCD(reader) => reader.get(doc),
            Self::MultiLinearInterpolGCD(reader) => reader.get(doc),
        }
    }

    /// Dispatches `get_range` to the concrete codec reader.
    #[inline]
    fn get_range(&self, start: u64, output: &mut [Item]) {
        match self {
            Self::Bitpacked(reader) => reader.get_range(start, output),
            Self::LinearInterpol(reader) => reader.get_range(start, output),
            Self::MultiLinearInterpol(reader) => reader.get_range(start, output),
            Self::BitpackedGCD(reader) => reader.get_range(start, output),
            Self::LinearInterpolGCD(reader) => reader.get_range(start, output),
            Self::MultiLinearInterpolGCD(reader) => reader.get_range(start, output),
        }
    }

    /// Dispatches `min_value` to the concrete codec reader.
    fn min_value(&self) -> Item {
        match self {
            Self::Bitpacked(reader) => reader.min_value(),
            Self::LinearInterpol(reader) => reader.min_value(),
            Self::MultiLinearInterpol(reader) => reader.min_value(),
            Self::BitpackedGCD(reader) => reader.min_value(),
            Self::LinearInterpolGCD(reader) => reader.min_value(),
            Self::MultiLinearInterpolGCD(reader) => reader.min_value(),
        }
    }

    /// Dispatches `max_value` to the concrete codec reader.
    fn max_value(&self) -> Item {
        match self {
            Self::Bitpacked(reader) => reader.max_value(),
            Self::LinearInterpol(reader) => reader.max_value(),
            Self::MultiLinearInterpol(reader) => reader.max_value(),
            Self::BitpackedGCD(reader) => reader.max_value(),
            Self::LinearInterpolGCD(reader) => reader.max_value(),
            Self::MultiLinearInterpolGCD(reader) => reader.max_value(),
        }
    }
}
/// Wrapper for accessing a fastfield.
///
/// Holds the data and the codec to read the data.
#[derive(Clone)]
pub struct FastFieldReaderCodecWrapper<Item: FastValue, CodecReader> {
    // Codec-specific reader state (parsed header, lookup tables, ...).
    reader: CodecReader,
    // Raw fast field bytes the codec decodes values from on each access.
    bytes: OwnedBytes,
    _phantom: PhantomData<Item>,
}
impl<Item: FastValue, C: FastFieldCodecReader> FastFieldReaderCodecWrapper<Item, C> {
    /// Opens a fast field given a file.
    ///
    /// # Panics
    ///
    /// Panics if the first byte (the codec id) is not the bitpacked codec id.
    pub fn open(file: FileSlice) -> crate::Result<Self> {
        let mut bytes = file.read_bytes()?;
        let codec_id = bytes.read_u8();
        assert_eq!(
            BitpackedFastFieldSerializer::ID,
            codec_id,
            "Tried to open fast field as bitpacked encoded (id=1), but got serializer with \
             different id"
        );
        Self::open_from_bytes(bytes)
    }

    /// Opens a fast field given the bytes.
    pub fn open_from_bytes(bytes: OwnedBytes) -> crate::Result<Self> {
        // The codec parses its header from the slice; the raw bytes are kept
        // alongside so values can be decoded on demand.
        let reader = C::open_from_bytes(bytes.as_slice())?;
        Ok(FastFieldReaderCodecWrapper {
            reader,
            bytes,
            _phantom: PhantomData,
        })
    }

    /// Decodes the value at index `doc` and converts it from its `u64`
    /// representation to `Item`.
    #[inline]
    pub(crate) fn get_u64(&self, doc: u64) -> Item {
        let data = self.reader.get_u64(doc, self.bytes.as_slice());
        Item::from_u64(data)
    }

    /// Internally `multivalued` also use SingleValue Fast fields.
    /// It works as follows... A first column contains the list of start index
    /// for each document, a second column contains the actual values.
    ///
    /// The values associated to a given doc, are then
    /// `second_column[first_column.get(doc)..first_column.get(doc+1)]`.
    ///
    /// Which means single value fast field reader can be indexed internally with
    /// something different from a `DocId`. For this use case, we want to use `u64`
    /// values.
    ///
    /// See `get_range` for an actual documentation about this method.
    pub(crate) fn get_range_u64(&self, start: u64, output: &mut [Item]) {
        for (i, out) in output.iter_mut().enumerate() {
            *out = self.get_u64(start + (i as u64));
        }
    }
}
impl<Item: FastValue, C: FastFieldCodecReader + Clone> FastFieldReader<Item>
    for FastFieldReaderCodecWrapper<Item, C>
{
    /// Return the value associated to the given document.
    ///
    /// This accessor should return as fast as possible.
    ///
    /// # Panics
    ///
    /// May panic if `doc` is greater than the segment
    /// `maxdoc`.
    fn get(&self, doc: DocId) -> Item {
        self.get_u64(u64::from(doc))
    }

    /// Fills an output buffer with the fast field values
    /// associated with the `DocId` going from
    /// `start` to `start + output.len()`.
    ///
    /// Regardless of the type of `Item`, this method works
    /// - transmuting the output array
    /// - extracting the `Item`s as if they were `u64`
    /// - possibly converting the `u64` value to the right type.
    ///
    /// # Panics
    ///
    /// May panic if `start + output.len()` is greater than
    /// the segment's `maxdoc`.
    fn get_range(&self, start: u64, output: &mut [Item]) {
        self.get_range_u64(start, output);
    }

    /// Returns the minimum value for this fast field.
    ///
    /// The min value does not take in account of possible
    /// deleted document, and should be considered as a lower bound
    /// of the actual minimum value.
    fn min_value(&self) -> Item {
        Item::from_u64(self.reader.min_value())
    }

    /// Returns the maximum value for this fast field.
    ///
    /// The max value does not take in account of possible
    /// deleted document, and should be considered as an upper bound
    /// of the actual maximum value.
    fn max_value(&self) -> Item {
        Item::from_u64(self.reader.max_value())
    }
}
impl<Item: FastValue> From<Vec<Item>> for DynamicFastFieldReader<Item> {
    /// Builds a reader from in-memory values by serializing them into a
    /// `RamDirectory` and reopening the result. Primarily useful for tests.
    fn from(vals: Vec<Item>) -> DynamicFastFieldReader<Item> {
        let mut schema_builder = Schema::builder();
        // Values are persisted via their u64 representation, so a u64 field
        // works for any `FastValue` item type.
        let field = schema_builder.add_u64_field("field", FAST);
        let schema = schema_builder.build();
        let path = Path::new("__dummy__");
        let directory: RamDirectory = RamDirectory::create();
        {
            let write: WritePtr = directory
                .open_write(path)
                .expect("With a RamDirectory, this should never fail.");
            let mut serializer = CompositeFastFieldSerializer::from_write(write)
                .expect("With a RamDirectory, this should never fail.");
            let mut fast_field_writers = FastFieldsWriter::from_schema(&schema);
            {
                let fast_field_writer = fast_field_writers
                    .get_field_writer_mut(field)
                    .expect("With a RamDirectory, this should never fail.");
                for val in vals {
                    fast_field_writer.add_val(val.to_u64());
                }
            }
            fast_field_writers
                .serialize(&mut serializer, &HashMap::new(), None)
                .unwrap();
            serializer.close().unwrap();
        }
        // Round-trip: reopen what we just serialized.
        let file = directory.open_read(path).expect("Failed to open the file");
        let composite_file = CompositeFile::open(&file).expect("Failed to read the composite file");
        let field_file = composite_file
            .open_read(field)
            .expect("File component not found");
        DynamicFastFieldReader::open(field_file).unwrap()
    }
}

View File

@@ -1,7 +1,7 @@
use super::reader::DynamicFastFieldReader;
use crate::directory::{CompositeFile, FileSlice};
use crate::fastfield::{
BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader,
BytesFastFieldReader, FastFieldNotAvailableError, FastFieldReaderImpl, FastValue,
MultiValuedFastFieldReader,
};
use crate::schema::{Cardinality, Field, FieldType, Schema};
use crate::space_usage::PerFieldSpaceUsage;
@@ -109,14 +109,15 @@ impl FastFieldReaders {
&self,
field: Field,
index: usize,
) -> crate::Result<DynamicFastFieldReader<TFastValue>> {
) -> crate::Result<FastFieldReaderImpl<TFastValue>> {
let fast_field_slice = self.fast_field_data(field, index)?;
DynamicFastFieldReader::open(fast_field_slice)
let fast_field_data = fast_field_slice.read_bytes()?;
FastFieldReaderImpl::open_from_bytes(fast_field_data)
}
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(
&self,
field: Field,
) -> crate::Result<DynamicFastFieldReader<TFastValue>> {
) -> crate::Result<FastFieldReaderImpl<TFastValue>> {
self.typed_fast_field_reader_with_idx(field, 0)
}
@@ -132,7 +133,7 @@ impl FastFieldReaders {
/// Returns the `u64` fast field reader associated to `field`.
///
/// If `field` is not a u64 fast field, this method returns an Error.
pub fn u64(&self, field: Field) -> crate::Result<DynamicFastFieldReader<u64>> {
pub fn u64(&self, field: Field) -> crate::Result<FastFieldReaderImpl<u64>> {
self.check_type(field, FastType::U64, Cardinality::SingleValue)?;
self.typed_fast_field_reader(field)
}
@@ -142,14 +143,14 @@ impl FastFieldReaders {
///
/// If not, the fastfield reader will returns the u64-value associated to the original
/// FastValue.
pub fn u64_lenient(&self, field: Field) -> crate::Result<DynamicFastFieldReader<u64>> {
pub fn u64_lenient(&self, field: Field) -> crate::Result<FastFieldReaderImpl<u64>> {
self.typed_fast_field_reader(field)
}
/// Returns the `i64` fast field reader associated to `field`.
///
/// If `field` is not a i64 fast field, this method returns an Error.
pub fn i64(&self, field: Field) -> crate::Result<DynamicFastFieldReader<i64>> {
pub fn i64(&self, field: Field) -> crate::Result<FastFieldReaderImpl<i64>> {
self.check_type(field, FastType::I64, Cardinality::SingleValue)?;
self.typed_fast_field_reader(field)
}
@@ -157,7 +158,7 @@ impl FastFieldReaders {
/// Returns the `date` fast field reader associated to `field`.
///
/// If `field` is not a date fast field, this method returns an Error.
pub fn date(&self, field: Field) -> crate::Result<DynamicFastFieldReader<DateTime>> {
pub fn date(&self, field: Field) -> crate::Result<FastFieldReaderImpl<DateTime>> {
self.check_type(field, FastType::Date, Cardinality::SingleValue)?;
self.typed_fast_field_reader(field)
}
@@ -165,7 +166,7 @@ impl FastFieldReaders {
/// Returns the `f64` fast field reader associated to `field`.
///
/// If `field` is not a f64 fast field, this method returns an Error.
pub fn f64(&self, field: Field) -> crate::Result<DynamicFastFieldReader<f64>> {
pub fn f64(&self, field: Field) -> crate::Result<FastFieldReaderImpl<f64>> {
self.check_type(field, FastType::F64, Cardinality::SingleValue)?;
self.typed_fast_field_reader(field)
}
@@ -173,7 +174,7 @@ impl FastFieldReaders {
/// Returns the `bool` fast field reader associated to `field`.
///
/// If `field` is not a bool fast field, this method returns an Error.
pub fn bool(&self, field: Field) -> crate::Result<DynamicFastFieldReader<bool>> {
pub fn bool(&self, field: Field) -> crate::Result<FastFieldReaderImpl<bool>> {
self.check_type(field, FastType::Bool, Cardinality::SingleValue)?;
self.typed_fast_field_reader(field)
}
@@ -241,7 +242,8 @@ impl FastFieldReaders {
)));
}
let fast_field_idx_file = self.fast_field_data(field, 0)?;
let idx_reader = DynamicFastFieldReader::open(fast_field_idx_file)?;
let fast_field_idx_bytes = fast_field_idx_file.read_bytes()?;
let idx_reader = FastFieldReaderImpl::open_from_bytes(fast_field_idx_bytes)?;
let data = self.fast_field_data(field, 1)?;
BytesFastFieldReader::open(idx_reader, data)
} else {

View File

@@ -2,16 +2,12 @@ use std::io::{self, Write};
use common::{BinarySerializable, CountingWriter};
pub use fastfield_codecs::bitpacked::{
BitpackedFastFieldSerializer, BitpackedFastFieldSerializerLegacy,
BitpackedFastFieldCodec, BitpackedFastFieldSerializerLegacy,
};
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
pub use fastfield_codecs::{FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats};
use fastfield_codecs::dynamic::{CodecType, DynamicFastFieldCodec};
pub use fastfield_codecs::{FastFieldCodec, FastFieldStats};
use super::{find_gcd, FastFieldCodecName, ALL_CODECS, GCD_DEFAULT};
use crate::directory::{CompositeWrite, WritePtr};
use crate::fastfield::gcd::write_gcd_header;
use crate::fastfield::GCD_CODEC_ID;
use crate::schema::Field;
/// `CompositeFastFieldSerializer` is in charge of serializing
@@ -36,249 +32,37 @@ use crate::schema::Field;
/// * `close()`
pub struct CompositeFastFieldSerializer {
composite_write: CompositeWrite<WritePtr>,
codec_enable_checker: FastFieldCodecEnableCheck,
}
#[derive(Debug, Clone)]
pub struct FastFieldCodecEnableCheck {
enabled_codecs: Vec<FastFieldCodecName>,
}
impl FastFieldCodecEnableCheck {
fn allow_all() -> Self {
FastFieldCodecEnableCheck {
enabled_codecs: ALL_CODECS.to_vec(),
}
}
fn is_enabled(&self, codec_name: FastFieldCodecName) -> bool {
self.enabled_codecs.contains(&codec_name)
}
}
impl From<FastFieldCodecName> for FastFieldCodecEnableCheck {
fn from(codec_name: FastFieldCodecName) -> Self {
FastFieldCodecEnableCheck {
enabled_codecs: vec![codec_name],
}
}
}
// use this, when this is merged and stabilized explicit_generic_args_with_impl_trait
// https://github.com/rust-lang/rust/pull/86176
fn codec_estimation<T: FastFieldCodecSerializer, A: FastFieldDataAccess>(
stats: FastFieldStats,
fastfield_accessor: &A,
estimations: &mut Vec<(f32, &str, u8)>,
) {
if !T::is_applicable(fastfield_accessor, stats.clone()) {
return;
}
let (ratio, name, id) = (T::estimate(fastfield_accessor, stats), T::NAME, T::ID);
estimations.push((ratio, name, id));
}
impl CompositeFastFieldSerializer {
/// Constructor
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
Self::from_write_with_codec(write, FastFieldCodecEnableCheck::allow_all())
}
/// Constructor
pub fn from_write_with_codec(
write: WritePtr,
codec_enable_checker: FastFieldCodecEnableCheck,
) -> io::Result<CompositeFastFieldSerializer> {
// just making room for the pointer to header.
let composite_write = CompositeWrite::wrap(write);
Ok(CompositeFastFieldSerializer {
composite_write,
codec_enable_checker,
})
Ok(CompositeFastFieldSerializer { composite_write })
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field<F, I>(
pub fn create_auto_detect_u64_fast_field(
&mut self,
field: Field,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
iter_gen: F,
) -> io::Result<()>
where
F: Fn() -> I,
I: Iterator<Item = u64>,
{
self.create_auto_detect_u64_fast_field_with_idx(
field,
stats,
fastfield_accessor,
iter_gen,
0,
)
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn write_header<W: Write>(field_write: &mut W, codec_id: u8) -> io::Result<()> {
codec_id.serialize(field_write)?;
Ok(())
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx<F, I>(
&mut self,
field: Field,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
iter_gen: F,
idx: usize,
) -> io::Result<()>
where
F: Fn() -> I,
I: Iterator<Item = u64>,
{
let field_write = self.composite_write.for_field_with_idx(field, idx);
let gcd = find_gcd(iter_gen().map(|val| val - stats.min_value)).unwrap_or(GCD_DEFAULT);
if gcd == 1 {
return Self::create_auto_detect_u64_fast_field_with_idx_gcd(
self.codec_enable_checker.clone(),
field,
field_write,
stats,
fastfield_accessor,
iter_gen(),
iter_gen(),
);
}
Self::write_header(field_write, GCD_CODEC_ID)?;
struct GCDWrappedFFAccess<T: FastFieldDataAccess> {
fastfield_accessor: T,
min_value: u64,
gcd: u64,
}
impl<T: FastFieldDataAccess> FastFieldDataAccess for GCDWrappedFFAccess<T> {
fn get_val(&self, position: u64) -> u64 {
(self.fastfield_accessor.get_val(position) - self.min_value) / self.gcd
}
}
let fastfield_accessor = GCDWrappedFFAccess {
fastfield_accessor,
min_value: stats.min_value,
gcd,
};
let min_value = stats.min_value;
let stats = FastFieldStats {
min_value: 0,
max_value: (stats.max_value - stats.min_value) / gcd,
num_vals: stats.num_vals,
};
let iter1 = iter_gen().map(|val| (val - min_value) / gcd);
let iter2 = iter_gen().map(|val| (val - min_value) / gcd);
Self::create_auto_detect_u64_fast_field_with_idx_gcd(
self.codec_enable_checker.clone(),
field,
field_write,
stats,
fastfield_accessor,
iter1,
iter2,
)?;
write_gcd_header(field_write, min_value, gcd)?;
Ok(())
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx_gcd<W: Write>(
codec_enable_checker: FastFieldCodecEnableCheck,
field: Field,
field_write: &mut CountingWriter<W>,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
iter1: impl Iterator<Item = u64>,
iter2: impl Iterator<Item = u64>,
vals: &[u64],
) -> io::Result<()> {
let mut estimations = vec![];
if codec_enable_checker.is_enabled(FastFieldCodecName::Bitpacked) {
codec_estimation::<BitpackedFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
}
if codec_enable_checker.is_enabled(FastFieldCodecName::LinearInterpol) {
codec_estimation::<LinearInterpolFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
}
if codec_enable_checker.is_enabled(FastFieldCodecName::BlockwiseLinearInterpol) {
codec_estimation::<MultiLinearInterpolFastFieldSerializer, _>(
stats.clone(),
&fastfield_accessor,
&mut estimations,
);
}
if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan())
{
warn!(
"broken estimation for fast field codec {}",
broken_estimation.1
);
}
// removing nan values for codecs with broken calculations, and max values which disables
// codecs
estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
estimations.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
let (_ratio, name, id) = estimations[0];
debug!(
"choosing fast field codec {} for field_id {:?}",
name, field
); // todo print actual field name
Self::write_header(field_write, id)?;
match name {
BitpackedFastFieldSerializer::NAME => {
BitpackedFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
iter1,
iter2,
)?;
}
LinearInterpolFastFieldSerializer::NAME => {
LinearInterpolFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
iter1,
iter2,
)?;
}
MultiLinearInterpolFastFieldSerializer::NAME => {
MultiLinearInterpolFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
iter1,
iter2,
)?;
}
_ => {
panic!("unknown fastfield serializer {}", name)
}
}
field_write.flush()?;
self.create_auto_detect_u64_fast_field_with_idx(field, stats, vals, 0)
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx(
&mut self,
field: Field,
stats: FastFieldStats,
vals: &[u64],
idx: usize,
) -> io::Result<()> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
DynamicFastFieldCodec.serialize(field_write, vals, stats)?;
Ok(())
}
@@ -312,8 +96,7 @@ impl CompositeFastFieldSerializer {
) -> io::Result<BitpackedFastFieldSerializerLegacy<'_, CountingWriter<WritePtr>>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
// Prepend codec id to field data for compatibility with DynamicFastFieldReader.
let id = BitpackedFastFieldSerializer::ID;
id.serialize(field_write)?;
CodecType::Bitpacked.serialize(field_write)?;
BitpackedFastFieldSerializerLegacy::open(field_write, min_value, max_value)
}

184
src/fastfield/wrapper.rs Normal file
View File

@@ -0,0 +1,184 @@
// Copyright (C) 2022 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
use std::marker::PhantomData;
use fastfield_codecs::dynamic::DynamicFastFieldCodec;
use fastfield_codecs::{FastFieldCodec, FastFieldCodecReader, FastFieldStats};
use ownedbytes::OwnedBytes;
use crate::directory::FileSlice;
use crate::fastfield::{FastFieldReader, FastFieldReaderImpl, FastValue};
use crate::DocId;
/// Wrapper for accessing a fastfield.
///
/// Holds the data and the codec to read the data.
pub struct FastFieldReaderWrapper<Item: FastValue, Codec: FastFieldCodec> {
    // Codec reader holding the decoded state for this fast field.
    reader: Codec::Reader,
    _phantom: PhantomData<Item>,
    _codec: PhantomData<Codec>,
}
impl<Item: FastValue, Codec: FastFieldCodec> FastFieldReaderWrapper<Item, Codec> {
fn new(reader: Codec::Reader) -> Self {
Self {
reader,
_phantom: PhantomData,
_codec: PhantomData,
}
}
}
// Manual impl: `#[derive(Clone)]` would wrongly require `Item: Clone` and
// `Codec: Clone`; only the codec's reader needs to be cloneable.
impl<Item: FastValue, Codec: FastFieldCodec> Clone for FastFieldReaderWrapper<Item, Codec>
where Codec::Reader: Clone
{
    fn clone(&self) -> Self {
        Self::new(self.reader.clone())
    }
}
impl<Item: FastValue, C: FastFieldCodec> FastFieldReader<Item> for FastFieldReaderWrapper<Item, C> {
    /// Return the value associated to the given document.
    ///
    /// This accessor should return as fast as possible.
    ///
    /// # Panics
    ///
    /// May panic if `doc` is greater than the segment
    /// `maxdoc`.
    fn get(&self, doc: DocId) -> Item {
        self.get_u64(u64::from(doc))
    }

    /// Fills an output buffer with the fast field values
    /// associated with the `DocId` going from
    /// `start` to `start + output.len()`.
    ///
    /// Regardless of the type of `Item`, this method works
    /// - transmuting the output array
    /// - extracting the `Item`s as if they were `u64`
    /// - possibly converting the `u64` value to the right type.
    ///
    /// # Panics
    ///
    /// May panic if `start + output.len()` is greater than
    /// the segment's `maxdoc`.
    fn get_range(&self, start: u64, output: &mut [Item]) {
        self.get_range_u64(start, output);
    }

    /// Returns the minimum value for this fast field.
    ///
    /// The min value does not take in account of possible
    /// deleted document, and should be considered as a lower bound
    /// of the actual minimum value.
    fn min_value(&self) -> Item {
        Item::from_u64(self.reader.min_value())
    }

    /// Returns the maximum value for this fast field.
    ///
    /// The max value does not take in account of possible
    /// deleted document, and should be considered as an upper bound
    /// of the actual maximum value.
    fn max_value(&self) -> Item {
        Item::from_u64(self.reader.max_value())
    }
}
impl<Item: FastValue, Codec: FastFieldCodec> FastFieldReaderWrapper<Item, Codec> {
    /// Opens a fast field given a file.
    ///
    /// # Errors
    ///
    /// Returns an error if the bytes cannot be read from the file slice,
    /// or if the codec fails to decode them.
    pub fn open(file: FileSlice) -> crate::Result<Self> {
        // TODO: once the serializer writes a codec-id byte, read and
        // validate it here and fail with a clear error on mismatch.
        // (`bytes` no longer needs `mut` until that header read exists.)
        let bytes = file.read_bytes()?;
        Self::open_from_bytes(bytes)
    }

    /// Opens a fast field given the raw bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the codec fails to decode `bytes`.
    pub fn open_from_bytes(bytes: OwnedBytes) -> crate::Result<Self> {
        let reader = Codec::open_from_bytes(bytes)?;
        Ok(FastFieldReaderWrapper {
            reader,
            _codec: PhantomData,
            _phantom: PhantomData,
        })
    }

    /// Returns the value at position `doc`, converted from `u64` to `Item`.
    #[inline]
    pub(crate) fn get_u64(&self, doc: u64) -> Item {
        let data = self.reader.get_u64(doc);
        Item::from_u64(data)
    }

    /// Internally `multivalued` also use SingleValue Fast fields.
    /// It works as follows... A first column contains the list of start index
    /// for each document, a second column contains the actual values.
    ///
    /// The values associated to a given doc, are then
    /// `second_column[first_column.get(doc)..first_column.get(doc+1)]`.
    ///
    /// Which means single value fast field reader can be indexed internally with
    /// something different from a `DocId`. For this use case, we want to use `u64`
    /// values.
    ///
    /// See `get_range` for an actual documentation about this method.
    pub(crate) fn get_range_u64(&self, start: u64, output: &mut [Item]) {
        for (i, out) in output.iter_mut().enumerate() {
            *out = self.get_u64(start + (i as u64));
        }
    }
}
use itertools::Itertools;
impl<Item: FastValue, Arr: AsRef<[Item]>> From<Arr> for FastFieldReaderImpl<Item> {
    /// Builds an in-memory fast field reader from a slice of values.
    ///
    /// The values are serialized with the dynamic codec into a buffer and
    /// immediately re-opened. Mostly useful in tests.
    fn from(vals: Arr) -> FastFieldReaderImpl<Item> {
        let vals_u64: Vec<u64> = vals.as_ref().iter().map(|val| val.to_u64()).collect();
        // An empty input is represented with min = max = 0 instead of
        // panicking, making this conversion total over its input.
        let (min_value, max_value) = vals_u64
            .iter()
            .copied()
            .minmax()
            .into_option()
            .unwrap_or((0u64, 0u64));
        let stats = FastFieldStats {
            min_value,
            max_value,
            num_vals: vals_u64.len() as u64,
        };
        let mut buffer = Vec::new();
        DynamicFastFieldCodec
            .serialize(&mut buffer, &vals_u64, stats)
            .expect("serializing to an in-memory buffer should not fail");
        let bytes = OwnedBytes::new(buffer);
        let fast_field_reader = DynamicFastFieldCodec::open_from_bytes(bytes)
            .expect("opening bytes we just serialized should not fail");
        FastFieldReaderImpl::new(fast_field_reader)
    }
}

View File

@@ -7,7 +7,7 @@ use tantivy_bitpacker::BlockedBitpacker;
use super::multivalued::MultiValuedFastFieldWriter;
use super::serializer::FastFieldStats;
use super::{FastFieldDataAccess, FastFieldType, FastValue};
use super::{FastFieldType, FastValue};
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;
@@ -217,12 +217,13 @@ impl FastFieldsWriter {
) -> io::Result<()> {
for field_writer in &self.term_id_writers {
let field = field_writer.field();
dbg!("multifield", field);
field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
}
for field_writer in &self.single_value_writers {
dbg!("singlefield");
field_writer.serialize(serializer, doc_id_map)?;
}
for field_writer in &self.multi_values_writers {
let field = field_writer.field();
field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
@@ -359,64 +360,26 @@ impl IntFastFieldWriter {
(self.val_min, self.val_max)
};
let fastfield_accessor = WriterFastFieldAccessProvider {
doc_id_map,
vals: &self.vals,
};
let vals = compute_fast_field_vals(&self.vals, doc_id_map);
let stats = FastFieldStats {
min_value: min,
max_value: max,
num_vals: self.val_count as u64,
};
if let Some(doc_id_map) = doc_id_map {
let iter_gen = || {
doc_id_map
.iter_old_doc_ids()
.map(|doc_id| self.vals.get(doc_id as usize))
};
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
iter_gen,
)?;
} else {
let iter_gen = || self.vals.iter();
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
iter_gen,
)?;
};
dbg!(&stats);
dbg!(&vals);
serializer.create_auto_detect_u64_fast_field(self.field, stats, &vals)?;
Ok(())
}
}
#[derive(Clone)]
struct WriterFastFieldAccessProvider<'map, 'bitp> {
doc_id_map: Option<&'map DocIdMapping>,
vals: &'bitp BlockedBitpacker,
}
impl<'map, 'bitp> FastFieldDataAccess for WriterFastFieldAccessProvider<'map, 'bitp> {
/// Return the value associated to the given doc.
///
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance
/// reasons.
///
/// # Panics
///
/// May panic if `doc` is greater than the index.
fn get_val(&self, doc: u64) -> u64 {
if let Some(doc_id_map) = self.doc_id_map {
self.vals
.get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra
// FastFieldReader wrapper for
// non doc_id_map
} else {
self.vals.get(doc as usize)
}
/// Materializes the fast field values as a `Vec<u64>`, applying the
/// optional doc-id remapping produced by index sorting.
///
/// With a mapping, values are emitted in new-doc-id order by looking up
/// the old doc id for each new position; without one, values are emitted
/// in their stored order.
fn compute_fast_field_vals(vals: &BlockedBitpacker, doc_id_map: Option<&DocIdMapping>) -> Vec<u64> {
    match doc_id_map {
        Some(mapping) => mapping
            .iter_old_doc_ids()
            .map(|old_doc_id| vals.get(old_doc_id as usize))
            .collect(),
        None => vals.iter().collect(),
    }
}

View File

@@ -10,8 +10,8 @@ use crate::core::{Segment, SegmentReader};
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::{
AliveBitSet, CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldDataAccess,
FastFieldReader, FastFieldStats, MultiValueLength, MultiValuedFastFieldReader,
AliveBitSet, CompositeFastFieldSerializer, FastFieldReader, FastFieldReaderImpl,
FastFieldStats, MultiValueLength, MultiValuedFastFieldReader,
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
@@ -164,6 +164,30 @@ impl DeltaComputer {
}
}
/// Collects, in new-doc-id order, every value of a multi-valued fast field
/// across all segments being merged.
///
/// `doc_id_mapping` yields `(old_doc_id, segment_ordinal)` pairs in merged
/// order; for each pair, all values of that document in the corresponding
/// segment reader are appended.
fn compute_sorted_multivalued_vals(
    doc_id_mapping: &SegmentDocIdMapping,
    // `&[_]` instead of `&Vec<_>` (clippy::ptr_arg); call sites passing
    // `&vec` still compile via deref coercion.
    fast_field_readers: &[MultiValuedFastFieldReader<u64>],
) -> Vec<u64> {
    let mut vals = Vec::new();
    // Scratch buffer reused across documents to avoid an allocation per doc.
    // NOTE(review): assumes `get_vals` overwrites/clears `buf` — confirm.
    let mut buf: Vec<u64> = Vec::new();
    for &(doc_id, segment_ord) in doc_id_mapping.iter() {
        fast_field_readers[segment_ord as usize].get_vals(doc_id, &mut buf);
        vals.extend_from_slice(&buf);
    }
    vals
}
/// Gathers the values of a single-valued fast field in new-doc-id order.
///
/// `doc_id_mapping` yields `(old_doc_id, segment_ordinal)` pairs; each value
/// is fetched from the reader of the segment the document came from.
fn compute_vals_sorted(
    doc_id_mapping: &SegmentDocIdMapping,
    fast_field_readers: &[FastFieldReaderImpl<u64>],
) -> Vec<u64> {
    doc_id_mapping
        .iter()
        .map(|&(doc_id, segment_ord)| {
            fast_field_readers[segment_ord as usize].get_u64(doc_id as u64)
        })
        .collect()
}
impl IndexMerger {
pub fn open(
schema: Schema,
@@ -342,7 +366,7 @@ impl IndexMerger {
.readers
.iter()
.filter_map(|reader| {
let u64_reader: DynamicFastFieldReader<u64> =
let u64_reader: FastFieldReaderImpl<u64> =
reader.fast_fields().typed_fast_field_reader(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and \
it should never happen.",
@@ -356,7 +380,7 @@ impl IndexMerger {
.readers
.iter()
.map(|reader| {
let u64_reader: DynamicFastFieldReader<u64> =
let u64_reader: crate::fastfield::FastFieldReaderImpl<u64> =
reader.fast_fields().typed_fast_field_reader(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and \
it should never happen.",
@@ -370,33 +394,9 @@ impl IndexMerger {
max_value,
num_vals: doc_id_mapping.len() as u64,
};
#[derive(Clone)]
struct SortedDocIdFieldAccessProvider<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
fast_field_readers: &'a Vec<DynamicFastFieldReader<u64>>,
}
impl<'a> FastFieldDataAccess for SortedDocIdFieldAccessProvider<'a> {
fn get_val(&self, doc: u64) -> u64 {
let (doc_id, reader_ordinal) = self.doc_id_mapping[doc as usize];
self.fast_field_readers[reader_ordinal as usize].get(doc_id)
}
}
let fastfield_accessor = SortedDocIdFieldAccessProvider {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
};
let iter_gen = || {
doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
let fast_field_reader = &fast_field_readers[*reader_ordinal as usize];
fast_field_reader.get(*doc_id)
})
};
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
fastfield_accessor,
iter_gen,
)?;
let vals = compute_vals_sorted(doc_id_mapping, &fast_field_readers);
fast_field_serializer.create_auto_detect_u64_fast_field(field, stats, &vals)?;
Ok(())
}
@@ -427,7 +427,7 @@ impl IndexMerger {
pub(crate) fn get_sort_field_accessor(
reader: &SegmentReader,
sort_by_field: &IndexSortByField,
) -> crate::Result<impl FastFieldReader<u64>> {
) -> crate::Result<FastFieldReaderImpl<u64>> {
let field_id = expect_field_id_for_sort_field(reader.schema(), sort_by_field)?; // for now expect fastfield, but not strictly required
let value_accessor = reader.fast_fields().u64_lenient(field_id)?;
Ok(value_accessor)
@@ -436,7 +436,7 @@ impl IndexMerger {
pub(crate) fn get_reader_with_sort_field_accessor(
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<Vec<(SegmentOrdinal, impl FastFieldReader<u64> + Clone)>> {
) -> crate::Result<Vec<(SegmentOrdinal, FastFieldReaderImpl<u64>)>> {
let reader_ordinal_and_field_accessors = self
.readers
.iter()
@@ -548,7 +548,7 @@ impl IndexMerger {
// access on the fly or 2. change the codec api to make random access optional, but
// they both have also major drawbacks.
let mut offsets = Vec::with_capacity(doc_id_mapping.len());
let mut offsets: Vec<u64> = Vec::with_capacity(doc_id_mapping.len());
let mut offset = 0;
for (doc_id, reader) in doc_id_mapping.iter() {
let reader = &reader_and_field_accessors[*reader as usize].1;
@@ -557,13 +557,7 @@ impl IndexMerger {
}
offsets.push(offset);
let iter_gen = || offsets.iter().cloned();
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
&offsets[..],
iter_gen,
)?;
fast_field_serializer.create_auto_detect_u64_fast_field(field, stats, &offsets[..])?;
Ok(offsets)
}
/// Returns the fastfield index (index for the data, not the data).
@@ -572,7 +566,7 @@ impl IndexMerger {
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<Vec<u64>> {
) -> crate::Result<()> {
let reader_ordinal_and_field_accessors = self
.readers
.iter()
@@ -593,7 +587,8 @@ impl IndexMerger {
fast_field_serializer,
doc_id_mapping,
&reader_ordinal_and_field_accessors,
)
)?;
Ok(())
}
fn write_term_id_fast_field(
@@ -682,12 +677,7 @@ impl IndexMerger {
// The second contains the actual values.
// First we merge the idx fast field.
let offsets =
self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?;
let mut min_value = u64::MAX;
let mut max_value = u64::MIN;
let mut num_vals = 0;
self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?;
let mut vals = Vec::with_capacity(100);
@@ -709,75 +699,18 @@ impl IndexMerger {
);
for doc in reader.doc_ids_alive() {
ff_reader.get_vals(doc, &mut vals);
for &val in &vals {
min_value = cmp::min(val, min_value);
max_value = cmp::max(val, max_value);
}
num_vals += vals.len();
}
ff_readers.push(ff_reader);
// TODO optimize when no deletes
}
if min_value > max_value {
min_value = 0;
max_value = 0;
}
let vals = compute_sorted_multivalued_vals(doc_id_mapping, &ff_readers);
let stats = FastFieldStats::compute(&vals);
// We can now initialize our serializer, and push it the different values
let stats = FastFieldStats {
max_value,
num_vals: num_vals as u64,
min_value,
};
struct SortedDocIdMultiValueAccessProvider<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
fast_field_readers: &'a Vec<MultiValuedFastFieldReader<u64>>,
offsets: Vec<u64>,
}
impl<'a> FastFieldDataAccess for SortedDocIdMultiValueAccessProvider<'a> {
fn get_val(&self, pos: u64) -> u64 {
// use the offsets index to find the doc_id which will contain the position.
// the offsets are strictly increasing so we can do a simple search on it.
let new_doc_id = self
.offsets
.iter()
.position(|&offset| offset > pos)
.expect("pos is out of bounds")
- 1;
// now we need to find the position of `pos` in the multivalued bucket
let num_pos_covered_until_now = self.offsets[new_doc_id];
let pos_in_values = pos - num_pos_covered_until_now;
let (old_doc_id, reader_ordinal) = self.doc_id_mapping[new_doc_id as usize];
let num_vals = self.fast_field_readers[reader_ordinal as usize].get_len(old_doc_id);
assert!(num_vals >= pos_in_values);
let mut vals = vec![];
self.fast_field_readers[reader_ordinal as usize].get_vals(old_doc_id, &mut vals);
vals[pos_in_values as usize]
}
}
let fastfield_accessor = SortedDocIdMultiValueAccessProvider {
doc_id_mapping,
fast_field_readers: &ff_readers,
offsets,
};
let iter_gen = || {
doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| {
let ff_reader = &ff_readers[*reader_ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
})
};
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
field,
stats,
fastfield_accessor,
iter_gen,
&vals[..],
1,
)?;