diff --git a/fastfield_codecs/Cargo.toml b/fastfield_codecs/Cargo.toml index e62455b1a..cddee15de 100644 --- a/fastfield_codecs/Cargo.toml +++ b/fastfield_codecs/Cargo.toml @@ -14,6 +14,8 @@ tantivy-bitpacker = { version="0.2", path = "../bitpacker/" } ownedbytes = { version = "0.3.0", path = "../ownedbytes" } prettytable-rs = {version="0.9.0", optional= true} rand = {version="0.8.3", optional= true} +fastdivide = "0.4" +log = "0.4" [dev-dependencies] more-asserts = "0.3.0" @@ -23,4 +25,5 @@ rand = "0.8.3" [features] bin = ["prettytable-rs", "rand"] default = ["bin"] +unstable = [] diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs index 8b54ccf76..b28b8af41 100644 --- a/fastfield_codecs/benches/bench.rs +++ b/fastfield_codecs/benches/bench.rs @@ -27,7 +27,7 @@ mod tests { } fn bench_get(b: &mut Bencher, data: &[u64]) { let mut bytes = vec![]; - Codec::serialize(&mut bytes, &data).unwrap(); + Codec::serialize(&mut bytes, &VecColumn::from(data)).unwrap(); let reader = Codec::open_from_bytes(OwnedBytes::new(bytes)).unwrap(); b.iter(|| { let mut sum = 0u64; @@ -43,7 +43,7 @@ mod tests { let mut bytes = Vec::new(); b.iter(|| { bytes.clear(); - Codec::serialize(&mut bytes, &data).unwrap(); + Codec::serialize(&mut bytes, &VecColumn::from(data)).unwrap(); }); } diff --git a/fastfield_codecs/src/column.rs b/fastfield_codecs/src/column.rs index 0674d4d76..de81542ef 100644 --- a/fastfield_codecs/src/column.rs +++ b/fastfield_codecs/src/column.rs @@ -81,8 +81,11 @@ impl<'a, T: Copy + PartialOrd> Column for VecColumn<'a, T> { } } -impl<'a, T: Copy + Ord + Default> From<&'a [T]> for VecColumn<'a, T> { - fn from(values: &'a [T]) -> Self { +impl<'a, T: Copy + Ord + Default, V> From<&'a V> for VecColumn<'a, T> +where V: AsRef<[T]> + ?Sized +{ + fn from(values: &'a V) -> Self { + let values = values.as_ref(); let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default(); Self { values, diff --git a/fastfield_codecs/src/gcd.rs b/fastfield_codecs/src/gcd.rs new file mode 100644 index 000000000..7cec65faa --- /dev/null +++ b/fastfield_codecs/src/gcd.rs @@ -0,0 +1,207 @@ +use std::io::{self, Write}; +use std::num::NonZeroU64; + +use common::BinarySerializable; +use fastdivide::DividerU64; + +#[derive(Debug, Clone, Copy)] +pub struct GCDParams { + pub gcd: u64, + pub min_value: u64, + pub num_vals: u64, +} + +impl BinarySerializable for GCDParams { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + self.gcd.serialize(writer)?; + self.min_value.serialize(writer)?; + self.num_vals.serialize(writer)?; + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let gcd: u64 = u64::deserialize(reader)?; + let min_value: u64 = u64::deserialize(reader)?; + let num_vals: u64 = u64::deserialize(reader)?; + Ok(Self { + gcd, + min_value, + num_vals, + }) + } +} + +/// Compute the gcd of two non null numbers. +/// +/// It is recommended, but not required, to feed values such that `large >= small`. +fn compute_gcd(mut large: NonZeroU64, mut small: NonZeroU64) -> NonZeroU64 { + loop { + let rem: u64 = large.get() % small; + if let Some(new_small) = NonZeroU64::new(rem) { + (large, small) = (small, new_small); + } else { + return small; + } + } +} + +// Find GCD for iterator of numbers +pub fn find_gcd(numbers: impl Iterator) -> Option { + let mut numbers = numbers.flat_map(NonZeroU64::new); + let mut gcd: NonZeroU64 = numbers.next()?; + if gcd.get() == 1 { + return Some(gcd); + } + + let mut gcd_divider = DividerU64::divide_by(gcd.get()); + for val in numbers { + let remainder = val.get() - (gcd_divider.divide(val.get())) * gcd.get(); + if remainder == 0 { + continue; + } + gcd = compute_gcd(val, gcd); + if gcd.get() == 1 { + return Some(gcd); + } + + gcd_divider = DividerU64::divide_by(gcd.get()); + } + Some(gcd) +} + +#[cfg(test)] +mod tests { + use std::io; + use std::num::NonZeroU64; + + use ownedbytes::OwnedBytes; + + use crate::gcd::{compute_gcd, find_gcd}; + use crate::{FastFieldCodecType, VecColumn}; + + fn test_fastfield_gcd_i64_with_codec( + codec_type: FastFieldCodecType, + num_vals: usize, + ) -> io::Result<()> { + let mut vals: Vec = (-4..=(num_vals as i64) - 5).map(|val| val * 1000).collect(); + let mut buffer: Vec = Vec::new(); + crate::serialize( + VecColumn::from(&vals), + &mut buffer, + &[codec_type, FastFieldCodecType::Gcd], + )?; + let buffer = OwnedBytes::new(buffer); + let column = crate::open::(buffer.clone())?; + assert_eq!(column.get_val(0), -4000i64); + assert_eq!(column.get_val(1), -3000i64); + assert_eq!(column.get_val(2), -2000i64); + assert_eq!(column.max_value(), (num_vals as i64 - 5) * 1000); + assert_eq!(column.min_value(), -4000i64); + + // Can't apply gcd + let mut buffer_without_gcd = Vec::new(); + vals.pop(); + vals.push(1001i64); + crate::serialize( + VecColumn::from(&vals), + &mut buffer_without_gcd, + &[codec_type], + )?; + let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd); + assert!(buffer_without_gcd.len() > buffer.len()); + + Ok(()) + } + + #[test] + fn test_fastfield_gcd_i64() -> io::Result<()> { + for &codec_type in &[ + FastFieldCodecType::Bitpacked, + FastFieldCodecType::BlockwiseLinear, + FastFieldCodecType::Linear, + ] { + test_fastfield_gcd_i64_with_codec(codec_type, 5500)?; + } + Ok(()) + } + + fn test_fastfield_gcd_u64_with_codec( + codec_type: FastFieldCodecType, + num_vals: usize, + ) -> io::Result<()> { + let mut vals: Vec = (1..=num_vals).map(|i| i as u64 * 1000u64).collect(); + let mut buffer: Vec = Vec::new(); + crate::serialize( + VecColumn::from(&vals), + &mut buffer, + &[codec_type, FastFieldCodecType::Gcd], + )?; + let buffer = OwnedBytes::new(buffer); + let column = crate::open::(buffer.clone())?; + assert_eq!(column.get_val(0), 1000u64); + assert_eq!(column.get_val(1), 2000u64); + assert_eq!(column.get_val(2), 3000u64); + assert_eq!(column.max_value(), num_vals as u64 * 1000); + assert_eq!(column.min_value(), 1000u64); + + // Can't apply gcd + let mut buffer_without_gcd = Vec::new(); + vals.pop(); + vals.push(1001u64); + crate::serialize( + VecColumn::from(&vals), + &mut buffer_without_gcd, + &[codec_type], + )?; + let buffer_without_gcd = OwnedBytes::new(buffer_without_gcd); + assert!(buffer_without_gcd.len() > buffer.len()); + Ok(()) + } + + #[test] + fn test_fastfield_gcd_u64() -> io::Result<()> { + for &codec_type in &[ + FastFieldCodecType::Bitpacked, + FastFieldCodecType::BlockwiseLinear, + FastFieldCodecType::Linear, + ] { + test_fastfield_gcd_u64_with_codec(codec_type, 5500)?; + } + Ok(()) + } + + #[test] + pub fn test_fastfield2() { + let test_fastfield = crate::serialize_and_load(&[100u64, 200u64, 300u64]); + assert_eq!(test_fastfield.get_val(0), 100); + assert_eq!(test_fastfield.get_val(1), 200); + assert_eq!(test_fastfield.get_val(2), 300); + } + + #[test] + fn test_compute_gcd() { + let test_compute_gcd_aux = |large, small, expected| { + let large = NonZeroU64::new(large).unwrap(); + let small = NonZeroU64::new(small).unwrap(); + let expected = NonZeroU64::new(expected).unwrap(); + assert_eq!(compute_gcd(small, large), expected); + assert_eq!(compute_gcd(large, small), expected); + }; + test_compute_gcd_aux(1, 4, 1); + test_compute_gcd_aux(2, 4, 2); + test_compute_gcd_aux(10, 25, 5); + test_compute_gcd_aux(25, 25, 25); + } + + #[test] + fn find_gcd_test() { + assert_eq!(find_gcd([0].into_iter()), None); + assert_eq!(find_gcd([0, 10].into_iter()), NonZeroU64::new(10)); + assert_eq!(find_gcd([10, 0].into_iter()), NonZeroU64::new(10)); + assert_eq!(find_gcd([].into_iter()), None); + assert_eq!(find_gcd([15, 30, 5, 10].into_iter()), NonZeroU64::new(5)); + assert_eq!(find_gcd([15, 16, 10].into_iter()), NonZeroU64::new(1)); + assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), NonZeroU64::new(5)); + assert_eq!(find_gcd([0, 0].into_iter()), None); + } +} diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 67ed64b8b..ed219ac1e 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -1,7 +1,12 @@ +#![cfg_attr(all(feature = "unstable", test), feature(test))] + #[cfg(test)] #[macro_use] extern crate more_asserts; +#[cfg(all(test, feature = "unstable"))] +extern crate test; + use std::io; use std::io::Write; @@ -13,8 +18,11 @@ pub mod blockwise_linear; pub mod linear; mod column; +mod gcd; +mod serialize; pub use self::column::{monotonic_map_column, Column, VecColumn}; +pub use self::serialize::{open, serialize, serialize_and_load}; #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] #[repr(u8)] @@ -54,6 +62,67 @@ impl FastFieldCodecType { } } +pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy { + /// Converts a value to u64. + /// + /// Internally all fast field values are encoded as u64. + fn to_u64(self) -> u64; + + /// Converts a value from u64 + /// + /// Internally all fast field values are encoded as u64. + /// **Note: To be used for converting encoded Term, Posting values.** + fn from_u64(val: u64) -> Self; +} + +impl MonotonicallyMappableToU64 for u64 { + fn to_u64(self) -> u64 { + self + } + + fn from_u64(val: u64) -> Self { + val + } +} + +impl MonotonicallyMappableToU64 for i64 { + #[inline(always)] + fn to_u64(self) -> u64 { + common::i64_to_u64(self) + } + + #[inline(always)] + fn from_u64(val: u64) -> Self { + common::u64_to_i64(val) + } +} + +impl MonotonicallyMappableToU64 for bool { + #[inline(always)] + fn to_u64(self) -> u64 { + if self { + 1 + } else { + 0 + } + } + + #[inline(always)] + fn from_u64(val: u64) -> Self { + val > 0 + } +} + +impl MonotonicallyMappableToU64 for f64 { + fn to_u64(self) -> u64 { + common::f64_to_u64(self) + } + + fn from_u64(val: u64) -> Self { + common::u64_to_f64(val) + } +} + /// The FastFieldSerializerEstimate trait is required on all variants /// of fast field compressions, to decide which one to choose. pub trait FastFieldCodec: 'static { @@ -82,6 +151,13 @@ pub trait FastFieldCodec: 'static { fn estimate(fastfield_accessor: &impl Column) -> Option; } +pub const ALL_CODEC_TYPES: [FastFieldCodecType; 4] = [ + FastFieldCodecType::Bitpacked, + FastFieldCodecType::BlockwiseLinear, + FastFieldCodecType::Gcd, + FastFieldCodecType::Linear, +]; + #[derive(Debug, Clone)] /// Statistics are used in codec detection and stored in the fast field footer. pub struct FastFieldStats { @@ -255,3 +331,121 @@ mod tests { assert_eq!(count_codec, 4); } } + +#[cfg(all(test, feature = "unstable"))] +mod bench { + use std::sync::Arc; + + use rand::prelude::*; + use test::{self, Bencher}; + + use crate::Column; + + // Warning: this generates the same permutation at each call + fn generate_permutation() -> Vec { + let mut permutation: Vec = (0u64..100_000u64).collect(); + permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); + permutation + } + + // Warning: this generates the same permutation at each call + fn generate_permutation_gcd() -> Vec { + let mut permutation: Vec = (1u64..100_000u64).map(|el| el * 1000).collect(); + permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); + permutation + } + + #[bench] + fn bench_intfastfield_jumpy_veclookup(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + b.iter(|| { + let mut a = 0u64; + for _ in 0..n { + a = permutation[a as usize]; + } + a + }); + } + + #[bench] + fn bench_intfastfield_jumpy_fflookup(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + let column: Arc> = crate::serialize_and_load(&permutation); + b.iter(|| { + let mut a = 0u64; + for _ in 0..n { + a = column.get_val(a as u64); + } + a + }); + } + + #[bench] + fn bench_intfastfield_stride7_vec(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + b.iter(|| { + let mut a = 0u64; + for i in (0..n / 7).map(|val| val * 7) { + a += permutation[i as usize]; + } + a + }); + } + + #[bench] + fn bench_intfastfield_stride7_fflookup(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + let column: Arc> = crate::serialize_and_load(&permutation); + b.iter(|| { + let mut a = 0u64; + for i in (0..n / 7).map(|val| val * 7) { + a += column.get_val(i as u64); + } + a + }); + } + + #[bench] + fn bench_intfastfield_scan_all_fflookup(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + let column: Arc> = crate::serialize_and_load(&permutation); + b.iter(|| { + let mut a = 0u64; + for i in 0u64..n as u64 { + a += column.get_val(i); + } + a + }); + } + + #[bench] + fn bench_intfastfield_scan_all_fflookup_gcd(b: &mut Bencher) { + let permutation = generate_permutation_gcd(); + let n = permutation.len(); + let column: Arc> = crate::serialize_and_load(&permutation); + b.iter(|| { + let mut a = 0u64; + for i in 0..n as u64 { + a += column.get_val(i); + } + a + }); + } + + #[bench] + fn bench_intfastfield_scan_all_vec(b: &mut Bencher) { + let permutation = generate_permutation(); + b.iter(|| { + let mut a = 0u64; + for i in 0..permutation.len() { + a += permutation[i as usize] as u64; + } + a + }); + } +} diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs new file mode 100644 index 000000000..0deb36150 --- /dev/null +++ b/fastfield_codecs/src/serialize.rs @@ -0,0 +1,233 @@ +// Copyright (C) 2022 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::io; +use std::num::NonZeroU64; +use std::sync::Arc; + +use common::BinarySerializable; +use fastdivide::DividerU64; +use log::warn; +use ownedbytes::OwnedBytes; + +use crate::bitpacked::BitpackedCodec; +use crate::blockwise_linear::BlockwiseLinearCodec; +use crate::gcd::{find_gcd, GCDParams}; +use crate::linear::LinearCodec; +use crate::{ + monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64, + VecColumn, ALL_CODEC_TYPES, +}; + +// use this, when this is merged and stabilized explicit_generic_args_with_impl_trait +// https://github.com/rust-lang/rust/pull/86176 +fn codec_estimation( + fastfield_accessor: &D, + estimations: &mut Vec<(f32, FastFieldCodecType)>, +) { + if let Some(ratio) = C::estimate(fastfield_accessor) { + estimations.push((ratio, C::CODEC_TYPE)); + } +} + +fn write_header(codec_type: FastFieldCodecType, output: &mut W) -> io::Result<()> { + codec_type.to_code().serialize(output)?; + Ok(()) +} + +fn gcd_params(column: &impl Column) -> Option { + let min_value = column.min_value(); + let gcd = find_gcd(column.iter().map(|val| val - min_value)).map(NonZeroU64::get)?; + if gcd == 1 { + return None; + } + Some(GCDParams { + gcd, + min_value, + num_vals: column.num_vals(), + }) +} + +/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. +pub fn open( + mut bytes: OwnedBytes, +) -> io::Result>> { + let codec_type = FastFieldCodecType::deserialize(&mut bytes)?; + open_from_id(bytes, codec_type) +} + +fn open_codec_from_bytes( + bytes: OwnedBytes, +) -> io::Result>> { + let reader = C::open_from_bytes(bytes)?; + Ok(Arc::new(monotonic_map_column(reader, Item::from_u64))) +} + +pub fn open_gcd_from_bytes( + bytes: OwnedBytes, +) -> io::Result { + let footer_offset = bytes.len() - 24; + let (body, mut footer) = bytes.split(footer_offset); + let gcd_params = GCDParams::deserialize(&mut footer)?; + let gcd_remap = move |val: u64| gcd_params.min_value + gcd_params.gcd * val; + let reader: WrappedCodec::Reader = WrappedCodec::open_from_bytes(body)?; + Ok(monotonic_map_column(reader, gcd_remap)) +} + +fn open_codec_with_gcd( + bytes: OwnedBytes, +) -> io::Result>> { + let reader = open_gcd_from_bytes::(bytes)?; + Ok(Arc::new(monotonic_map_column(reader, Item::from_u64))) +} + +/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. +fn open_from_id( + mut bytes: OwnedBytes, + codec_type: FastFieldCodecType, +) -> io::Result>> { + match codec_type { + FastFieldCodecType::Bitpacked => open_codec_from_bytes::(bytes), + FastFieldCodecType::Linear => open_codec_from_bytes::(bytes), + FastFieldCodecType::BlockwiseLinear => { + open_codec_from_bytes::(bytes) + } + FastFieldCodecType::Gcd => { + let codec_type = FastFieldCodecType::deserialize(&mut bytes)?; + match codec_type { + FastFieldCodecType::Bitpacked => open_codec_with_gcd::(bytes), + FastFieldCodecType::Linear => open_codec_with_gcd::(bytes), + FastFieldCodecType::BlockwiseLinear => { + open_codec_with_gcd::(bytes) + } + FastFieldCodecType::Gcd => Err(io::Error::new( + io::ErrorKind::InvalidData, + "Gcd codec wrapped into another gcd codec. This combination is not allowed.", + )), + } + } + } +} + +pub fn serialize( + typed_column: impl Column, + output: &mut impl io::Write, + codecs: &[FastFieldCodecType], +) -> io::Result<()> { + let column = monotonic_map_column(typed_column, T::to_u64); + let gcd_params_opt = if codecs.contains(&FastFieldCodecType::Gcd) { + gcd_params(&column) + } else { + None + }; + + let gcd_params = if let Some(gcd_params) = gcd_params_opt { + gcd_params + } else { + return serialize_without_gcd(column, output, codecs); + }; + + write_header(FastFieldCodecType::Gcd, output)?; + let base_value = column.min_value(); + let gcd_divider = DividerU64::divide_by(gcd_params.gcd); + let divided_fastfield_accessor = + monotonic_map_column(column, |val: u64| gcd_divider.divide(val - base_value)); + + serialize_without_gcd(divided_fastfield_accessor, output, codecs)?; + + gcd_params.serialize(output)?; + Ok(()) +} + +fn serialize_without_gcd( + column: impl Column, + output: &mut impl io::Write, + codecs: &[FastFieldCodecType], +) -> io::Result<()> { + let mut estimations = Vec::new(); + for &codec in codecs { + if codec == FastFieldCodecType::Gcd { + continue; + } + match codec { + FastFieldCodecType::Bitpacked => { + codec_estimation::(&column, &mut estimations); + } + FastFieldCodecType::Linear => { + codec_estimation::(&column, &mut estimations); + } + FastFieldCodecType::BlockwiseLinear => { + codec_estimation::(&column, &mut estimations); + } + FastFieldCodecType::Gcd => {} + } + } + if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan()) { + warn!( + "broken estimation for fast field codec {:?}", + broken_estimation.1 + ); + } + // removing nan values for codecs with broken calculations, and max values which disables + // codecs + estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX); + estimations.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); + let (_ratio, codec_type) = estimations[0]; + + write_header(codec_type, output)?; + match codec_type { + FastFieldCodecType::Bitpacked => { + BitpackedCodec::serialize(output, &column)?; + } + FastFieldCodecType::Linear => { + LinearCodec::serialize(output, &column)?; + } + FastFieldCodecType::BlockwiseLinear => { + BlockwiseLinearCodec::serialize(output, &column)?; + } + FastFieldCodecType::Gcd => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "GCD codec not supported.", + )); + } + } + output.flush()?; + Ok(()) +} + +pub fn serialize_and_load( + column: &[T], +) -> Arc> { + let mut buffer = Vec::new(); + super::serialize(VecColumn::from(column), &mut buffer, &ALL_CODEC_TYPES).unwrap(); + super::open(OwnedBytes::new(buffer)).unwrap() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_serialize_deserialize() { + let original = [1u64, 5u64, 10u64]; + let restored: Vec = serialize_and_load(&original[..]).iter().collect(); + assert_eq!(&restored, &original[..]); + } +} diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index d3e7ec71a..51101f121 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -423,12 +423,13 @@ pub(crate) fn range_to_key(range: &Range, field_type: &Type) -> Key { #[cfg(test)] mod tests { + use fastfield_codecs::MonotonicallyMappableToU64; + use super::*; use crate::aggregation::agg_req::{ Aggregation, Aggregations, BucketAggregation, BucketAggregationType, }; use crate::aggregation::tests::{exec_request_with_query, get_test_index_with_num_docs}; - use crate::fastfield::FastValue; pub fn get_collector_from_ranges( ranges: Vec, diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index 1d6e35383..c0f724d00 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -161,7 +161,6 @@ mod collector; pub mod intermediate_agg_result; pub mod metric; mod segment_agg_result; - use std::collections::HashMap; use std::fmt::Display; @@ -169,10 +168,10 @@ pub use collector::{ AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector, MAX_BUCKET_COUNT, }; +use fastfield_codecs::MonotonicallyMappableToU64; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use crate::fastfield::FastValue; use crate::schema::Type; /// Represents an associative array `(key => values)` in a very efficient manner. diff --git a/src/fastfield/gcd.rs b/src/fastfield/gcd.rs deleted file mode 100644 index abcf535f4..000000000 --- a/src/fastfield/gcd.rs +++ /dev/null @@ -1,328 +0,0 @@ -use std::io::{self, Write}; -use std::num::NonZeroU64; - -use common::BinarySerializable; -use fastdivide::DividerU64; -use fastfield_codecs::{monotonic_map_column, Column, FastFieldCodec}; -use ownedbytes::OwnedBytes; - -pub const GCD_DEFAULT: u64 = 1; - -#[derive(Debug, Clone, Copy)] -struct GCDParams { - gcd: u64, - min_value: u64, - num_vals: u64, -} - -impl BinarySerializable for GCDParams { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - self.gcd.serialize(writer)?; - self.min_value.serialize(writer)?; - self.num_vals.serialize(writer)?; - Ok(()) - } - - fn deserialize(reader: &mut R) -> io::Result { - let gcd: u64 = u64::deserialize(reader)?; - let min_value: u64 = u64::deserialize(reader)?; - let num_vals: u64 = u64::deserialize(reader)?; - Ok(Self { - gcd, - min_value, - num_vals, - }) - } -} - -pub fn open_gcd_from_bytes( - bytes: OwnedBytes, -) -> io::Result { - let footer_offset = bytes.len() - 24; - let (body, mut footer) = bytes.split(footer_offset); - let gcd_params = GCDParams::deserialize(&mut footer)?; - let gcd_remap = move |val: u64| gcd_params.min_value + gcd_params.gcd * val; - let reader: WrappedCodec::Reader = WrappedCodec::open_from_bytes(body)?; - Ok(monotonic_map_column(reader, gcd_remap)) -} - -pub fn write_gcd_header( - field_write: &mut W, - min_value: u64, - gcd: u64, - num_vals: u64, -) -> io::Result<()> { - gcd.serialize(field_write)?; - min_value.serialize(field_write)?; - num_vals.serialize(field_write)?; - Ok(()) -} - -/// Compute the gcd of two non null numbers. -/// -/// It is recommended, but not required, to feed values such that `large >= small`. -fn compute_gcd(mut large: NonZeroU64, mut small: NonZeroU64) -> NonZeroU64 { - loop { - let rem: u64 = large.get() % small; - if let Some(new_small) = NonZeroU64::new(rem) { - (large, small) = (small, new_small); - } else { - return small; - } - } -} - -// Find GCD for iterator of numbers -pub fn find_gcd(numbers: impl Iterator) -> Option { - let mut numbers = numbers.flat_map(NonZeroU64::new); - let mut gcd: NonZeroU64 = numbers.next()?; - if gcd.get() == 1 { - return Some(gcd); - } - - let mut gcd_divider = DividerU64::divide_by(gcd.get()); - for val in numbers { - let remainder = val.get() - (gcd_divider.divide(val.get())) * gcd.get(); - if remainder == 0 { - continue; - } - gcd = compute_gcd(val, gcd); - if gcd.get() == 1 { - return Some(gcd); - } - - gcd_divider = DividerU64::divide_by(gcd.get()); - } - Some(gcd) -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::num::NonZeroU64; - use std::path::Path; - use std::sync::Arc; - use std::time::{Duration, SystemTime}; - - use common::HasLen; - use fastfield_codecs::Column; - - use crate::directory::{CompositeFile, RamDirectory, WritePtr}; - use crate::fastfield::gcd::compute_gcd; - use crate::fastfield::reader::open_fast_field; - use crate::fastfield::serializer::FastFieldCodecEnableCheck; - use crate::fastfield::tests::{encode_decode_fast_field, FIELD, FIELDI64, SCHEMA, SCHEMAI64}; - use crate::fastfield::{ - find_gcd, CompositeFastFieldSerializer, FastFieldCodecType, FastFieldsWriter, ALL_CODECS, - }; - use crate::schema::{Cardinality, Schema}; - use crate::{DateOptions, DatePrecision, DateTime, Directory}; - - fn get_index( - docs: &[crate::Document], - schema: &Schema, - codec_enable_checker: FastFieldCodecEnableCheck, - ) -> crate::Result { - let directory: RamDirectory = RamDirectory::create(); - { - let write: WritePtr = directory.open_write(Path::new("test")).unwrap(); - let mut serializer = - CompositeFastFieldSerializer::from_write_with_codec(write, codec_enable_checker) - .unwrap(); - let mut fast_field_writers = FastFieldsWriter::from_schema(schema); - for doc in docs { - fast_field_writers.add_document(doc); - } - fast_field_writers - .serialize(&mut serializer, &HashMap::new(), None) - .unwrap(); - serializer.close().unwrap(); - } - Ok(directory) - } - - fn test_fastfield_gcd_i64_with_codec( - code_type: FastFieldCodecType, - num_vals: usize, - ) -> crate::Result<()> { - let path = Path::new("test"); - let mut docs = vec![]; - for i in 1..=num_vals { - let val = (i as i64 - 5) * 1000i64; - docs.push(doc!(*FIELDI64=>val)); - } - let directory = get_index(&docs, &SCHEMAI64, code_type.into())?; - let file = directory.open_read(path).unwrap(); - let composite_file = CompositeFile::open(&file)?; - let file = composite_file.open_read(*FIELD).unwrap(); - let fast_field_reader: Arc> = open_fast_field(file.read_bytes()?)?; - assert_eq!(fast_field_reader.get_val(0), -4000i64); - assert_eq!(fast_field_reader.get_val(1), -3000i64); - assert_eq!(fast_field_reader.get_val(2), -2000i64); - assert_eq!(fast_field_reader.max_value(), (num_vals as i64 - 5) * 1000); - assert_eq!(fast_field_reader.min_value(), -4000i64); - let file = directory.open_read(path).unwrap(); - - // Can't apply gcd - let path = Path::new("test"); - docs.pop(); - docs.push(doc!(*FIELDI64=>2001i64)); - let directory = get_index(&docs, &SCHEMAI64, code_type.into())?; - let file2 = directory.open_read(path).unwrap(); - assert!(file2.len() > file.len()); - - Ok(()) - } - - #[test] - fn test_fastfield_gcd_i64() -> crate::Result<()> { - for &code_type in ALL_CODECS { - test_fastfield_gcd_i64_with_codec(code_type, 5500)?; - } - Ok(()) - } - - fn test_fastfield_gcd_u64_with_codec( - code_type: FastFieldCodecType, - num_vals: usize, - ) -> crate::Result<()> { - let path = Path::new("test"); - let mut docs = vec![]; - for i in 1..=num_vals { - let val = i as u64 * 1000u64; - docs.push(doc!(*FIELD=>val)); - } - let directory = get_index(&docs, &SCHEMA, code_type.into())?; - let file = directory.open_read(path).unwrap(); - let composite_file = CompositeFile::open(&file)?; - let file = composite_file.open_read(*FIELD).unwrap(); - let fast_field_reader = open_fast_field::(file.read_bytes()?)?; - assert_eq!(fast_field_reader.get_val(0), 1000u64); - assert_eq!(fast_field_reader.get_val(1), 2000u64); - assert_eq!(fast_field_reader.get_val(2), 3000u64); - assert_eq!(fast_field_reader.max_value(), num_vals as u64 * 1000); - assert_eq!(fast_field_reader.min_value(), 1000u64); - let file = directory.open_read(path).unwrap(); - - // Can't apply gcd - let path = Path::new("test"); - docs.pop(); - docs.push(doc!(*FIELDI64=>2001u64)); - let directory = get_index(&docs, &SCHEMA, code_type.into())?; - let file2 = directory.open_read(path).unwrap(); - assert!(file2.len() > file.len()); - - Ok(()) - } - - #[test] - fn test_fastfield_gcd_u64() -> crate::Result<()> { - for &code_type in ALL_CODECS { - test_fastfield_gcd_u64_with_codec(code_type, 5500)?; - } - Ok(()) - } - - #[test] - pub fn test_fastfield2() { - let test_fastfield = encode_decode_fast_field(&[100u64, 200u64, 300u64]); - assert_eq!(test_fastfield.get_val(0), 100); - assert_eq!(test_fastfield.get_val(1), 200); - assert_eq!(test_fastfield.get_val(2), 300); - } - - #[test] - pub fn test_gcd_date() -> crate::Result<()> { - let size_prec_sec = - test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Seconds)?; - let size_prec_micro = - test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Microseconds)?; - assert!(size_prec_sec < size_prec_micro); - - let size_prec_sec = - test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Seconds)?; - let size_prec_micro = - test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Microseconds)?; - assert!(size_prec_sec < size_prec_micro); - - Ok(()) - } - - fn test_gcd_date_with_codec( - codec_type: FastFieldCodecType, - precision: DatePrecision, - ) -> crate::Result { - let time1 = DateTime::from_timestamp_micros( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i64, - ); - let time2 = DateTime::from_timestamp_micros( - SystemTime::now() - .checked_sub(Duration::from_micros(4111)) - .unwrap() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i64, - ); - - let time3 = DateTime::from_timestamp_micros( - SystemTime::now() - .checked_sub(Duration::from_millis(2000)) - .unwrap() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i64, - ); - - let mut schema_builder = Schema::builder(); - let date_options = DateOptions::default() - .set_fast(Cardinality::SingleValue) - .set_precision(precision); - let field = schema_builder.add_date_field("field", date_options); - let schema = schema_builder.build(); - - let docs = vec![doc!(field=>time1), doc!(field=>time2), doc!(field=>time3)]; - - let directory = get_index(&docs, &schema, codec_type.into())?; - let path = Path::new("test"); - let file = directory.open_read(path).unwrap(); - let composite_file = CompositeFile::open(&file)?; - let file = composite_file.open_read(*FIELD).unwrap(); - let len = file.len(); - let test_fastfield = open_fast_field::(file.read_bytes()?)?; - - assert_eq!(test_fastfield.get_val(0), time1.truncate(precision)); - assert_eq!(test_fastfield.get_val(1), time2.truncate(precision)); - assert_eq!(test_fastfield.get_val(2), time3.truncate(precision)); - Ok(len) - } - - #[test] - fn test_compute_gcd() { - let test_compute_gcd_aux = |large, small, expected| { - let large = NonZeroU64::new(large).unwrap(); - let small = NonZeroU64::new(small).unwrap(); - let expected = NonZeroU64::new(expected).unwrap(); - assert_eq!(compute_gcd(small, large), expected); - assert_eq!(compute_gcd(large, small), expected); - }; - test_compute_gcd_aux(1, 4, 1); - test_compute_gcd_aux(2, 4, 2); - test_compute_gcd_aux(10, 25, 5); - test_compute_gcd_aux(25, 25, 25); - } - - #[test] - fn find_gcd_test() { - assert_eq!(find_gcd([0].into_iter()), None); - assert_eq!(find_gcd([0, 10].into_iter()), NonZeroU64::new(10)); - assert_eq!(find_gcd([10, 0].into_iter()), NonZeroU64::new(10)); - assert_eq!(find_gcd([].into_iter()), None); - assert_eq!(find_gcd([15, 30, 5, 10].into_iter()), NonZeroU64::new(5)); - assert_eq!(find_gcd([15, 16, 10].into_iter()), NonZeroU64::new(1)); - assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), NonZeroU64::new(5)); - assert_eq!(find_gcd([0, 0].into_iter()), None); - } -} diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index df829f8c5..c68d7c338 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -20,13 +20,12 @@ //! //! Read access performance is comparable to that of an array lookup. -use fastfield_codecs::FastFieldCodecType; +use fastfield_codecs::MonotonicallyMappableToU64; pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet}; pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; pub use self::error::{FastFieldNotAvailableError, Result}; pub use self::facet_reader::FacetReader; -pub(crate) use self::gcd::{find_gcd, GCD_DEFAULT}; pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter}; pub use self::readers::FastFieldReaders; pub(crate) use self::readers::{type_and_cardinality, FastType}; @@ -39,19 +38,11 @@ mod alive_bitset; mod bytes; mod error; mod facet_reader; -mod gcd; mod multivalued; -mod reader; mod readers; mod serializer; mod writer; -pub(crate) const ALL_CODECS: &[FastFieldCodecType; 3] = &[ - FastFieldCodecType::Bitpacked, - FastFieldCodecType::Linear, - FastFieldCodecType::BlockwiseLinear, -]; - /// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data /// for a doc_id pub trait MultiValueLength { @@ -63,47 +54,26 @@ pub trait MultiValueLength { /// Trait for types that are allowed for fast fields: /// (u64, i64 and f64, bool, DateTime). -pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static { - /// Converts a value from u64 - /// - /// Internally all fast field values are encoded as u64. - /// **Note: To be used for converting encoded Term, Posting values.** - fn from_u64(val: u64) -> Self; - - /// Converts a value to u64. - /// - /// Internally all fast field values are encoded as u64. - fn to_u64(&self) -> u64; - +pub trait FastValue: + MonotonicallyMappableToU64 + Copy + Send + Sync + PartialOrd + 'static +{ /// Returns the fast field cardinality that can be extracted from the given /// `FieldType`. /// /// If the type is not a fast field, `None` is returned. fn fast_field_cardinality(field_type: &FieldType) -> Option; - /// Cast value to `u64`. - /// The value is just reinterpreted in memory. - fn as_u64(&self) -> u64; + /// Returns the `schema::Type` for this FastValue. + fn to_type() -> Type; /// Build a default value. This default value is never used, so the value does not /// really matter. fn make_zero() -> Self { - Self::from_u64(0i64.to_u64()) + Self::from_u64(0u64) } - - /// Returns the `schema::Type` for this FastValue. - fn to_type() -> Type; } impl FastValue for u64 { - fn from_u64(val: u64) -> Self { - val - } - - fn to_u64(&self) -> u64 { - *self - } - fn fast_field_cardinality(field_type: &FieldType) -> Option { match *field_type { FieldType::U64(ref integer_options) => integer_options.get_fastfield_cardinality(), @@ -112,24 +82,12 @@ impl FastValue for u64 { } } - fn as_u64(&self) -> u64 { - *self - } - fn to_type() -> Type { Type::U64 } } impl FastValue for i64 { - fn from_u64(val: u64) -> Self { - common::u64_to_i64(val) - } - - fn to_u64(&self) -> u64 { - common::i64_to_u64(*self) - } - fn fast_field_cardinality(field_type: &FieldType) -> Option { match *field_type { FieldType::I64(ref integer_options) => integer_options.get_fastfield_cardinality(), @@ -137,24 +95,12 @@ impl FastValue for i64 { } } - fn as_u64(&self) -> u64 { - *self as u64 - } - fn to_type() -> Type { Type::I64 } } impl FastValue for f64 { - fn from_u64(val: u64) -> Self { - common::u64_to_f64(val) - } - - fn to_u64(&self) -> u64 { - common::f64_to_u64(*self) - } - fn fast_field_cardinality(field_type: &FieldType) -> Option { match *field_type { FieldType::F64(ref integer_options) => integer_options.get_fastfield_cardinality(), @@ -162,27 +108,12 @@ impl FastValue for f64 { } } - fn as_u64(&self) -> u64 { - self.to_bits() - } - fn to_type() -> Type { Type::F64 } } impl FastValue for bool { - fn from_u64(val: u64) -> Self { - val != 0u64 - } - - fn to_u64(&self) -> u64 { - match self { - false => 0, - true => 1, - } - } - fn fast_field_cardinality(field_type: &FieldType) -> Option { match *field_type { FieldType::Bool(ref integer_options) => integer_options.get_fastfield_cardinality(), @@ -190,28 +121,26 @@ impl FastValue for bool { } } - fn as_u64(&self) -> u64 { - *self as u64 - } - fn to_type() -> Type { Type::Bool } } +impl MonotonicallyMappableToU64 for DateTime { + fn to_u64(self) -> u64 { + self.timestamp_micros.to_u64() + } + + fn from_u64(val: u64) -> Self { + let timestamp_micros = i64::from_u64(val); + DateTime { timestamp_micros } + } +} + impl FastValue for DateTime { /// Converts a timestamp microseconds into DateTime. /// /// **Note the timestamps is expected to be in microseconds.** - fn from_u64(timestamp_micros_u64: u64) -> Self { - let timestamp_micros = i64::from_u64(timestamp_micros_u64); - Self::from_timestamp_micros(timestamp_micros) - } - - fn to_u64(&self) -> u64 { - common::i64_to_u64(self.into_timestamp_micros()) - } - fn fast_field_cardinality(field_type: &FieldType) -> Option { match *field_type { FieldType::Date(ref options) => options.get_fastfield_cardinality(), @@ -219,13 +148,15 @@ impl FastValue for DateTime { } } - fn as_u64(&self) -> u64 { - self.into_timestamp_micros().as_u64() - } - fn to_type() -> Type { Type::Date } + + fn make_zero() -> Self { + DateTime { + timestamp_micros: 0, + } + } } fn value_to_u64(value: &Value) -> u64 { @@ -266,8 +197,10 @@ mod tests { use std::ops::Range; use std::path::Path; use std::sync::Arc; + use std::time::{Duration, SystemTime}; use common::HasLen; + use fastfield_codecs::{open, FastFieldCodecType}; use once_cell::sync::Lazy; use rand::prelude::SliceRandom; use rand::rngs::StdRng; @@ -275,7 +208,6 @@ mod tests { use super::*; use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr}; - use crate::fastfield::reader::open_fast_field; use crate::merge_policy::NoMergePolicy; use crate::schema::{Document, Field, Schema, FAST, STRING, TEXT}; use crate::time::OffsetDateTime; @@ -286,61 +218,11 @@ mod tests { schema_builder.add_u64_field("field", FAST); schema_builder.build() }); - - pub static SCHEMAI64: Lazy = Lazy::new(|| { - let mut schema_builder = Schema::builder(); - schema_builder.add_i64_field("field", FAST); - schema_builder.build() - }); - pub static FIELD: Lazy = Lazy::new(|| SCHEMA.get_field("field").unwrap()); - pub static FIELDI64: Lazy = Lazy::new(|| SCHEMAI64.get_field("field").unwrap()); - - /// Encode values using the most appropriate codec and and then loads it - /// right away. - /// - /// This is useful in tests and bench. - pub(crate) fn encode_decode_fast_field( - vals: &[Item], - ) -> Arc> { - let mut schema_builder = Schema::builder(); - let field = schema_builder.add_u64_field("field", FAST); - let schema = schema_builder.build(); - let path = Path::new("__dummy__"); - let directory: RamDirectory = RamDirectory::create(); - { - let write: WritePtr = directory - .open_write(path) - .expect("With a RamDirectory, this should never fail."); - let mut serializer = CompositeFastFieldSerializer::from_write(write) - .expect("With a RamDirectory, this should never fail."); - let mut fast_field_writers = FastFieldsWriter::from_schema(&schema); - { - let fast_field_writer = fast_field_writers - .get_field_writer_mut(field) - .expect("With a RamDirectory, this should never fail."); - for val in vals { - fast_field_writer.add_val(val.to_u64()); - } - } - fast_field_writers - .serialize(&mut serializer, &HashMap::new(), None) - .unwrap(); - serializer.close().unwrap(); - } - let file = directory.open_read(path).expect("Failed to open the file"); - let composite_file = CompositeFile::open(&file).expect("Failed to read the composite file"); - let field_bytes = composite_file - .open_read(field) - .expect("File component not found") - .read_bytes() - .unwrap(); - open_fast_field(field_bytes).unwrap() - } #[test] pub fn test_fastfield() { - let test_fastfield = encode_decode_fast_field(&[100u64, 200u64, 300u64]); + let test_fastfield = fastfield_codecs::serialize_and_load(&[100u64, 200u64, 300u64][..]); assert_eq!(test_fastfield.get_val(0u64), 100); assert_eq!(test_fastfield.get_val(1u64), 200); assert_eq!(test_fastfield.get_val(2u64), 300); @@ -372,7 +254,7 @@ mod tests { assert_eq!(file.len(), 45); let composite_file = CompositeFile::open(&file)?; let fast_field_bytes = composite_file.open_read(*FIELD).unwrap().read_bytes()?; - let fast_field_reader = open_fast_field::(fast_field_bytes)?; + let fast_field_reader = open::(fast_field_bytes)?; assert_eq!(fast_field_reader.get_val(0), 13u64); assert_eq!(fast_field_reader.get_val(1), 14u64); assert_eq!(fast_field_reader.get_val(2), 2u64); @@ -407,7 +289,7 @@ mod tests { .open_read(*FIELD) .unwrap() .read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; assert_eq!(fast_field_reader.get_val(0), 4u64); assert_eq!(fast_field_reader.get_val(1), 14_082_001u64); assert_eq!(fast_field_reader.get_val(2), 3_052u64); @@ -446,7 +328,7 @@ mod tests { .open_read(*FIELD) .unwrap() .read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; for doc in 0..10_000 { assert_eq!(fast_field_reader.get_val(doc), 100_000u64); } @@ -481,7 +363,7 @@ mod tests { .open_read(*FIELD) .unwrap() .read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; assert_eq!(fast_field_reader.get_val(0), 0u64); for doc in 1..10_001 { assert_eq!( @@ -525,7 +407,7 @@ mod tests { .open_read(i64_field) .unwrap() .read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; assert_eq!(fast_field_reader.min_value(), -100i64); assert_eq!(fast_field_reader.max_value(), 9_999i64); @@ -568,7 +450,7 @@ mod tests { .open_read(i64_field) .unwrap() .read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; assert_eq!(fast_field_reader.get_val(0), 0i64); } Ok(()) @@ -609,7 +491,7 @@ mod tests { .open_read(*FIELD) .unwrap() .read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; for a in 0..n { assert_eq!(fast_field_reader.get_val(a as u64), permutation[a as usize]); @@ -865,7 +747,6 @@ mod tests { #[test] fn test_datefastfield() -> crate::Result<()> { - use crate::fastfield::FastValue; let mut schema_builder = Schema::builder(); let date_field = schema_builder.add_date_field( "date", @@ -926,7 +807,8 @@ mod tests { #[test] pub fn test_fastfield_bool() { - let test_fastfield = encode_decode_fast_field::(&[true, false, true, false]); + let test_fastfield: Arc> = + fastfield_codecs::serialize_and_load::(&[true, false, true, false]); assert_eq!(test_fastfield.get_val(0), true); assert_eq!(test_fastfield.get_val(1), false); assert_eq!(test_fastfield.get_val(2), true); @@ -960,7 +842,7 @@ mod tests { assert_eq!(file.len(), 44); let composite_file = CompositeFile::open(&file)?; let data = composite_file.open_read(field).unwrap().read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; assert_eq!(fast_field_reader.get_val(0), true); assert_eq!(fast_field_reader.get_val(1), false); assert_eq!(fast_field_reader.get_val(2), true); @@ -996,7 +878,7 @@ mod tests { assert_eq!(file.len(), 56); let composite_file = CompositeFile::open(&file)?; let data = composite_file.open_read(field).unwrap().read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; for i in 0..25 { assert_eq!(fast_field_reader.get_val(i * 2), true); assert_eq!(fast_field_reader.get_val(i * 2 + 1), false); @@ -1030,115 +912,98 @@ mod tests { assert_eq!(file.len(), 43); let composite_file = CompositeFile::open(&file)?; let data = composite_file.open_read(field).unwrap().read_bytes()?; - let fast_field_reader = open_fast_field::(data)?; + let fast_field_reader = open::(data)?; assert_eq!(fast_field_reader.get_val(0), false); Ok(()) } -} -#[cfg(all(test, feature = "unstable"))] -mod bench { - use std::sync::Arc; - - use fastfield_codecs::Column; - use test::{self, Bencher}; - - use crate::fastfield::tests::{ - encode_decode_fast_field, generate_permutation, generate_permutation_gcd, - }; - - #[bench] - fn bench_intfastfield_jumpy_veclookup(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - b.iter(|| { - let mut a = 0u64; - for _ in 0..n { - a = permutation[a as usize]; + fn get_index( + docs: &[crate::Document], + schema: &Schema, + codec_types: &[FastFieldCodecType], + ) -> crate::Result { + let directory: RamDirectory = RamDirectory::create(); + { + let write: WritePtr = directory.open_write(Path::new("test")).unwrap(); + let mut serializer = + CompositeFastFieldSerializer::from_write_with_codec(write, codec_types).unwrap(); + let mut fast_field_writers = FastFieldsWriter::from_schema(schema); + for doc in docs { + fast_field_writers.add_document(doc); } - a - }); + fast_field_writers + .serialize(&mut serializer, &HashMap::new(), None) + .unwrap(); + serializer.close().unwrap(); + } + Ok(directory) } - #[bench] - fn bench_intfastfield_jumpy_fflookup(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - let column: Arc> = encode_decode_fast_field(&permutation); - b.iter(|| { - let mut a = 0u64; - for _ in 0..n { - a = column.get_val(a as u64); - } - a - }); + #[test] + pub fn test_gcd_date() -> crate::Result<()> { + let size_prec_sec = + test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Seconds)?; + let size_prec_micro = + test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Microseconds)?; + assert!(size_prec_sec < size_prec_micro); + + let size_prec_sec = + test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Seconds)?; + let size_prec_micro = + test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Microseconds)?; + assert!(size_prec_sec < size_prec_micro); + Ok(()) } - #[bench] - fn bench_intfastfield_stride7_vec(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - b.iter(|| { - let mut a = 0u64; - for i in (0..n / 7).map(|val| val * 7) { - a += permutation[i as usize]; - } - a - }); - } + fn test_gcd_date_with_codec( + codec_type: FastFieldCodecType, + precision: DatePrecision, + ) -> crate::Result { + let time1 = DateTime::from_timestamp_micros( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + ); + let time2 = DateTime::from_timestamp_micros( + SystemTime::now() + .checked_sub(Duration::from_micros(4111)) + .unwrap() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + ); - #[bench] - fn bench_intfastfield_stride7_fflookup(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - let column: Arc> = encode_decode_fast_field(&permutation); - b.iter(|| { - let mut a = 0u64; - for i in (0..n / 7).map(|val| val * 7) { - a += column.get_val(i as u64); - } - a - }); - } + let time3 = DateTime::from_timestamp_micros( + SystemTime::now() + .checked_sub(Duration::from_millis(2000)) + .unwrap() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + ); - #[bench] - fn bench_intfastfield_scan_all_fflookup(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - let column: Arc> = encode_decode_fast_field(&permutation); - b.iter(|| { - let mut a = 0u64; - for i in 0u64..n as u64 { - a += column.get_val(i); - } - a - }); - } + let mut schema_builder = Schema::builder(); + let date_options = DateOptions::default() + .set_fast(Cardinality::SingleValue) + .set_precision(precision); + let field = schema_builder.add_date_field("field", date_options); + let schema = schema_builder.build(); - #[bench] - fn bench_intfastfield_scan_all_fflookup_gcd(b: &mut Bencher) { - let permutation = generate_permutation_gcd(); - let n = permutation.len(); - let column: Arc> = encode_decode_fast_field(&permutation); - b.iter(|| { - let mut a = 0u64; - for i in 0..n as u64 { - a += column.get_val(i); - } - a - }); - } + let docs = vec![doc!(field=>time1), doc!(field=>time2), doc!(field=>time3)]; - #[bench] - fn bench_intfastfield_scan_all_vec(b: &mut Bencher) { - let permutation = generate_permutation(); - b.iter(|| { - let mut a = 0u64; - for i in 0..permutation.len() { - a += permutation[i as usize] as u64; - } - a - }); + let directory = get_index(&docs, &schema, &[codec_type])?; + let path = Path::new("test"); + let file = directory.open_read(path).unwrap(); + let composite_file = CompositeFile::open(&file)?; + let file = composite_file.open_read(*FIELD).unwrap(); + let len = file.len(); + let test_fastfield = open::(file.read_bytes()?)?; + + assert_eq!(test_fastfield.get_val(0), time1.truncate(precision)); + assert_eq!(test_fastfield.get_val(1), time2.truncate(precision)); + assert_eq!(test_fastfield.get_val(2), time3.truncate(precision)); + Ok(len) } } diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 1a203d6ca..d4b54a111 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -341,6 +341,7 @@ mod tests { } proptest! { + #![proptest_config(proptest::prelude::ProptestConfig::with_cases(5))] #[test] fn test_multivalued_proptest(ops in proptest::collection::vec(operation_strategy(), 1..10)) { assert!(test_multivalued_no_panic(&ops[..]).is_ok()); diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index 3f0255b76..a185ee90b 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -1,10 +1,11 @@ use std::io; +use fastfield_codecs::MonotonicallyMappableToU64; use fnv::FnvHashMap; use tantivy_bitpacker::minmax; use crate::fastfield::serializer::BitpackedSerializerLegacy; -use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType, FastValue}; +use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType}; use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; use crate::schema::{Document, Field, Value}; diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs deleted file mode 100644 index 77a6a2c21..000000000 --- a/src/fastfield/reader.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::sync::Arc; - -use common::BinarySerializable; -use fastfield_codecs::bitpacked::BitpackedCodec; -use fastfield_codecs::blockwise_linear::BlockwiseLinearCodec; -use fastfield_codecs::linear::LinearCodec; -use fastfield_codecs::{monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType}; - -use super::gcd::open_gcd_from_bytes; -use super::FastValue; -use crate::directory::OwnedBytes; -use crate::error::DataCorruption; - -fn open_codec_from_bytes( - bytes: OwnedBytes, -) -> crate::Result>> { - let reader = C::open_from_bytes(bytes)?; - Ok(Arc::new(monotonic_map_column(reader, Item::from_u64))) -} - -fn open_codec_with_gcd( - bytes: OwnedBytes, -) -> crate::Result>> { - let reader = open_gcd_from_bytes::(bytes)?; - Ok(Arc::new(monotonic_map_column(reader, Item::from_u64))) -} - -/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. -fn open_from_id( - mut bytes: OwnedBytes, - codec_type: FastFieldCodecType, -) -> crate::Result>> { - match codec_type { - FastFieldCodecType::Bitpacked => open_codec_from_bytes::(bytes), - FastFieldCodecType::Linear => open_codec_from_bytes::(bytes), - FastFieldCodecType::BlockwiseLinear => { - open_codec_from_bytes::(bytes) - } - FastFieldCodecType::Gcd => { - let codec_type = FastFieldCodecType::deserialize(&mut bytes)?; - match codec_type { - FastFieldCodecType::Bitpacked => open_codec_with_gcd::(bytes), - FastFieldCodecType::Linear => open_codec_with_gcd::(bytes), - FastFieldCodecType::BlockwiseLinear => { - open_codec_with_gcd::(bytes) - } - FastFieldCodecType::Gcd => Err(DataCorruption::comment_only( - "Gcd codec wrapped into another gcd codec. This combination is not allowed.", - ) - .into()), - } - } - } -} - -/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. -pub fn open_fast_field( - mut bytes: OwnedBytes, -) -> crate::Result>> { - let codec_type = FastFieldCodecType::deserialize(&mut bytes)?; - open_from_id(bytes, codec_type) -} diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index e01b7fe3e..5784bc451 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -1,9 +1,8 @@ use std::sync::Arc; -use fastfield_codecs::Column; +use fastfield_codecs::{open, Column}; use crate::directory::{CompositeFile, FileSlice}; -use crate::fastfield::reader::open_fast_field; use crate::fastfield::{ BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader, }; @@ -116,7 +115,8 @@ impl FastFieldReaders { ) -> crate::Result>> { let fast_field_slice = self.fast_field_data(field, index)?; let bytes = fast_field_slice.read_bytes()?; - open_fast_field(bytes) + let column = fastfield_codecs::open(bytes)?; + Ok(column) } pub(crate) fn typed_fast_field_reader( @@ -248,7 +248,7 @@ impl FastFieldReaders { } let fast_field_idx_file = self.fast_field_data(field, 0)?; let fast_field_idx_bytes = fast_field_idx_file.read_bytes()?; - let idx_reader = open_fast_field(fast_field_idx_bytes)?; + let idx_reader = open(fast_field_idx_bytes)?; let data = self.fast_field_data(field, 1)?; BytesFastFieldReader::open(idx_reader, data) } else { diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index 63dc2dd5c..10b692e2f 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -1,17 +1,11 @@ use std::io::{self, Write}; -use std::num::NonZeroU64; use common::{BinarySerializable, CountingWriter}; -use fastdivide::DividerU64; pub use fastfield_codecs::bitpacked::{BitpackedCodec, BitpackedSerializerLegacy}; -use fastfield_codecs::blockwise_linear::BlockwiseLinearCodec; -use fastfield_codecs::linear::LinearCodec; -use fastfield_codecs::{monotonic_map_column, FastFieldCodecType}; pub use fastfield_codecs::{Column, FastFieldCodec, FastFieldStats}; +use fastfield_codecs::{FastFieldCodecType, MonotonicallyMappableToU64, ALL_CODEC_TYPES}; -use super::{find_gcd, ALL_CODECS, GCD_DEFAULT}; use crate::directory::{CompositeWrite, WritePtr}; -use crate::fastfield::gcd::write_gcd_header; use crate::schema::Field; /// `CompositeFastFieldSerializer` is in charge of serializing @@ -36,68 +30,34 @@ use crate::schema::Field; /// * `close()` pub struct CompositeFastFieldSerializer { composite_write: CompositeWrite, - codec_enable_checker: FastFieldCodecEnableCheck, -} - -#[derive(Debug, Clone)] -pub struct FastFieldCodecEnableCheck { - enabled_codecs: Vec, -} -impl FastFieldCodecEnableCheck { - fn allow_all() -> Self { - FastFieldCodecEnableCheck { - enabled_codecs: ALL_CODECS.to_vec(), - } - } - fn is_enabled(&self, code_type: FastFieldCodecType) -> bool { - self.enabled_codecs.contains(&code_type) - } -} - -impl From for FastFieldCodecEnableCheck { - fn from(code_type: FastFieldCodecType) -> Self { - FastFieldCodecEnableCheck { - enabled_codecs: vec![code_type], - } - } -} - -// use this, when this is merged and stabilized explicit_generic_args_with_impl_trait -// https://github.com/rust-lang/rust/pull/86176 -fn codec_estimation( - fastfield_accessor: &D, - estimations: &mut Vec<(f32, FastFieldCodecType)>, -) { - if let Some(ratio) = C::estimate(fastfield_accessor) { - estimations.push((ratio, C::CODEC_TYPE)); - } + codec_types: Vec, } impl CompositeFastFieldSerializer { /// Constructor pub fn from_write(write: WritePtr) -> io::Result { - Self::from_write_with_codec(write, FastFieldCodecEnableCheck::allow_all()) + Self::from_write_with_codec(write, &ALL_CODEC_TYPES) } /// Constructor pub fn from_write_with_codec( write: WritePtr, - codec_enable_checker: FastFieldCodecEnableCheck, + codec_types: &[FastFieldCodecType], ) -> io::Result { // just making room for the pointer to header. let composite_write = CompositeWrite::wrap(write); Ok(CompositeFastFieldSerializer { composite_write, - codec_enable_checker, + codec_types: codec_types.to_vec(), }) } /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. - pub fn create_auto_detect_u64_fast_field( + pub fn create_auto_detect_u64_fast_field( &mut self, field: Field, - fastfield_accessor: impl Column, + fastfield_accessor: impl Column, ) -> io::Result<()> { self.create_auto_detect_u64_fast_field_with_idx(field, fastfield_accessor, 0) } @@ -114,102 +74,14 @@ impl CompositeFastFieldSerializer { /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. - pub fn create_auto_detect_u64_fast_field_with_idx( + pub fn create_auto_detect_u64_fast_field_with_idx( &mut self, field: Field, - fastfield_accessor: impl Column, + fastfield_accessor: impl Column, idx: usize, ) -> io::Result<()> { - let min_value = fastfield_accessor.min_value(); let field_write = self.composite_write.for_field_with_idx(field, idx); - let gcd = find_gcd(fastfield_accessor.iter().map(|val| val - min_value)) - .map(NonZeroU64::get) - .unwrap_or(GCD_DEFAULT); - - if gcd == 1 { - return Self::create_auto_detect_u64_fast_field_with_idx_gcd( - self.codec_enable_checker.clone(), - field, - field_write, - fastfield_accessor, - ); - } - - Self::write_header(field_write, FastFieldCodecType::Gcd)?; - - let base_value = fastfield_accessor.min_value(); - - let gcd_divider = DividerU64::divide_by(gcd); - - let divided_fastfield_accessor = monotonic_map_column(fastfield_accessor, |val: u64| { - gcd_divider.divide(val - base_value) - }); - - let num_vals = divided_fastfield_accessor.num_vals(); - - Self::create_auto_detect_u64_fast_field_with_idx_gcd( - self.codec_enable_checker.clone(), - field, - field_write, - divided_fastfield_accessor, - )?; - write_gcd_header(field_write, base_value, gcd, num_vals)?; - Ok(()) - } - - /// Serialize data into a new u64 fast field. The best compression codec will be chosen - /// automatically. - pub fn create_auto_detect_u64_fast_field_with_idx_gcd( - codec_enable_checker: FastFieldCodecEnableCheck, - field: Field, - field_write: &mut CountingWriter, - fastfield_accessor: impl Column, - ) -> io::Result<()> { - let mut estimations = vec![]; - - if codec_enable_checker.is_enabled(FastFieldCodecType::Bitpacked) { - codec_estimation::(&fastfield_accessor, &mut estimations); - } - if codec_enable_checker.is_enabled(FastFieldCodecType::Linear) { - codec_estimation::(&fastfield_accessor, &mut estimations); - } - if codec_enable_checker.is_enabled(FastFieldCodecType::BlockwiseLinear) { - codec_estimation::(&fastfield_accessor, &mut estimations); - } - if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan()) - { - warn!( - "broken estimation for fast field codec {:?}", - broken_estimation.1 - ); - } - // removing nan values for codecs with broken calculations, and max values which disables - // codecs - estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX); - estimations.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); - let (_ratio, codec_type) = estimations[0]; - debug!("choosing fast field codec {codec_type:?} for field_id {field:?}"); // todo print actual field name - - Self::write_header(field_write, codec_type)?; - match codec_type { - FastFieldCodecType::Bitpacked => { - BitpackedCodec::serialize(field_write, &fastfield_accessor)?; - } - FastFieldCodecType::Linear => { - LinearCodec::serialize(field_write, &fastfield_accessor)?; - } - FastFieldCodecType::BlockwiseLinear => { - BlockwiseLinearCodec::serialize(field_write, &fastfield_accessor)?; - } - FastFieldCodecType::Gcd => { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "GCD codec not supported.", - )); - } - } - field_write.flush()?; - + fastfield_codecs::serialize(fastfield_accessor, field_write, &self.codec_types)?; Ok(()) } diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index c5b4c1b8b..1c0adadb1 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -2,13 +2,13 @@ use std::collections::HashMap; use std::io; use common; -use fastfield_codecs::Column; +use fastfield_codecs::{Column, MonotonicallyMappableToU64}; use fnv::FnvHashMap; use tantivy_bitpacker::BlockedBitpacker; use super::multivalued::MultiValuedFastFieldWriter; use super::serializer::FastFieldStats; -use super::{FastFieldType, FastValue}; +use super::FastFieldType; use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer}; use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; diff --git a/src/indexer/json_term_writer.rs b/src/indexer/json_term_writer.rs index 62db680fe..bee16df52 100644 --- a/src/indexer/json_term_writer.rs +++ b/src/indexer/json_term_writer.rs @@ -1,3 +1,4 @@ +use fastfield_codecs::MonotonicallyMappableToU64; use fnv::FnvHashMap; use murmurhash32::murmurhash2; diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 7381b03c9..d48f326ac 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -1,7 +1,9 @@ +use fastfield_codecs::MonotonicallyMappableToU64; + use super::doc_id_mapping::{get_doc_id_mapping_from_field, DocIdMapping}; use super::operation::AddOperation; use crate::core::Segment; -use crate::fastfield::{FastFieldsWriter, FastValue as _}; +use crate::fastfield::FastFieldsWriter; use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter}; use crate::indexer::json_term_writer::index_json_values; use crate::indexer::segment_serializer::SegmentSerializer; diff --git a/src/schema/term.rs b/src/schema/term.rs index 4b63c19c4..77aa7b29c 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -253,7 +253,7 @@ where B: AsRef<[u8]> } value_bytes.copy_from_slice(self.value_bytes()); let value_u64 = u64::from_be_bytes(value_bytes); - Some(FastValue::from_u64(value_u64)) + Some(T::from_u64(value_u64)) } /// Returns the `i64` value stored in a term. @@ -362,7 +362,7 @@ fn as_str(value_bytes: &[u8]) -> Option<&str> { fn get_fast_type(bytes: &[u8]) -> Option { let value_u64 = u64::from_be_bytes(bytes.try_into().ok()?); - Some(FastValue::from_u64(value_u64)) + Some(T::from_u64(value_u64)) } /// Returns the json path (without non-human friendly separators, the type of the value, and the