Merge pull request #1504 from quickwit-oss/move-to-fastfield-codec

Move to fastfield codec
This commit is contained in:
PSeitz
2022-09-03 05:18:35 -07:00
committed by GitHub
19 changed files with 789 additions and 797 deletions

View File

@@ -14,6 +14,8 @@ tantivy-bitpacker = { version="0.2", path = "../bitpacker/" }
ownedbytes = { version = "0.3.0", path = "../ownedbytes" }
prettytable-rs = {version="0.9.0", optional= true}
rand = {version="0.8.3", optional= true}
fastdivide = "0.4"
log = "0.4"
[dev-dependencies]
more-asserts = "0.3.0"
@@ -23,4 +25,5 @@ rand = "0.8.3"
[features]
bin = ["prettytable-rs", "rand"]
default = ["bin"]
unstable = []

View File

@@ -27,7 +27,7 @@ mod tests {
}
fn bench_get<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
let mut bytes = vec![];
Codec::serialize(&mut bytes, &data).unwrap();
Codec::serialize(&mut bytes, &VecColumn::from(data)).unwrap();
let reader = Codec::open_from_bytes(OwnedBytes::new(bytes)).unwrap();
b.iter(|| {
let mut sum = 0u64;
@@ -43,7 +43,7 @@ mod tests {
let mut bytes = Vec::new();
b.iter(|| {
bytes.clear();
Codec::serialize(&mut bytes, &data).unwrap();
Codec::serialize(&mut bytes, &VecColumn::from(data)).unwrap();
});
}

View File

@@ -81,8 +81,11 @@ impl<'a, T: Copy + PartialOrd> Column<T> for VecColumn<'a, T> {
}
}
impl<'a, T: Copy + Ord + Default> From<&'a [T]> for VecColumn<'a, T> {
fn from(values: &'a [T]) -> Self {
impl<'a, T: Copy + Ord + Default, V> From<&'a V> for VecColumn<'a, T>
where V: AsRef<[T]> + ?Sized
{
fn from(values: &'a V) -> Self {
let values = values.as_ref();
let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default();
Self {
values,

207
fastfield_codecs/src/gcd.rs Normal file
View File

@@ -0,0 +1,207 @@
use std::io::{self, Write};
use std::num::NonZeroU64;
use common::BinarySerializable;
use fastdivide::DividerU64;
/// Parameters of the GCD wrapper codec.
///
/// They are serialized after the wrapped codec's data and, on read, map a
/// stored value `val` back to `min_value + gcd * val`.
#[derive(Debug, Clone, Copy)]
pub struct GCDParams {
/// Common divisor of every `value - min_value` delta in the column.
pub gcd: u64,
/// Smallest value of the column; subtracted before dividing by `gcd`.
pub min_value: u64,
/// Number of values in the column.
pub num_vals: u64,
}
impl BinarySerializable for GCDParams {
    /// Writes the fields in the fixed order `gcd`, `min_value`, `num_vals`.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        [self.gcd, self.min_value, self.num_vals]
            .iter()
            .try_for_each(|field| field.serialize(writer))
    }

    /// Reads the fields back in the order used by `serialize`.
    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
        // Struct-literal fields are evaluated top to bottom, which matches the
        // on-disk order.
        Ok(Self {
            gcd: u64::deserialize(reader)?,
            min_value: u64::deserialize(reader)?,
            num_vals: u64::deserialize(reader)?,
        })
    }
}
/// Computes the greatest common divisor of two non-zero numbers with the
/// Euclidean algorithm.
///
/// Passing `large >= small` is recommended but not required: when the
/// arguments are the other way round, the first iteration simply swaps them.
fn compute_gcd(mut large: NonZeroU64, mut small: NonZeroU64) -> NonZeroU64 {
    // Keep reducing until the remainder hits zero; the last non-zero divisor
    // is the gcd.
    while let Some(remainder) = NonZeroU64::new(large.get() % small) {
        large = small;
        small = remainder;
    }
    small
}
/// Finds the greatest common divisor of all non-zero numbers yielded by the
/// iterator.
///
/// Zeros are ignored. Returns `None` if the iterator yields no non-zero value.
/// Iteration stops early as soon as the gcd drops to 1.
pub fn find_gcd(numbers: impl Iterator<Item = u64>) -> Option<NonZeroU64> {
    let mut non_zero_vals = numbers.filter_map(NonZeroU64::new);
    let mut current: NonZeroU64 = non_zero_vals.next()?;
    if current.get() == 1 {
        return Some(current);
    }

    // The divider is only rebuilt when the gcd actually changes.
    let mut divider = DividerU64::divide_by(current.get());
    for candidate in non_zero_vals {
        // `candidate` is a multiple of the current gcd: nothing to update.
        if divider.divide(candidate.get()) * current.get() == candidate.get() {
            continue;
        }
        current = compute_gcd(candidate, current);
        if current.get() == 1 {
            break;
        }
        divider = DividerU64::divide_by(current.get());
    }
    Some(current)
}
// Tests for the gcd helpers and for the Gcd wrapper codec going through
// `crate::serialize` / `crate::open`.
#[cfg(test)]
mod tests {
use std::io;
use std::num::NonZeroU64;
use ownedbytes::OwnedBytes;
use crate::gcd::{compute_gcd, find_gcd};
use crate::{FastFieldCodecType, VecColumn};
// Serializes `num_vals` i64 values that are all multiples of 1000 (starting at -4000)
// with `codec_type` + Gcd, checks the values read back, then checks that the same
// column with one value breaking the common divisor serializes to more bytes.
fn test_fastfield_gcd_i64_with_codec(
codec_type: FastFieldCodecType,
num_vals: usize,
) -> io::Result<()> {
let mut vals: Vec<i64> = (-4..=(num_vals as i64) - 5).map(|val| val * 1000).collect();
let mut buffer: Vec<u8> = Vec::new();
crate::serialize(
VecColumn::from(&vals),
&mut buffer,
&[codec_type, FastFieldCodecType::Gcd],
)?;
let buffer = OwnedBytes::new(buffer);
let column = crate::open::<i64>(buffer.clone())?;
assert_eq!(column.get_val(0), -4000i64);
assert_eq!(column.get_val(1), -3000i64);
assert_eq!(column.get_val(2), -2000i64);
assert_eq!(column.max_value(), (num_vals as i64 - 5) * 1000);
assert_eq!(column.min_value(), -4000i64);
// Can't apply gcd
// Replacing the last value with 1001 breaks the common divisor of 1000.
let mut buffer_without_gcd = Vec::new();
vals.pop();
vals.push(1001i64);
crate::serialize(
VecColumn::from(&vals),
&mut buffer_without_gcd,
&[codec_type],
)?;
let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd);
assert!(buffer_without_gcd.len() > buffer.len());
Ok(())
}
// Runs the i64 scenario once per wrapped codec.
#[test]
fn test_fastfield_gcd_i64() -> io::Result<()> {
for &codec_type in &[
FastFieldCodecType::Bitpacked,
FastFieldCodecType::BlockwiseLinear,
FastFieldCodecType::Linear,
] {
test_fastfield_gcd_i64_with_codec(codec_type, 5500)?;
}
Ok(())
}
// Same scenario as the i64 helper, with u64 values 1000, 2000, ..., num_vals * 1000.
fn test_fastfield_gcd_u64_with_codec(
codec_type: FastFieldCodecType,
num_vals: usize,
) -> io::Result<()> {
let mut vals: Vec<u64> = (1..=num_vals).map(|i| i as u64 * 1000u64).collect();
let mut buffer: Vec<u8> = Vec::new();
crate::serialize(
VecColumn::from(&vals),
&mut buffer,
&[codec_type, FastFieldCodecType::Gcd],
)?;
let buffer = OwnedBytes::new(buffer);
let column = crate::open::<u64>(buffer.clone())?;
assert_eq!(column.get_val(0), 1000u64);
assert_eq!(column.get_val(1), 2000u64);
assert_eq!(column.get_val(2), 3000u64);
assert_eq!(column.max_value(), num_vals as u64 * 1000);
assert_eq!(column.min_value(), 1000u64);
// Can't apply gcd
// Replacing the last value with 1001 breaks the common divisor of 1000.
let mut buffer_without_gcd = Vec::new();
vals.pop();
vals.push(1001u64);
crate::serialize(
VecColumn::from(&vals),
&mut buffer_without_gcd,
&[codec_type],
)?;
let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd);
assert!(buffer_without_gcd.len() > buffer.len());
Ok(())
}
// Runs the u64 scenario once per wrapped codec.
#[test]
fn test_fastfield_gcd_u64() -> io::Result<()> {
for &codec_type in &[
FastFieldCodecType::Bitpacked,
FastFieldCodecType::BlockwiseLinear,
FastFieldCodecType::Linear,
] {
test_fastfield_gcd_u64_with_codec(codec_type, 5500)?;
}
Ok(())
}
// Round-trips a tiny column through `serialize_and_load` (all codec types enabled).
#[test]
pub fn test_fastfield2() {
let test_fastfield = crate::serialize_and_load(&[100u64, 200u64, 300u64]);
assert_eq!(test_fastfield.get_val(0), 100);
assert_eq!(test_fastfield.get_val(1), 200);
assert_eq!(test_fastfield.get_val(2), 300);
}
// `compute_gcd` must give the same result whichever argument is the larger one.
#[test]
fn test_compute_gcd() {
let test_compute_gcd_aux = |large, small, expected| {
let large = NonZeroU64::new(large).unwrap();
let small = NonZeroU64::new(small).unwrap();
let expected = NonZeroU64::new(expected).unwrap();
assert_eq!(compute_gcd(small, large), expected);
assert_eq!(compute_gcd(large, small), expected);
};
test_compute_gcd_aux(1, 4, 1);
test_compute_gcd_aux(2, 4, 2);
test_compute_gcd_aux(10, 25, 5);
test_compute_gcd_aux(25, 25, 25);
}
// `find_gcd` ignores zeros and returns `None` when there is no non-zero value.
#[test]
fn find_gcd_test() {
assert_eq!(find_gcd([0].into_iter()), None);
assert_eq!(find_gcd([0, 10].into_iter()), NonZeroU64::new(10));
assert_eq!(find_gcd([10, 0].into_iter()), NonZeroU64::new(10));
assert_eq!(find_gcd([].into_iter()), None);
assert_eq!(find_gcd([15, 30, 5, 10].into_iter()), NonZeroU64::new(5));
assert_eq!(find_gcd([15, 16, 10].into_iter()), NonZeroU64::new(1));
assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), NonZeroU64::new(5));
assert_eq!(find_gcd([0, 0].into_iter()), None);
}
}

View File

@@ -1,7 +1,12 @@
#![cfg_attr(all(feature = "unstable", test), feature(test))]
#[cfg(test)]
#[macro_use]
extern crate more_asserts;
#[cfg(all(test, feature = "unstable"))]
extern crate test;
use std::io;
use std::io::Write;
@@ -13,8 +18,11 @@ pub mod blockwise_linear;
pub mod linear;
mod column;
mod gcd;
mod serialize;
pub use self::column::{monotonic_map_column, Column, VecColumn};
pub use self::serialize::{open, serialize, serialize_and_load};
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
#[repr(u8)]
@@ -54,6 +62,67 @@ impl FastFieldCodecType {
}
}
/// Conversion between a fast field value type and its `u64` encoding.
///
/// NOTE(review): the trait name suggests implementations are expected to
/// preserve ordering across the conversion — confirm against the `common`
/// conversion helpers.
pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy {
/// Converts a value to u64.
///
/// Internally all fast field values are encoded as u64.
fn to_u64(self) -> u64;
/// Converts a value from u64
///
/// Internally all fast field values are encoded as u64.
/// **Note: To be used for converting encoded Term, Posting values.**
fn from_u64(val: u64) -> Self;
}
/// `u64` is its own encoding: both conversions are the identity.
impl MonotonicallyMappableToU64 for u64 {
    // Inlined like the sibling `i64` and `bool` impls.
    #[inline(always)]
    fn to_u64(self) -> u64 {
        self
    }

    #[inline(always)]
    fn from_u64(val: u64) -> Self {
        val
    }
}
/// `i64` values are encoded through the `common` crate's conversion helpers.
impl MonotonicallyMappableToU64 for i64 {
/// Delegates to `common::i64_to_u64`.
#[inline(always)]
fn to_u64(self) -> u64 {
common::i64_to_u64(self)
}
/// Delegates to `common::u64_to_i64`.
#[inline(always)]
fn from_u64(val: u64) -> Self {
common::u64_to_i64(val)
}
}
/// `false` is encoded as `0` and `true` as `1`; any non-zero `u64` decodes to
/// `true`.
impl MonotonicallyMappableToU64 for bool {
    #[inline(always)]
    fn to_u64(self) -> u64 {
        u64::from(self)
    }

    #[inline(always)]
    fn from_u64(val: u64) -> Self {
        val != 0
    }
}
/// `f64` values are encoded through the `common` crate's conversion helpers.
impl MonotonicallyMappableToU64 for f64 {
    /// Delegates to `common::f64_to_u64`.
    // Inlined like the sibling `i64` and `bool` impls.
    #[inline(always)]
    fn to_u64(self) -> u64 {
        common::f64_to_u64(self)
    }

    /// Delegates to `common::u64_to_f64`.
    #[inline(always)]
    fn from_u64(val: u64) -> Self {
        common::u64_to_f64(val)
    }
}
/// The FastFieldSerializerEstimate trait is required on all variants
/// of fast field compressions, to decide which one to choose.
pub trait FastFieldCodec: 'static {
@@ -82,6 +151,13 @@ pub trait FastFieldCodec: 'static {
fn estimate(fastfield_accessor: &impl Column) -> Option<f32>;
}
/// All four `FastFieldCodecType` variants, including the `Gcd` wrapper.
///
/// `serialize_and_load` passes this list to `serialize` so that every codec is
/// considered.
pub const ALL_CODEC_TYPES: [FastFieldCodecType; 4] = [
FastFieldCodecType::Bitpacked,
FastFieldCodecType::BlockwiseLinear,
FastFieldCodecType::Gcd,
FastFieldCodecType::Linear,
];
#[derive(Debug, Clone)]
/// Statistics are used in codec detection and stored in the fast field footer.
pub struct FastFieldStats {
@@ -255,3 +331,121 @@ mod tests {
assert_eq!(count_codec, 4);
}
}
// Benchmarks comparing lookups in a plain `Vec<u64>` with lookups in a column
// produced by `serialize_and_load`. Only compiled for tests with the `unstable`
// feature, since they rely on the nightly `test` crate.
#[cfg(all(test, feature = "unstable"))]
mod bench {
use std::sync::Arc;
use rand::prelude::*;
use test::{self, Bencher};
use crate::Column;
// Warning: this generates the same permutation at each call
// (the RNG is seeded with a fixed seed).
fn generate_permutation() -> Vec<u64> {
let mut permutation: Vec<u64> = (0u64..100_000u64).collect();
permutation.shuffle(&mut StdRng::from_seed([1u8; 32]));
permutation
}
// Warning: this generates the same permutation at each call
// Every value is a multiple of 1000, so a common divisor exists.
fn generate_permutation_gcd() -> Vec<u64> {
let mut permutation: Vec<u64> = (1u64..100_000u64).map(|el| el * 1000).collect();
permutation.shuffle(&mut StdRng::from_seed([1u8; 32]));
permutation
}
// Baseline: follows the permutation as a chain of dependent lookups in the `Vec`.
#[bench]
fn bench_intfastfield_jumpy_veclookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
b.iter(|| {
let mut a = 0u64;
for _ in 0..n {
a = permutation[a as usize];
}
a
});
}
// Same dependent-lookup chain, going through the serialized column.
#[bench]
fn bench_intfastfield_jumpy_fflookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn Column<u64>> = crate::serialize_and_load(&permutation);
b.iter(|| {
let mut a = 0u64;
for _ in 0..n {
a = column.get_val(a as u64);
}
a
});
}
// Baseline: sums every 7th element of the `Vec`.
#[bench]
fn bench_intfastfield_stride7_vec(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
b.iter(|| {
let mut a = 0u64;
for i in (0..n / 7).map(|val| val * 7) {
a += permutation[i as usize];
}
a
});
}
// Sums every 7th value of the serialized column.
#[bench]
fn bench_intfastfield_stride7_fflookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn Column<u64>> = crate::serialize_and_load(&permutation);
b.iter(|| {
let mut a = 0u64;
for i in (0..n / 7).map(|val| val * 7) {
a += column.get_val(i as u64);
}
a
});
}
// Sums all values of the serialized column, in index order.
#[bench]
fn bench_intfastfield_scan_all_fflookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn Column<u64>> = crate::serialize_and_load(&permutation);
b.iter(|| {
let mut a = 0u64;
for i in 0u64..n as u64 {
a += column.get_val(i);
}
a
});
}
// Full scan over the column built from the multiples-of-1000 data set.
#[bench]
fn bench_intfastfield_scan_all_fflookup_gcd(b: &mut Bencher) {
let permutation = generate_permutation_gcd();
let n = permutation.len();
let column: Arc<dyn Column<u64>> = crate::serialize_and_load(&permutation);
b.iter(|| {
let mut a = 0u64;
for i in 0..n as u64 {
a += column.get_val(i);
}
a
});
}
// Baseline: indexed full scan over the `Vec`.
#[bench]
fn bench_intfastfield_scan_all_vec(b: &mut Bencher) {
let permutation = generate_permutation();
b.iter(|| {
let mut a = 0u64;
for i in 0..permutation.len() {
a += permutation[i as usize] as u64;
}
a
});
}
}

View File

@@ -0,0 +1,233 @@
// Copyright (C) 2022 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::io;
use std::num::NonZeroU64;
use std::sync::Arc;
use common::BinarySerializable;
use fastdivide::DividerU64;
use log::warn;
use ownedbytes::OwnedBytes;
use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::gcd::{find_gcd, GCDParams};
use crate::linear::LinearCodec;
use crate::{
monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
VecColumn, ALL_CODEC_TYPES,
};
// TODO: revisit once `explicit_generic_args_with_impl_trait` is merged and stabilized:
// https://github.com/rust-lang/rust/pull/86176
/// Asks codec `C` for its estimate on the column and, if it provides one,
/// records it together with the codec type.
fn codec_estimation<C: FastFieldCodec, D: Column>(
    fastfield_accessor: &D,
    estimations: &mut Vec<(f32, FastFieldCodecType)>,
) {
    let estimation = C::estimate(fastfield_accessor).map(|ratio| (ratio, C::CODEC_TYPE));
    // `Option` iterates over zero or one item.
    estimations.extend(estimation);
}
/// Writes the code of `codec_type` to `output`.
fn write_header<W: io::Write>(codec_type: FastFieldCodecType, output: &mut W) -> io::Result<()> {
    let code = codec_type.to_code();
    code.serialize(output)
}
/// Computes the GCD parameters of a column.
///
/// Returns `None` when the gcd of the `value - min_value` deltas is 1, or when
/// there is no non-zero delta at all.
fn gcd_params(column: &impl Column<u64>) -> Option<GCDParams> {
    let min_value = column.min_value();
    let deltas = column.iter().map(|val| val - min_value);
    match find_gcd(deltas)?.get() {
        // Dividing by 1 would not shrink anything.
        1 => None,
        gcd => Some(GCDParams {
            gcd,
            min_value,
            num_vals: column.num_vals(),
        }),
    }
}
/// Opens a serialized column and returns it as a type-erased `Column<T>`.
///
/// The start of `bytes` is read as a `FastFieldCodecType`; the remaining bytes
/// are handed to the reader for that codec type.
///
/// # Errors
/// Returns an error if the codec type cannot be deserialized or if opening the
/// codec data fails.
pub fn open<T: MonotonicallyMappableToU64>(
mut bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<T>>> {
let codec_type = FastFieldCodecType::deserialize(&mut bytes)?;
open_from_id(bytes, codec_type)
}
/// Opens `bytes` with codec `C` and exposes the `u64` values as `Item`s.
fn open_codec_from_bytes<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
    bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<Item>>> {
    let u64_column = C::open_from_bytes(bytes)?;
    let typed_column = monotonic_map_column(u64_column, Item::from_u64);
    Ok(Arc::new(typed_column))
}
/// Opens a GCD-wrapped column.
///
/// `bytes` holds the data of the wrapped codec followed by a `GCDParams`
/// footer. Each value read from the wrapped codec is mapped back to
/// `min_value + gcd * val`.
///
/// # Errors
/// Returns an `InvalidData` error if `bytes` is too short to contain the
/// footer, and forwards any error from deserializing the footer or opening the
/// wrapped codec.
pub fn open_gcd_from_bytes<WrappedCodec: FastFieldCodec>(
    bytes: OwnedBytes,
) -> io::Result<impl Column> {
    // `GCDParams` is serialized as three `u64`s: gcd, min_value and num_vals.
    const GCD_PARAMS_NUM_BYTES: usize = 3 * std::mem::size_of::<u64>();
    // A plain subtraction would underflow (and panic) on truncated input.
    let footer_offset = bytes
        .len()
        .checked_sub(GCD_PARAMS_NUM_BYTES)
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "Gcd codec data is too short to contain the gcd parameters footer.",
            )
        })?;
    let (body, mut footer) = bytes.split(footer_offset);
    let gcd_params = GCDParams::deserialize(&mut footer)?;
    let gcd_remap = move |val: u64| gcd_params.min_value + gcd_params.gcd * val;
    let reader: WrappedCodec::Reader = WrappedCodec::open_from_bytes(body)?;
    Ok(monotonic_map_column(reader, gcd_remap))
}
/// Opens a GCD-wrapped column stored with codec `C` and exposes its values as
/// `Item`s.
fn open_codec_with_gcd<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
    bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<Item>>> {
    let gcd_column = open_gcd_from_bytes::<C>(bytes)?;
    let typed_column = monotonic_map_column(gcd_column, Item::from_u64);
    Ok(Arc::new(typed_column))
}
/// Opens the column stored in `bytes` with the reader matching `codec_type`.
///
/// For `Gcd`, the type of the wrapped codec is read from the start of `bytes`
/// first; a `Gcd` codec wrapped in another `Gcd` codec is rejected.
fn open_from_id<T: MonotonicallyMappableToU64>(
    mut bytes: OwnedBytes,
    codec_type: FastFieldCodecType,
) -> io::Result<Arc<dyn Column<T>>> {
    // Plain codecs are opened right away; only the Gcd wrapper falls through.
    let wrapped_codec_type = match codec_type {
        FastFieldCodecType::Bitpacked => {
            return open_codec_from_bytes::<BitpackedCodec, _>(bytes);
        }
        FastFieldCodecType::Linear => {
            return open_codec_from_bytes::<LinearCodec, _>(bytes);
        }
        FastFieldCodecType::BlockwiseLinear => {
            return open_codec_from_bytes::<BlockwiseLinearCodec, _>(bytes);
        }
        FastFieldCodecType::Gcd => FastFieldCodecType::deserialize(&mut bytes)?,
    };
    match wrapped_codec_type {
        FastFieldCodecType::Bitpacked => open_codec_with_gcd::<BitpackedCodec, _>(bytes),
        FastFieldCodecType::Linear => open_codec_with_gcd::<LinearCodec, _>(bytes),
        FastFieldCodecType::BlockwiseLinear => {
            open_codec_with_gcd::<BlockwiseLinearCodec, _>(bytes)
        }
        FastFieldCodecType::Gcd => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Gcd codec wrapped into another gcd codec. This combination is not allowed.",
        )),
    }
}
/// Serializes `typed_column` into `output`, choosing among `codecs`.
///
/// Values are first mapped to `u64`. If `codecs` contains `Gcd` and the column
/// has usable GCD parameters, the output is a `Gcd` header, then the column of
/// `(value - min_value) / gcd` quotients serialized with one of the other
/// codecs, then the `GCDParams`. Otherwise the column is serialized without
/// the GCD wrapper.
pub fn serialize<T: MonotonicallyMappableToU64>(
    typed_column: impl Column<T>,
    output: &mut impl io::Write,
    codecs: &[FastFieldCodecType],
) -> io::Result<()> {
    let column = monotonic_map_column(typed_column, T::to_u64);

    let gcd_enabled = codecs.contains(&FastFieldCodecType::Gcd);
    let params_opt = gcd_enabled.then(|| gcd_params(&column)).flatten();
    let params = match params_opt {
        Some(params) => params,
        None => return serialize_without_gcd(column, output, codecs),
    };

    write_header(FastFieldCodecType::Gcd, output)?;
    let min_value = column.min_value();
    let divider = DividerU64::divide_by(params.gcd);
    let quotient_column =
        monotonic_map_column(column, |val: u64| divider.divide(val - min_value));
    serialize_without_gcd(quotient_column, output, codecs)?;
    params.serialize(output)
}
/// Serializes `column` with the codec from `codecs` that has the lowest
/// estimate.
///
/// `Gcd` entries in `codecs` are ignored here. The chosen codec type is
/// written as a header, followed by the codec's own serialization.
///
/// # Errors
/// Returns an `InvalidInput` error when no codec provides a usable estimate
/// (for example when `codecs` is empty or only contains `Gcd`), and forwards
/// any I/O error from writing.
fn serialize_without_gcd(
    column: impl Column<u64>,
    output: &mut impl io::Write,
    codecs: &[FastFieldCodecType],
) -> io::Result<()> {
    let mut estimations = Vec::new();
    for &codec in codecs {
        match codec {
            FastFieldCodecType::Bitpacked => {
                codec_estimation::<BitpackedCodec, _>(&column, &mut estimations);
            }
            FastFieldCodecType::Linear => {
                codec_estimation::<LinearCodec, _>(&column, &mut estimations);
            }
            FastFieldCodecType::BlockwiseLinear => {
                codec_estimation::<BlockwiseLinearCodec, _>(&column, &mut estimations);
            }
            // The Gcd wrapper provides no estimate of its own.
            FastFieldCodecType::Gcd => {}
        }
    }
    if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan()) {
        warn!(
            "broken estimation for fast field codec {:?}",
            broken_estimation.1
        );
    }
    // removing nan values for codecs with broken calculations, and max values which disables
    // codecs
    estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
    // `min_by` returns the first of several equal minima, so ties are resolved
    // in favor of the codec listed first in `codecs`.
    let (_ratio, codec_type) = estimations
        .into_iter()
        .min_by(|a, b| {
            a.0.partial_cmp(&b.0)
                .expect("NaN estimations have been filtered out")
        })
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "No fast field codec is applicable to this column.",
            )
        })?;
    write_header(codec_type, output)?;
    match codec_type {
        FastFieldCodecType::Bitpacked => {
            BitpackedCodec::serialize(output, &column)?;
        }
        FastFieldCodecType::Linear => {
            LinearCodec::serialize(output, &column)?;
        }
        FastFieldCodecType::BlockwiseLinear => {
            BlockwiseLinearCodec::serialize(output, &column)?;
        }
        FastFieldCodecType::Gcd => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "GCD codec not supported.",
            ));
        }
    }
    output.flush()?;
    Ok(())
}
pub fn serialize_and_load<T: MonotonicallyMappableToU64 + Ord + Default>(
column: &[T],
) -> Arc<dyn Column<T>> {
let mut buffer = Vec::new();
super::serialize(VecColumn::from(column), &mut buffer, &ALL_CODEC_TYPES).unwrap();
super::open(OwnedBytes::new(buffer)).unwrap()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A column must yield its original values after a serialize/open round trip.
    #[test]
    fn test_serialize_deserialize() {
        let original = [1u64, 5u64, 10u64];
        let column = serialize_and_load(&original[..]);
        let restored: Vec<u64> = column.iter().collect();
        assert_eq!(&restored, &original[..]);
    }
}

View File

@@ -423,12 +423,13 @@ pub(crate) fn range_to_key(range: &Range<u64>, field_type: &Type) -> Key {
#[cfg(test)]
mod tests {
use fastfield_codecs::MonotonicallyMappableToU64;
use super::*;
use crate::aggregation::agg_req::{
Aggregation, Aggregations, BucketAggregation, BucketAggregationType,
};
use crate::aggregation::tests::{exec_request_with_query, get_test_index_with_num_docs};
use crate::fastfield::FastValue;
pub fn get_collector_from_ranges(
ranges: Vec<RangeAggregationRange>,

View File

@@ -161,7 +161,6 @@ mod collector;
pub mod intermediate_agg_result;
pub mod metric;
mod segment_agg_result;
use std::collections::HashMap;
use std::fmt::Display;
@@ -169,10 +168,10 @@ pub use collector::{
AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector,
MAX_BUCKET_COUNT,
};
use fastfield_codecs::MonotonicallyMappableToU64;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use crate::fastfield::FastValue;
use crate::schema::Type;
/// Represents an associative array `(key => values)` in a very efficient manner.

View File

@@ -1,328 +0,0 @@
use std::io::{self, Write};
use std::num::NonZeroU64;
use common::BinarySerializable;
use fastdivide::DividerU64;
use fastfield_codecs::{monotonic_map_column, Column, FastFieldCodec};
use ownedbytes::OwnedBytes;
/// A gcd of 1.
///
/// NOTE(review): presumably the value used when no larger common divisor
/// applies — confirm at the call sites.
pub const GCD_DEFAULT: u64 = 1;
/// Parameters of the GCD wrapper codec, read from the footer of the data.
///
/// A stored value `val` is mapped back to `min_value + gcd * val` on read.
#[derive(Debug, Clone, Copy)]
struct GCDParams {
/// Multiplier applied to each stored value on read.
gcd: u64,
/// Offset added back to each stored value on read.
min_value: u64,
/// Third serialized field. NOTE(review): presumably the number of values in the column — confirm at the writer.
num_vals: u64,
}
impl BinarySerializable for GCDParams {
    /// Writes the fields in the fixed order `gcd`, `min_value`, `num_vals`.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        [self.gcd, self.min_value, self.num_vals]
            .iter()
            .try_for_each(|field| field.serialize(writer))
    }

    /// Reads the fields back in the order used by `serialize`.
    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
        // Struct-literal fields are evaluated top to bottom, which matches the
        // on-disk order.
        Ok(Self {
            gcd: u64::deserialize(reader)?,
            min_value: u64::deserialize(reader)?,
            num_vals: u64::deserialize(reader)?,
        })
    }
}
/// Opens a GCD-wrapped column.
///
/// `bytes` holds the data of the wrapped codec followed by a `GCDParams`
/// footer. Each value read from the wrapped codec is mapped back to
/// `min_value + gcd * val`.
///
/// # Errors
/// Returns an `InvalidData` error if `bytes` is too short to contain the
/// footer, and forwards any error from deserializing the footer or opening the
/// wrapped codec.
pub fn open_gcd_from_bytes<WrappedCodec: FastFieldCodec>(
    bytes: OwnedBytes,
) -> io::Result<impl Column> {
    // `GCDParams` is serialized as three `u64`s: gcd, min_value and num_vals.
    const GCD_PARAMS_NUM_BYTES: usize = 3 * std::mem::size_of::<u64>();
    // A plain subtraction would underflow (and panic) on truncated input.
    let footer_offset = bytes
        .len()
        .checked_sub(GCD_PARAMS_NUM_BYTES)
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "Gcd codec data is too short to contain the gcd parameters footer.",
            )
        })?;
    let (body, mut footer) = bytes.split(footer_offset);
    let gcd_params = GCDParams::deserialize(&mut footer)?;
    let gcd_remap = move |val: u64| gcd_params.min_value + gcd_params.gcd * val;
    let reader: WrappedCodec::Reader = WrappedCodec::open_from_bytes(body)?;
    Ok(monotonic_map_column(reader, gcd_remap))
}
pub fn write_gcd_header<W: Write>(
field_write: &mut W,
min_value: u64,
gcd: u64,
num_vals: u64,
) -> io::Result<()> {
gcd.serialize(field_write)?;
min_value.serialize(field_write)?;
num_vals.serialize(field_write)?;
Ok(())
}
/// Computes the greatest common divisor of two non-zero numbers with the
/// Euclidean algorithm.
///
/// Passing `large >= small` is recommended but not required: when the
/// arguments are the other way round, the first iteration simply swaps them.
fn compute_gcd(mut large: NonZeroU64, mut small: NonZeroU64) -> NonZeroU64 {
    // Keep reducing until the remainder hits zero; the last non-zero divisor
    // is the gcd.
    while let Some(remainder) = NonZeroU64::new(large.get() % small) {
        large = small;
        small = remainder;
    }
    small
}
/// Finds the greatest common divisor of all non-zero numbers yielded by the
/// iterator.
///
/// Zeros are ignored. Returns `None` if the iterator yields no non-zero value.
/// Iteration stops early as soon as the gcd drops to 1.
pub fn find_gcd(numbers: impl Iterator<Item = u64>) -> Option<NonZeroU64> {
    let mut non_zero_vals = numbers.filter_map(NonZeroU64::new);
    let mut current: NonZeroU64 = non_zero_vals.next()?;
    if current.get() == 1 {
        return Some(current);
    }

    // The divider is only rebuilt when the gcd actually changes.
    let mut divider = DividerU64::divide_by(current.get());
    for candidate in non_zero_vals {
        // `candidate` is a multiple of the current gcd: nothing to update.
        if divider.divide(candidate.get()) * current.get() == candidate.get() {
            continue;
        }
        current = compute_gcd(candidate, current);
        if current.get() == 1 {
            break;
        }
        divider = DividerU64::divide_by(current.get());
    }
    Some(current)
}
// Tests for the gcd helpers and for GCD compression of fast fields written
// through `CompositeFastFieldSerializer`.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use common::HasLen;
use fastfield_codecs::Column;
use crate::directory::{CompositeFile, RamDirectory, WritePtr};
use crate::fastfield::gcd::compute_gcd;
use crate::fastfield::reader::open_fast_field;
use crate::fastfield::serializer::FastFieldCodecEnableCheck;
use crate::fastfield::tests::{encode_decode_fast_field, FIELD, FIELDI64, SCHEMA, SCHEMAI64};
use crate::fastfield::{
find_gcd, CompositeFastFieldSerializer, FastFieldCodecType, FastFieldsWriter, ALL_CODECS,
};
use crate::schema::{Cardinality, Schema};
use crate::{DateOptions, DatePrecision, DateTime, Directory};
// Writes the fast fields of `docs` into the file "test" of a fresh `RamDirectory`,
// using `codec_enable_checker` to configure the serializer.
fn get_index(
docs: &[crate::Document],
schema: &Schema,
codec_enable_checker: FastFieldCodecEnableCheck,
) -> crate::Result<RamDirectory> {
let directory: RamDirectory = RamDirectory::create();
{
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
let mut serializer =
CompositeFastFieldSerializer::from_write_with_codec(write, codec_enable_checker)
.unwrap();
let mut fast_field_writers = FastFieldsWriter::from_schema(schema);
for doc in docs {
fast_field_writers.add_document(doc);
}
fast_field_writers
.serialize(&mut serializer, &HashMap::new(), None)
.unwrap();
serializer.close().unwrap();
}
Ok(directory)
}
// Indexes `num_vals` i64 values that are all multiples of 1000 (starting at -4000),
// checks the values read back, then checks that the file grows once a value
// breaking the common divisor is added.
fn test_fastfield_gcd_i64_with_codec(
code_type: FastFieldCodecType,
num_vals: usize,
) -> crate::Result<()> {
let path = Path::new("test");
let mut docs = vec![];
for i in 1..=num_vals {
let val = (i as i64 - 5) * 1000i64;
docs.push(doc!(*FIELDI64=>val));
}
let directory = get_index(&docs, &SCHEMAI64, code_type.into())?;
let file = directory.open_read(path).unwrap();
let composite_file = CompositeFile::open(&file)?;
// NOTE(review): documents are built with `FIELDI64` but the file is opened with `FIELD`;
// presumably both resolve to the same field id — confirm.
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader: Arc<dyn Column<i64>> = open_fast_field(file.read_bytes()?)?;
assert_eq!(fast_field_reader.get_val(0), -4000i64);
assert_eq!(fast_field_reader.get_val(1), -3000i64);
assert_eq!(fast_field_reader.get_val(2), -2000i64);
assert_eq!(fast_field_reader.max_value(), (num_vals as i64 - 5) * 1000);
assert_eq!(fast_field_reader.min_value(), -4000i64);
let file = directory.open_read(path).unwrap();
// Can't apply gcd
// Replacing the last value with 2001 breaks the common divisor of 1000.
let path = Path::new("test");
docs.pop();
docs.push(doc!(*FIELDI64=>2001i64));
let directory = get_index(&docs, &SCHEMAI64, code_type.into())?;
let file2 = directory.open_read(path).unwrap();
assert!(file2.len() > file.len());
Ok(())
}
// Runs the i64 scenario once per codec in `ALL_CODECS`.
#[test]
fn test_fastfield_gcd_i64() -> crate::Result<()> {
for &code_type in ALL_CODECS {
test_fastfield_gcd_i64_with_codec(code_type, 5500)?;
}
Ok(())
}
// Same scenario as the i64 helper, with u64 values 1000, 2000, ..., num_vals * 1000.
fn test_fastfield_gcd_u64_with_codec(
code_type: FastFieldCodecType,
num_vals: usize,
) -> crate::Result<()> {
let path = Path::new("test");
let mut docs = vec![];
for i in 1..=num_vals {
let val = i as u64 * 1000u64;
docs.push(doc!(*FIELD=>val));
}
let directory = get_index(&docs, &SCHEMA, code_type.into())?;
let file = directory.open_read(path).unwrap();
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader = open_fast_field::<u64>(file.read_bytes()?)?;
assert_eq!(fast_field_reader.get_val(0), 1000u64);
assert_eq!(fast_field_reader.get_val(1), 2000u64);
assert_eq!(fast_field_reader.get_val(2), 3000u64);
assert_eq!(fast_field_reader.max_value(), num_vals as u64 * 1000);
assert_eq!(fast_field_reader.min_value(), 1000u64);
let file = directory.open_read(path).unwrap();
// Can't apply gcd
// NOTE(review): this document uses `FIELDI64` although the index is built with `SCHEMA`
// and the other documents use `FIELD` — looks like a copy-paste slip; confirm.
let path = Path::new("test");
docs.pop();
docs.push(doc!(*FIELDI64=>2001u64));
let directory = get_index(&docs, &SCHEMA, code_type.into())?;
let file2 = directory.open_read(path).unwrap();
assert!(file2.len() > file.len());
Ok(())
}
// Runs the u64 scenario once per codec in `ALL_CODECS`.
#[test]
fn test_fastfield_gcd_u64() -> crate::Result<()> {
for &code_type in ALL_CODECS {
test_fastfield_gcd_u64_with_codec(code_type, 5500)?;
}
Ok(())
}
// Round-trips a tiny column through `encode_decode_fast_field`.
#[test]
pub fn test_fastfield2() {
let test_fastfield = encode_decode_fast_field(&[100u64, 200u64, 300u64]);
assert_eq!(test_fastfield.get_val(0), 100);
assert_eq!(test_fastfield.get_val(1), 200);
assert_eq!(test_fastfield.get_val(2), 300);
}
// A date field indexed with second precision must take less space than the same
// field indexed with microsecond precision.
#[test]
pub fn test_gcd_date() -> crate::Result<()> {
let size_prec_sec =
test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Seconds)?;
let size_prec_micro =
test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Microseconds)?;
assert!(size_prec_sec < size_prec_micro);
let size_prec_sec =
test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Seconds)?;
let size_prec_micro =
test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Microseconds)?;
assert!(size_prec_sec < size_prec_micro);
Ok(())
}
// Indexes three timestamps with the given precision, checks that they are read back
// truncated to that precision, and returns the length of the field's file.
fn test_gcd_date_with_codec(
codec_type: FastFieldCodecType,
precision: DatePrecision,
) -> crate::Result<usize> {
// NOTE(review): the three timestamps below pass `as_secs()` (whole seconds) to
// `from_timestamp_micros` — the unit looks mismatched; confirm whether this is intended.
let time1 = DateTime::from_timestamp_micros(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
let time2 = DateTime::from_timestamp_micros(
SystemTime::now()
.checked_sub(Duration::from_micros(4111))
.unwrap()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
let time3 = DateTime::from_timestamp_micros(
SystemTime::now()
.checked_sub(Duration::from_millis(2000))
.unwrap()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
let mut schema_builder = Schema::builder();
let date_options = DateOptions::default()
.set_fast(Cardinality::SingleValue)
.set_precision(precision);
let field = schema_builder.add_date_field("field", date_options);
let schema = schema_builder.build();
let docs = vec![doc!(field=>time1), doc!(field=>time2), doc!(field=>time3)];
let directory = get_index(&docs, &schema, codec_type.into())?;
let path = Path::new("test");
let file = directory.open_read(path).unwrap();
let composite_file = CompositeFile::open(&file)?;
// NOTE(review): opened with the shared `FIELD` rather than the local `field`;
// presumably both resolve to the same field id — confirm.
let file = composite_file.open_read(*FIELD).unwrap();
let len = file.len();
let test_fastfield = open_fast_field::<DateTime>(file.read_bytes()?)?;
assert_eq!(test_fastfield.get_val(0), time1.truncate(precision));
assert_eq!(test_fastfield.get_val(1), time2.truncate(precision));
assert_eq!(test_fastfield.get_val(2), time3.truncate(precision));
Ok(len)
}
// `compute_gcd` must give the same result whichever argument is the larger one.
#[test]
fn test_compute_gcd() {
let test_compute_gcd_aux = |large, small, expected| {
let large = NonZeroU64::new(large).unwrap();
let small = NonZeroU64::new(small).unwrap();
let expected = NonZeroU64::new(expected).unwrap();
assert_eq!(compute_gcd(small, large), expected);
assert_eq!(compute_gcd(large, small), expected);
};
test_compute_gcd_aux(1, 4, 1);
test_compute_gcd_aux(2, 4, 2);
test_compute_gcd_aux(10, 25, 5);
test_compute_gcd_aux(25, 25, 25);
}
// `find_gcd` ignores zeros and returns `None` when there is no non-zero value.
#[test]
fn find_gcd_test() {
assert_eq!(find_gcd([0].into_iter()), None);
assert_eq!(find_gcd([0, 10].into_iter()), NonZeroU64::new(10));
assert_eq!(find_gcd([10, 0].into_iter()), NonZeroU64::new(10));
assert_eq!(find_gcd([].into_iter()), None);
assert_eq!(find_gcd([15, 30, 5, 10].into_iter()), NonZeroU64::new(5));
assert_eq!(find_gcd([15, 16, 10].into_iter()), NonZeroU64::new(1));
assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), NonZeroU64::new(5));
assert_eq!(find_gcd([0, 0].into_iter()), None);
}
}

View File

@@ -20,13 +20,12 @@
//!
//! Read access performance is comparable to that of an array lookup.
use fastfield_codecs::FastFieldCodecType;
use fastfield_codecs::MonotonicallyMappableToU64;
pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet};
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub(crate) use self::gcd::{find_gcd, GCD_DEFAULT};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
@@ -39,19 +38,11 @@ mod alive_bitset;
mod bytes;
mod error;
mod facet_reader;
mod gcd;
mod multivalued;
mod reader;
mod readers;
mod serializer;
mod writer;
pub(crate) const ALL_CODECS: &[FastFieldCodecType; 3] = &[
FastFieldCodecType::Bitpacked,
FastFieldCodecType::Linear,
FastFieldCodecType::BlockwiseLinear,
];
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
/// for a doc_id
pub trait MultiValueLength {
@@ -63,47 +54,26 @@ pub trait MultiValueLength {
/// Trait for types that are allowed for fast fields:
/// (u64, i64 and f64, bool, DateTime).
pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static {
/// Converts a value from u64
///
/// Internally all fast field values are encoded as u64.
/// **Note: To be used for converting encoded Term, Posting values.**
fn from_u64(val: u64) -> Self;
/// Converts a value to u64.
///
/// Internally all fast field values are encoded as u64.
fn to_u64(&self) -> u64;
pub trait FastValue:
MonotonicallyMappableToU64 + Copy + Send + Sync + PartialOrd + 'static
{
/// Returns the fast field cardinality that can be extracted from the given
/// `FieldType`.
///
/// If the type is not a fast field, `None` is returned.
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality>;
/// Cast value to `u64`.
/// The value is just reinterpreted in memory.
fn as_u64(&self) -> u64;
/// Returns the `schema::Type` for this FastValue.
fn to_type() -> Type;
/// Build a default value. This default value is never used, so the value does not
/// really matter.
fn make_zero() -> Self {
Self::from_u64(0i64.to_u64())
Self::from_u64(0u64)
}
/// Returns the `schema::Type` for this FastValue.
fn to_type() -> Type;
}
impl FastValue for u64 {
fn from_u64(val: u64) -> Self {
val
}
fn to_u64(&self) -> u64 {
*self
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::U64(ref integer_options) => integer_options.get_fastfield_cardinality(),
@@ -112,24 +82,12 @@ impl FastValue for u64 {
}
}
fn as_u64(&self) -> u64 {
*self
}
fn to_type() -> Type {
Type::U64
}
}
impl FastValue for i64 {
fn from_u64(val: u64) -> Self {
common::u64_to_i64(val)
}
fn to_u64(&self) -> u64 {
common::i64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::I64(ref integer_options) => integer_options.get_fastfield_cardinality(),
@@ -137,24 +95,12 @@ impl FastValue for i64 {
}
}
fn as_u64(&self) -> u64 {
*self as u64
}
fn to_type() -> Type {
Type::I64
}
}
impl FastValue for f64 {
fn from_u64(val: u64) -> Self {
common::u64_to_f64(val)
}
fn to_u64(&self) -> u64 {
common::f64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::F64(ref integer_options) => integer_options.get_fastfield_cardinality(),
@@ -162,27 +108,12 @@ impl FastValue for f64 {
}
}
fn as_u64(&self) -> u64 {
self.to_bits()
}
fn to_type() -> Type {
Type::F64
}
}
impl FastValue for bool {
fn from_u64(val: u64) -> Self {
val != 0u64
}
fn to_u64(&self) -> u64 {
match self {
false => 0,
true => 1,
}
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::Bool(ref integer_options) => integer_options.get_fastfield_cardinality(),
@@ -190,28 +121,26 @@ impl FastValue for bool {
}
}
fn as_u64(&self) -> u64 {
*self as u64
}
fn to_type() -> Type {
Type::Bool
}
}
/// Maps a `DateTime` to and from `u64` by delegating to the `i64` mapping of
/// its `timestamp_micros` field.
impl MonotonicallyMappableToU64 for DateTime {
    /// Encodes the `timestamp_micros` field through its own `to_u64` mapping.
    fn to_u64(self) -> u64 {
        let micros = self.timestamp_micros;
        micros.to_u64()
    }

    /// Rebuilds a `DateTime` whose `timestamp_micros` is `i64::from_u64(val)`.
    fn from_u64(val: u64) -> Self {
        Self {
            timestamp_micros: i64::from_u64(val),
        }
    }
}
impl FastValue for DateTime {
/// Converts a timestamp microseconds into DateTime.
///
/// **Note: the timestamp is expected to be in microseconds.**
fn from_u64(timestamp_micros_u64: u64) -> Self {
let timestamp_micros = i64::from_u64(timestamp_micros_u64);
Self::from_timestamp_micros(timestamp_micros)
}
fn to_u64(&self) -> u64 {
common::i64_to_u64(self.into_timestamp_micros())
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::Date(ref options) => options.get_fastfield_cardinality(),
@@ -219,13 +148,15 @@ impl FastValue for DateTime {
}
}
fn as_u64(&self) -> u64 {
self.into_timestamp_micros().as_u64()
}
fn to_type() -> Type {
Type::Date
}
fn make_zero() -> Self {
DateTime {
timestamp_micros: 0,
}
}
}
fn value_to_u64(value: &Value) -> u64 {
@@ -266,8 +197,10 @@ mod tests {
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use common::HasLen;
use fastfield_codecs::{open, FastFieldCodecType};
use once_cell::sync::Lazy;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
@@ -275,7 +208,6 @@ mod tests {
use super::*;
use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
use crate::fastfield::reader::open_fast_field;
use crate::merge_policy::NoMergePolicy;
use crate::schema::{Document, Field, Schema, FAST, STRING, TEXT};
use crate::time::OffsetDateTime;
@@ -286,61 +218,11 @@ mod tests {
schema_builder.add_u64_field("field", FAST);
schema_builder.build()
});
pub static SCHEMAI64: Lazy<Schema> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
schema_builder.add_i64_field("field", FAST);
schema_builder.build()
});
pub static FIELD: Lazy<Field> = Lazy::new(|| SCHEMA.get_field("field").unwrap());
pub static FIELDI64: Lazy<Field> = Lazy::new(|| SCHEMAI64.get_field("field").unwrap());
/// Serializes `vals` as a single fast field into an in-memory `RamDirectory`
/// and immediately opens the written bytes again with `open_fast_field`.
///
/// Every value is converted with `to_u64` before being handed to the writer.
/// Any failure along the way panics, which makes this helper suitable for
/// tests and benchmarks only.
pub(crate) fn encode_decode_fast_field<Item: FastValue>(
    vals: &[Item],
) -> Arc<dyn Column<Item>> {
    let dummy_path = Path::new("__dummy__");
    let ram_directory: RamDirectory = RamDirectory::create();

    // The column is always written under a u64 field named "field".
    let mut builder = Schema::builder();
    let field = builder.add_u64_field("field", FAST);
    let schema = builder.build();

    // Write phase: scoped so that the writer side is dropped before reading.
    {
        let write_ptr: WritePtr = ram_directory
            .open_write(dummy_path)
            .expect("With a RamDirectory, this should never fail.");
        let mut composite_serializer = CompositeFastFieldSerializer::from_write(write_ptr)
            .expect("With a RamDirectory, this should never fail.");
        let mut writers = FastFieldsWriter::from_schema(&schema);
        let field_writer = writers
            .get_field_writer_mut(field)
            .expect("With a RamDirectory, this should never fail.");
        for encoded in vals.iter().map(|val| val.to_u64()) {
            field_writer.add_val(encoded);
        }
        writers
            .serialize(&mut composite_serializer, &HashMap::new(), None)
            .unwrap();
        composite_serializer.close().unwrap();
    }

    // Read phase: locate the bytes of the field inside the composite file.
    let file_slice = ram_directory
        .open_read(dummy_path)
        .expect("Failed to open the file");
    let composite = CompositeFile::open(&file_slice).expect("Failed to read the composite file");
    let column_bytes = composite
        .open_read(field)
        .expect("File component not found")
        .read_bytes()
        .unwrap();
    open_fast_field(column_bytes).unwrap()
}
#[test]
pub fn test_fastfield() {
let test_fastfield = encode_decode_fast_field(&[100u64, 200u64, 300u64]);
let test_fastfield = fastfield_codecs::serialize_and_load(&[100u64, 200u64, 300u64][..]);
assert_eq!(test_fastfield.get_val(0u64), 100);
assert_eq!(test_fastfield.get_val(1u64), 200);
assert_eq!(test_fastfield.get_val(2u64), 300);
@@ -372,7 +254,7 @@ mod tests {
assert_eq!(file.len(), 45);
let composite_file = CompositeFile::open(&file)?;
let fast_field_bytes = composite_file.open_read(*FIELD).unwrap().read_bytes()?;
let fast_field_reader = open_fast_field::<u64>(fast_field_bytes)?;
let fast_field_reader = open::<u64>(fast_field_bytes)?;
assert_eq!(fast_field_reader.get_val(0), 13u64);
assert_eq!(fast_field_reader.get_val(1), 14u64);
assert_eq!(fast_field_reader.get_val(2), 2u64);
@@ -407,7 +289,7 @@ mod tests {
.open_read(*FIELD)
.unwrap()
.read_bytes()?;
let fast_field_reader = open_fast_field::<u64>(data)?;
let fast_field_reader = open::<u64>(data)?;
assert_eq!(fast_field_reader.get_val(0), 4u64);
assert_eq!(fast_field_reader.get_val(1), 14_082_001u64);
assert_eq!(fast_field_reader.get_val(2), 3_052u64);
@@ -446,7 +328,7 @@ mod tests {
.open_read(*FIELD)
.unwrap()
.read_bytes()?;
let fast_field_reader = open_fast_field::<u64>(data)?;
let fast_field_reader = open::<u64>(data)?;
for doc in 0..10_000 {
assert_eq!(fast_field_reader.get_val(doc), 100_000u64);
}
@@ -481,7 +363,7 @@ mod tests {
.open_read(*FIELD)
.unwrap()
.read_bytes()?;
let fast_field_reader = open_fast_field::<u64>(data)?;
let fast_field_reader = open::<u64>(data)?;
assert_eq!(fast_field_reader.get_val(0), 0u64);
for doc in 1..10_001 {
assert_eq!(
@@ -525,7 +407,7 @@ mod tests {
.open_read(i64_field)
.unwrap()
.read_bytes()?;
let fast_field_reader = open_fast_field::<i64>(data)?;
let fast_field_reader = open::<i64>(data)?;
assert_eq!(fast_field_reader.min_value(), -100i64);
assert_eq!(fast_field_reader.max_value(), 9_999i64);
@@ -568,7 +450,7 @@ mod tests {
.open_read(i64_field)
.unwrap()
.read_bytes()?;
let fast_field_reader = open_fast_field::<i64>(data)?;
let fast_field_reader = open::<i64>(data)?;
assert_eq!(fast_field_reader.get_val(0), 0i64);
}
Ok(())
@@ -609,7 +491,7 @@ mod tests {
.open_read(*FIELD)
.unwrap()
.read_bytes()?;
let fast_field_reader = open_fast_field::<u64>(data)?;
let fast_field_reader = open::<u64>(data)?;
for a in 0..n {
assert_eq!(fast_field_reader.get_val(a as u64), permutation[a as usize]);
@@ -865,7 +747,6 @@ mod tests {
#[test]
fn test_datefastfield() -> crate::Result<()> {
use crate::fastfield::FastValue;
let mut schema_builder = Schema::builder();
let date_field = schema_builder.add_date_field(
"date",
@@ -926,7 +807,8 @@ mod tests {
#[test]
pub fn test_fastfield_bool() {
let test_fastfield = encode_decode_fast_field::<bool>(&[true, false, true, false]);
let test_fastfield: Arc<dyn Column<bool>> =
fastfield_codecs::serialize_and_load::<bool>(&[true, false, true, false]);
assert_eq!(test_fastfield.get_val(0), true);
assert_eq!(test_fastfield.get_val(1), false);
assert_eq!(test_fastfield.get_val(2), true);
@@ -960,7 +842,7 @@ mod tests {
assert_eq!(file.len(), 44);
let composite_file = CompositeFile::open(&file)?;
let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open_fast_field::<bool>(data)?;
let fast_field_reader = open::<bool>(data)?;
assert_eq!(fast_field_reader.get_val(0), true);
assert_eq!(fast_field_reader.get_val(1), false);
assert_eq!(fast_field_reader.get_val(2), true);
@@ -996,7 +878,7 @@ mod tests {
assert_eq!(file.len(), 56);
let composite_file = CompositeFile::open(&file)?;
let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open_fast_field::<bool>(data)?;
let fast_field_reader = open::<bool>(data)?;
for i in 0..25 {
assert_eq!(fast_field_reader.get_val(i * 2), true);
assert_eq!(fast_field_reader.get_val(i * 2 + 1), false);
@@ -1030,115 +912,98 @@ mod tests {
assert_eq!(file.len(), 43);
let composite_file = CompositeFile::open(&file)?;
let data = composite_file.open_read(field).unwrap().read_bytes()?;
let fast_field_reader = open_fast_field::<bool>(data)?;
let fast_field_reader = open::<bool>(data)?;
assert_eq!(fast_field_reader.get_val(0), false);
Ok(())
}
}
#[cfg(all(test, feature = "unstable"))]
mod bench {
use std::sync::Arc;
use fastfield_codecs::Column;
use test::{self, Bencher};
use crate::fastfield::tests::{
encode_decode_fast_field, generate_permutation, generate_permutation_gcd,
};
#[bench]
fn bench_intfastfield_jumpy_veclookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
b.iter(|| {
let mut a = 0u64;
for _ in 0..n {
a = permutation[a as usize];
fn get_index(
docs: &[crate::Document],
schema: &Schema,
codec_types: &[FastFieldCodecType],
) -> crate::Result<RamDirectory> {
let directory: RamDirectory = RamDirectory::create();
{
let write: WritePtr = directory.open_write(Path::new("test")).unwrap();
let mut serializer =
CompositeFastFieldSerializer::from_write_with_codec(write, codec_types).unwrap();
let mut fast_field_writers = FastFieldsWriter::from_schema(schema);
for doc in docs {
fast_field_writers.add_document(doc);
}
a
});
fast_field_writers
.serialize(&mut serializer, &HashMap::new(), None)
.unwrap();
serializer.close().unwrap();
}
Ok(directory)
}
#[bench]
fn bench_intfastfield_jumpy_fflookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn Column<u64>> = encode_decode_fast_field(&permutation);
b.iter(|| {
let mut a = 0u64;
for _ in 0..n {
a = column.get_val(a as u64);
}
a
});
/// For the bitpacked and the linear codec (in that order), checks that the
/// size reported by `test_gcd_date_with_codec` with seconds precision is
/// strictly smaller than the one reported with microseconds precision.
#[test]
pub fn test_gcd_date() -> crate::Result<()> {
    for codec_type in [FastFieldCodecType::Bitpacked, FastFieldCodecType::Linear] {
        let size_prec_sec = test_gcd_date_with_codec(codec_type, DatePrecision::Seconds)?;
        let size_prec_micro = test_gcd_date_with_codec(codec_type, DatePrecision::Microseconds)?;
        assert!(size_prec_sec < size_prec_micro);
    }
    Ok(())
}
#[bench]
fn bench_intfastfield_stride7_vec(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
b.iter(|| {
let mut a = 0u64;
for i in (0..n / 7).map(|val| val * 7) {
a += permutation[i as usize];
}
a
});
}
fn test_gcd_date_with_codec(
codec_type: FastFieldCodecType,
precision: DatePrecision,
) -> crate::Result<usize> {
let time1 = DateTime::from_timestamp_micros(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
let time2 = DateTime::from_timestamp_micros(
SystemTime::now()
.checked_sub(Duration::from_micros(4111))
.unwrap()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
#[bench]
fn bench_intfastfield_stride7_fflookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn Column<u64>> = encode_decode_fast_field(&permutation);
b.iter(|| {
let mut a = 0u64;
for i in (0..n / 7).map(|val| val * 7) {
a += column.get_val(i as u64);
}
a
});
}
let time3 = DateTime::from_timestamp_micros(
SystemTime::now()
.checked_sub(Duration::from_millis(2000))
.unwrap()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
#[bench]
fn bench_intfastfield_scan_all_fflookup(b: &mut Bencher) {
let permutation = generate_permutation();
let n = permutation.len();
let column: Arc<dyn Column<u64>> = encode_decode_fast_field(&permutation);
b.iter(|| {
let mut a = 0u64;
for i in 0u64..n as u64 {
a += column.get_val(i);
}
a
});
}
let mut schema_builder = Schema::builder();
let date_options = DateOptions::default()
.set_fast(Cardinality::SingleValue)
.set_precision(precision);
let field = schema_builder.add_date_field("field", date_options);
let schema = schema_builder.build();
#[bench]
fn bench_intfastfield_scan_all_fflookup_gcd(b: &mut Bencher) {
let permutation = generate_permutation_gcd();
let n = permutation.len();
let column: Arc<dyn Column<u64>> = encode_decode_fast_field(&permutation);
b.iter(|| {
let mut a = 0u64;
for i in 0..n as u64 {
a += column.get_val(i);
}
a
});
}
let docs = vec![doc!(field=>time1), doc!(field=>time2), doc!(field=>time3)];
#[bench]
fn bench_intfastfield_scan_all_vec(b: &mut Bencher) {
let permutation = generate_permutation();
b.iter(|| {
let mut a = 0u64;
for i in 0..permutation.len() {
a += permutation[i as usize] as u64;
}
a
});
let directory = get_index(&docs, &schema, &[codec_type])?;
let path = Path::new("test");
let file = directory.open_read(path).unwrap();
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let len = file.len();
let test_fastfield = open::<DateTime>(file.read_bytes()?)?;
assert_eq!(test_fastfield.get_val(0), time1.truncate(precision));
assert_eq!(test_fastfield.get_val(1), time2.truncate(precision));
assert_eq!(test_fastfield.get_val(2), time3.truncate(precision));
Ok(len)
}
}

View File

@@ -341,6 +341,7 @@ mod tests {
}
proptest! {
#![proptest_config(proptest::prelude::ProptestConfig::with_cases(5))]
#[test]
fn test_multivalued_proptest(ops in proptest::collection::vec(operation_strategy(), 1..10)) {
assert!(test_multivalued_no_panic(&ops[..]).is_ok());

View File

@@ -1,10 +1,11 @@
use std::io;
use fastfield_codecs::MonotonicallyMappableToU64;
use fnv::FnvHashMap;
use tantivy_bitpacker::minmax;
use crate::fastfield::serializer::BitpackedSerializerLegacy;
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType, FastValue};
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;
use crate::schema::{Document, Field, Value};

View File

@@ -1,62 +0,0 @@
use std::sync::Arc;
use common::BinarySerializable;
use fastfield_codecs::bitpacked::BitpackedCodec;
use fastfield_codecs::blockwise_linear::BlockwiseLinearCodec;
use fastfield_codecs::linear::LinearCodec;
use fastfield_codecs::{monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType};
use super::gcd::open_gcd_from_bytes;
use super::FastValue;
use crate::directory::OwnedBytes;
use crate::error::DataCorruption;
/// Opens `bytes` with codec `C` and wraps the resulting column so that each
/// value is converted with `Item::from_u64`.
fn open_codec_from_bytes<C: FastFieldCodec, Item: FastValue>(
    bytes: OwnedBytes,
) -> crate::Result<Arc<dyn Column<Item>>> {
    let u64_column = C::open_from_bytes(bytes)?;
    let typed_column = monotonic_map_column(u64_column, Item::from_u64);
    Ok(Arc::new(typed_column))
}
/// Opens `bytes` through `open_gcd_from_bytes` for codec `C` and wraps the
/// resulting column so that each value is converted with `Item::from_u64`.
fn open_codec_with_gcd<C: FastFieldCodec, Item: FastValue>(
    bytes: OwnedBytes,
) -> crate::Result<Arc<dyn Column<Item>>> {
    let gcd_column = open_gcd_from_bytes::<C>(bytes)?;
    let typed_column = monotonic_map_column(gcd_column, Item::from_u64);
    Ok(Arc::new(typed_column))
}
/// Opens the column stored in `bytes` using the codec identified by
/// `codec_type`.
///
/// For `FastFieldCodecType::Gcd`, a second codec type is deserialized from
/// `bytes` and the column is opened through the gcd wrapper for that codec.
///
/// # Errors
///
/// Returns a data corruption error if the codec type read for a gcd column is
/// itself `Gcd`, and forwards any error from deserialization or from the
/// codec-specific open functions.
fn open_from_id<Item: FastValue>(
    mut bytes: OwnedBytes,
    codec_type: FastFieldCodecType,
) -> crate::Result<Arc<dyn Column<Item>>> {
    // Plain codecs are opened directly; only `Gcd` falls through.
    match codec_type {
        FastFieldCodecType::Bitpacked => {
            return open_codec_from_bytes::<BitpackedCodec, _>(bytes);
        }
        FastFieldCodecType::Linear => {
            return open_codec_from_bytes::<LinearCodec, _>(bytes);
        }
        FastFieldCodecType::BlockwiseLinear => {
            return open_codec_from_bytes::<BlockwiseLinearCodec, _>(bytes);
        }
        FastFieldCodecType::Gcd => {}
    }

    // The codec type of the wrapped column is read next from the same bytes.
    let wrapped_codec_type = FastFieldCodecType::deserialize(&mut bytes)?;
    match wrapped_codec_type {
        FastFieldCodecType::Bitpacked => open_codec_with_gcd::<BitpackedCodec, _>(bytes),
        FastFieldCodecType::Linear => open_codec_with_gcd::<LinearCodec, _>(bytes),
        FastFieldCodecType::BlockwiseLinear => {
            open_codec_with_gcd::<BlockwiseLinearCodec, _>(bytes)
        }
        FastFieldCodecType::Gcd => {
            let corruption = DataCorruption::comment_only(
                "Gcd codec wrapped into another gcd codec. This combination is not allowed.",
            );
            Err(corruption.into())
        }
    }
}
/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data.
pub fn open_fast_field<Item: FastValue>(
mut bytes: OwnedBytes,
) -> crate::Result<Arc<dyn Column<Item>>> {
let codec_type = FastFieldCodecType::deserialize(&mut bytes)?;
open_from_id(bytes, codec_type)
}

View File

@@ -1,9 +1,8 @@
use std::sync::Arc;
use fastfield_codecs::Column;
use fastfield_codecs::{open, Column};
use crate::directory::{CompositeFile, FileSlice};
use crate::fastfield::reader::open_fast_field;
use crate::fastfield::{
BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader,
};
@@ -116,7 +115,8 @@ impl FastFieldReaders {
) -> crate::Result<Arc<dyn Column<TFastValue>>> {
let fast_field_slice = self.fast_field_data(field, index)?;
let bytes = fast_field_slice.read_bytes()?;
open_fast_field(bytes)
let column = fastfield_codecs::open(bytes)?;
Ok(column)
}
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(
@@ -248,7 +248,7 @@ impl FastFieldReaders {
}
let fast_field_idx_file = self.fast_field_data(field, 0)?;
let fast_field_idx_bytes = fast_field_idx_file.read_bytes()?;
let idx_reader = open_fast_field(fast_field_idx_bytes)?;
let idx_reader = open(fast_field_idx_bytes)?;
let data = self.fast_field_data(field, 1)?;
BytesFastFieldReader::open(idx_reader, data)
} else {

View File

@@ -1,17 +1,11 @@
use std::io::{self, Write};
use std::num::NonZeroU64;
use common::{BinarySerializable, CountingWriter};
use fastdivide::DividerU64;
pub use fastfield_codecs::bitpacked::{BitpackedCodec, BitpackedSerializerLegacy};
use fastfield_codecs::blockwise_linear::BlockwiseLinearCodec;
use fastfield_codecs::linear::LinearCodec;
use fastfield_codecs::{monotonic_map_column, FastFieldCodecType};
pub use fastfield_codecs::{Column, FastFieldCodec, FastFieldStats};
use fastfield_codecs::{FastFieldCodecType, MonotonicallyMappableToU64, ALL_CODEC_TYPES};
use super::{find_gcd, ALL_CODECS, GCD_DEFAULT};
use crate::directory::{CompositeWrite, WritePtr};
use crate::fastfield::gcd::write_gcd_header;
use crate::schema::Field;
/// `CompositeFastFieldSerializer` is in charge of serializing
@@ -36,68 +30,34 @@ use crate::schema::Field;
/// * `close()`
pub struct CompositeFastFieldSerializer {
composite_write: CompositeWrite<WritePtr>,
codec_enable_checker: FastFieldCodecEnableCheck,
}
/// Holds the list of codec types that are considered enabled.
#[derive(Debug, Clone)]
pub struct FastFieldCodecEnableCheck {
    /// Codec types for which `is_enabled` returns `true`.
    enabled_codecs: Vec<FastFieldCodecType>,
}

impl FastFieldCodecEnableCheck {
    /// Builds a checker in which every codec listed in `ALL_CODECS` is enabled.
    fn allow_all() -> Self {
        let enabled_codecs = ALL_CODECS.iter().copied().collect();
        Self { enabled_codecs }
    }

    /// Returns `true` if `code_type` is one of the enabled codec types.
    fn is_enabled(&self, code_type: FastFieldCodecType) -> bool {
        self.enabled_codecs
            .iter()
            .any(|enabled| *enabled == code_type)
    }
}

/// Builds a checker in which only the given codec type is enabled.
impl From<FastFieldCodecType> for FastFieldCodecEnableCheck {
    fn from(code_type: FastFieldCodecType) -> Self {
        let enabled_codecs = vec![code_type];
        Self { enabled_codecs }
    }
}
// TODO: use `explicit_generic_args_with_impl_trait` once it is merged and stabilized:
// https://github.com/rust-lang/rust/pull/86176
fn codec_estimation<C: FastFieldCodec, D: Column>(
fastfield_accessor: &D,
estimations: &mut Vec<(f32, FastFieldCodecType)>,
) {
if let Some(ratio) = C::estimate(fastfield_accessor) {
estimations.push((ratio, C::CODEC_TYPE));
}
codec_types: Vec<FastFieldCodecType>,
}
impl CompositeFastFieldSerializer {
/// Constructor
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
Self::from_write_with_codec(write, FastFieldCodecEnableCheck::allow_all())
Self::from_write_with_codec(write, &ALL_CODEC_TYPES)
}
/// Constructor
pub fn from_write_with_codec(
write: WritePtr,
codec_enable_checker: FastFieldCodecEnableCheck,
codec_types: &[FastFieldCodecType],
) -> io::Result<CompositeFastFieldSerializer> {
// just making room for the pointer to header.
let composite_write = CompositeWrite::wrap(write);
Ok(CompositeFastFieldSerializer {
composite_write,
codec_enable_checker,
codec_types: codec_types.to_vec(),
})
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field(
pub fn create_auto_detect_u64_fast_field<T: MonotonicallyMappableToU64>(
&mut self,
field: Field,
fastfield_accessor: impl Column,
fastfield_accessor: impl Column<T>,
) -> io::Result<()> {
self.create_auto_detect_u64_fast_field_with_idx(field, fastfield_accessor, 0)
}
@@ -114,102 +74,14 @@ impl CompositeFastFieldSerializer {
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx(
pub fn create_auto_detect_u64_fast_field_with_idx<T: MonotonicallyMappableToU64>(
&mut self,
field: Field,
fastfield_accessor: impl Column,
fastfield_accessor: impl Column<T>,
idx: usize,
) -> io::Result<()> {
let min_value = fastfield_accessor.min_value();
let field_write = self.composite_write.for_field_with_idx(field, idx);
let gcd = find_gcd(fastfield_accessor.iter().map(|val| val - min_value))
.map(NonZeroU64::get)
.unwrap_or(GCD_DEFAULT);
if gcd == 1 {
return Self::create_auto_detect_u64_fast_field_with_idx_gcd(
self.codec_enable_checker.clone(),
field,
field_write,
fastfield_accessor,
);
}
Self::write_header(field_write, FastFieldCodecType::Gcd)?;
let base_value = fastfield_accessor.min_value();
let gcd_divider = DividerU64::divide_by(gcd);
let divided_fastfield_accessor = monotonic_map_column(fastfield_accessor, |val: u64| {
gcd_divider.divide(val - base_value)
});
let num_vals = divided_fastfield_accessor.num_vals();
Self::create_auto_detect_u64_fast_field_with_idx_gcd(
self.codec_enable_checker.clone(),
field,
field_write,
divided_fastfield_accessor,
)?;
write_gcd_header(field_write, base_value, gcd, num_vals)?;
Ok(())
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx_gcd<W: Write>(
codec_enable_checker: FastFieldCodecEnableCheck,
field: Field,
field_write: &mut CountingWriter<W>,
fastfield_accessor: impl Column,
) -> io::Result<()> {
let mut estimations = vec![];
if codec_enable_checker.is_enabled(FastFieldCodecType::Bitpacked) {
codec_estimation::<BitpackedCodec, _>(&fastfield_accessor, &mut estimations);
}
if codec_enable_checker.is_enabled(FastFieldCodecType::Linear) {
codec_estimation::<LinearCodec, _>(&fastfield_accessor, &mut estimations);
}
if codec_enable_checker.is_enabled(FastFieldCodecType::BlockwiseLinear) {
codec_estimation::<BlockwiseLinearCodec, _>(&fastfield_accessor, &mut estimations);
}
if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan())
{
warn!(
"broken estimation for fast field codec {:?}",
broken_estimation.1
);
}
// removing nan values for codecs with broken calculations, and max values which disables
// codecs
estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
estimations.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
let (_ratio, codec_type) = estimations[0];
debug!("choosing fast field codec {codec_type:?} for field_id {field:?}"); // todo print actual field name
Self::write_header(field_write, codec_type)?;
match codec_type {
FastFieldCodecType::Bitpacked => {
BitpackedCodec::serialize(field_write, &fastfield_accessor)?;
}
FastFieldCodecType::Linear => {
LinearCodec::serialize(field_write, &fastfield_accessor)?;
}
FastFieldCodecType::BlockwiseLinear => {
BlockwiseLinearCodec::serialize(field_write, &fastfield_accessor)?;
}
FastFieldCodecType::Gcd => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"GCD codec not supported.",
));
}
}
field_write.flush()?;
fastfield_codecs::serialize(fastfield_accessor, field_write, &self.codec_types)?;
Ok(())
}

View File

@@ -2,13 +2,13 @@ use std::collections::HashMap;
use std::io;
use common;
use fastfield_codecs::Column;
use fastfield_codecs::{Column, MonotonicallyMappableToU64};
use fnv::FnvHashMap;
use tantivy_bitpacker::BlockedBitpacker;
use super::multivalued::MultiValuedFastFieldWriter;
use super::serializer::FastFieldStats;
use super::{FastFieldType, FastValue};
use super::FastFieldType;
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;

View File

@@ -1,3 +1,4 @@
use fastfield_codecs::MonotonicallyMappableToU64;
use fnv::FnvHashMap;
use murmurhash32::murmurhash2;

View File

@@ -1,7 +1,9 @@
use fastfield_codecs::MonotonicallyMappableToU64;
use super::doc_id_mapping::{get_doc_id_mapping_from_field, DocIdMapping};
use super::operation::AddOperation;
use crate::core::Segment;
use crate::fastfield::{FastFieldsWriter, FastValue as _};
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
use crate::indexer::json_term_writer::index_json_values;
use crate::indexer::segment_serializer::SegmentSerializer;

View File

@@ -253,7 +253,7 @@ where B: AsRef<[u8]>
}
value_bytes.copy_from_slice(self.value_bytes());
let value_u64 = u64::from_be_bytes(value_bytes);
Some(FastValue::from_u64(value_u64))
Some(T::from_u64(value_u64))
}
/// Returns the `i64` value stored in a term.
@@ -362,7 +362,7 @@ fn as_str(value_bytes: &[u8]) -> Option<&str> {
fn get_fast_type<T: FastValue>(bytes: &[u8]) -> Option<T> {
let value_u64 = u64::from_be_bytes(bytes.try_into().ok()?);
Some(FastValue::from_u64(value_u64))
Some(T::from_u64(value_u64))
}
/// Returns the json path (without non-human friendly separators, the type of the value, and the