add merge code for u128

This commit is contained in:
Pascal Seitz
2022-08-11 16:41:54 +02:00
parent 399b137617
commit c56f4572f4
10 changed files with 407 additions and 212 deletions

View File

@@ -103,28 +103,6 @@ impl BitUnpacker {
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
val_shifted & mask
}
/// Reads the `idx`-th bit-packed value from `data` and returns it as a `u32`.
///
/// The value starts at bit offset `idx * num_bits`. Four bytes beginning at the byte that
/// contains this offset are loaded as a little-endian `u32`, shifted right by the remaining
/// sub-byte offset (0..=7) and masked with `self.mask` truncated to 32 bits.
///
/// Returns `0` without reading `data` when `num_bits` is zero.
///
/// NOTE(review): only 32 bits are loaded, so after the shift at most `32 - shift` bits of
/// the value are available — confirm callers only use this for sufficiently narrow widths.
///
/// # Panics
///
/// Panics if `data` has fewer than four bytes at the computed byte address. Debug builds
/// additionally assert that eight bytes are available there.
#[inline]
pub fn get_u32(&self, idx: u64, data: &[u8]) -> u32 {
    if self.num_bits == 0 {
        return 0u32;
    }
    let bit_offset = idx * self.num_bits;
    let byte_start = bit_offset >> 3;
    let shift = bit_offset & 7;
    debug_assert!(
        byte_start + 8 <= data.len() as u64,
        "The fast field field should have been padded with 7 bytes."
    );
    let start = byte_start as usize;
    let window: &[u8] = &data[start..start + 4];
    let word = u32::from_le_bytes(window.try_into().unwrap());
    (word >> shift) & (self.mask as u32)
}
}
#[cfg(test)]

View File

@@ -399,6 +399,10 @@ pub fn train(ip_addrs_sorted: &[u128]) -> IntervalCompressor {
}
impl IntervalCompressor {
/// Taking the vals as Vec may cost a lot of memory.
/// It is used to sort the vals.
///
/// Less memory alternative: We could just store the index (u32), and use that as sorting.
pub fn from_vals(mut vals: Vec<u128>) -> Self {
vals.sort();
train(&vals)
@@ -492,6 +496,10 @@ impl FastFieldCodecReaderU128 for IntervallDecompressor {
/// Returns the `null_value` stored in this decompressor's `compact_space`.
fn null_value(&self) -> u128 {
self.compact_space.null_value
}
/// Returns a boxed iterator over `data`, one `Option<u128>` per entry.
///
/// The call `self.iter(data)` below is a method call on the concrete type, so it resolves
/// to the inherent `iter` method of `IntervallDecompressor` (inherent methods take
/// precedence over trait methods), not to this trait method; the result is only boxed here.
fn iter<'a>(&'a self, data: &'a [u8]) -> Box<dyn Iterator<Item = Option<u128>> + 'a> {
Box::new(self.iter(data))
}
}
impl IntervallDecompressor {
@@ -586,11 +594,16 @@ impl IntervallDecompressor {
}
#[inline]
pub fn iter<'a>(&'a self, data: &'a [u8]) -> impl Iterator<Item = u128> + 'a {
fn iter<'a>(&'a self, data: &'a [u8]) -> impl Iterator<Item = Option<u128>> + 'a {
// TODO: Performance. It would be better to iterate on the ranges and check existence via
// the bit_unpacker.
self.iter_compact(data)
.map(|compact| self.compact_to_ip_addr(compact))
self.iter_compact(data).map(|compact| {
if compact == self.null_compact_space {
None
} else {
Some(self.compact_to_ip_addr(compact))
}
})
}
pub fn get(&self, idx: u64, data: &[u8]) -> Option<u128> {

View File

@@ -28,6 +28,11 @@ pub trait FastFieldCodecReaderU128: Sized {
/// Get value for doc
fn get(&self, doc: u64, data: &[u8]) -> Option<u128>;
/// Iterator
///
/// Replace with opaque type after: https://github.com/rust-lang/rust/issues/63063
fn iter<'a>(&'a self, data: &'a [u8]) -> Box<dyn Iterator<Item = Option<u128>> + 'a>;
/// Get docs for value range
fn get_range(&self, range: RangeInclusive<u128>, data: &[u8]) -> Vec<usize>;

235
src/fastfield/fast_value.rs Normal file
View File

@@ -0,0 +1,235 @@
use std::net::{IpAddr, Ipv6Addr};
use crate::schema::{Cardinality, FieldType, Type};
use crate::DateTime;
/// Encodes an IP address as a `u128`.
///
/// An IPv4 address is first converted to its IPv4-mapped IPv6 form (`::ffff:a.b.c.d`).
/// The 16 octets of the resulting IPv6 address are then interpreted as a big-endian
/// integer, so the first octet ends up in the most significant byte.
pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
    let mapped: Ipv6Addr = match ip_addr {
        IpAddr::V4(addr) => addr.to_ipv6_mapped(),
        IpAddr::V6(addr) => addr,
    };
    u128::from(mapped)
}
/// Trait for 128-bit wide types that are allowed for fast fields: `u128` and `IpAddr`.
///
/// Implementors convert to and from a `u128` representation through the methods below
/// and report the matching `schema::Type`.
pub trait FastValueU128: Clone + Copy + Send + Sync + PartialOrd + 'static {
/// Converts a value from its `u128` representation.
///
/// Internally all fast field values of these types are encoded as u128.
/// **Note: To be used for converting encoded Term, Posting values.**
fn from_u128(val: u128) -> Self;
/// Converts a value to its `u128` representation.
///
/// Internally all fast field values of these types are encoded as u128.
fn to_u128(&self) -> u128;
/// Cast value to `u128`.
///
/// For `u128` the value is returned as is. For `IpAddr` this performs the same
/// conversion as `to_u128`.
fn as_u128(&self) -> u128;
/// Returns the `schema::Type` for this FastValue.
fn to_type() -> Type;
}
/// `u128` is its own fast field representation: every conversion is the identity.
impl FastValueU128 for u128 {
    /// Returns `val` unchanged.
    fn from_u128(val: u128) -> Self {
        val
    }

    /// Returns a copy of `self`.
    fn to_u128(&self) -> u128 {
        *self
    }

    /// Same as [`FastValueU128::to_u128`]: returns a copy of `self`.
    fn as_u128(&self) -> u128 {
        self.to_u128()
    }

    /// Reports `Type::U128`.
    fn to_type() -> Type {
        Type::U128
    }
}
/// Fast field conversions for `IpAddr`, based on the big-endian encoding of `ip_to_u128`.
impl FastValueU128 for IpAddr {
    /// Decodes an address from its `u128` encoding.
    ///
    /// `ip_to_u128` reads the 16 address octets as a big-endian integer, so the octets
    /// must be rebuilt with `to_be_bytes`. Little-endian bytes would reverse the octets
    /// and `from_u128(ip.to_u128())` would no longer return `ip`.
    ///
    /// The result is always an `IpAddr::V6`: an address that was encoded from an IPv4
    /// address comes back in its IPv4-mapped IPv6 form.
    fn from_u128(val: u128) -> Self {
        IpAddr::from(val.to_be_bytes())
    }

    /// Encodes the address with `ip_to_u128`.
    fn to_u128(&self) -> u128 {
        ip_to_u128(*self)
    }

    /// Same as [`FastValueU128::to_u128`]: encodes the address with `ip_to_u128`.
    fn as_u128(&self) -> u128 {
        ip_to_u128(*self)
    }

    /// Reports `Type::Ip`.
    fn to_type() -> Type {
        Type::Ip
    }
}
/// Trait for types that are allowed for fast fields:
/// (u64, i64 and f64, bool, DateTime).
///
/// Each implementor defines how its values are converted to and from `u64`, which
/// `FieldType` it belongs to, and the `schema::Type` it reports.
pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static {
/// Converts a value from u64
///
/// Internally all fast field values are encoded as u64.
/// **Note: To be used for converting encoded Term, Posting values.**
fn from_u64(val: u64) -> Self;
/// Converts a value to u64.
///
/// Internally all fast field values are encoded as u64.
fn to_u64(&self) -> u64;
/// Returns the fast field cardinality that can be extracted from the given
/// `FieldType`.
///
/// If the type is not a fast field, `None` is returned.
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality>;
/// Cast value to `u64`.
/// The value is just reinterpreted in memory.
fn as_u64(&self) -> u64;
/// Build a default value. This default value is never used, so the value does not
/// really matter.
///
/// The default implementation decodes the `u64` encoding of the `i64` value `0`.
fn make_zero() -> Self {
Self::from_u64(0i64.to_u64())
}
/// Returns the `schema::Type` for this FastValue.
fn to_type() -> Type;
}
/// `u64` is the native fast field representation, so conversions are the identity.
impl FastValue for u64 {
    /// Returns `val` unchanged.
    fn from_u64(val: u64) -> Self {
        val
    }

    /// Returns a copy of `self`.
    fn to_u64(&self) -> u64 {
        *self
    }

    /// Returns the cardinality configured on a `U64` field, `MultiValues` for a `Facet`
    /// field, and `None` for every other field type.
    fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
        match field_type {
            FieldType::Facet(_) => Some(Cardinality::MultiValues),
            FieldType::U64(options) => options.get_fastfield_cardinality(),
            _ => None,
        }
    }

    /// Same as [`FastValue::to_u64`]: returns a copy of `self`.
    fn as_u64(&self) -> u64 {
        self.to_u64()
    }

    /// Reports `Type::U64`.
    fn to_type() -> Type {
        Type::U64
    }
}
/// Fast field conversions for `i64`, delegating the encoding to the `common` helpers.
impl FastValue for i64 {
    /// Decodes `val` with `common::u64_to_i64`.
    fn from_u64(val: u64) -> Self {
        common::u64_to_i64(val)
    }

    /// Encodes the value with `common::i64_to_u64`.
    fn to_u64(&self) -> u64 {
        let value = *self;
        common::i64_to_u64(value)
    }

    /// Returns the cardinality configured on an `I64` field, `None` for any other field type.
    fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
        if let FieldType::I64(options) = field_type {
            options.get_fastfield_cardinality()
        } else {
            None
        }
    }

    /// Plain `as` cast: the bit pattern is kept, unlike the encoding used by `to_u64`.
    fn as_u64(&self) -> u64 {
        let value = *self;
        value as u64
    }

    /// Reports `Type::I64`.
    fn to_type() -> Type {
        Type::I64
    }
}
/// Fast field conversions for `f64`, delegating the encoding to the `common` helpers.
impl FastValue for f64 {
    /// Decodes `val` with `common::u64_to_f64`.
    fn from_u64(val: u64) -> Self {
        common::u64_to_f64(val)
    }

    /// Encodes the value with `common::f64_to_u64`.
    fn to_u64(&self) -> u64 {
        let value = *self;
        common::f64_to_u64(value)
    }

    /// Returns the cardinality configured on an `F64` field, `None` for any other field type.
    fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
        if let FieldType::F64(options) = field_type {
            options.get_fastfield_cardinality()
        } else {
            None
        }
    }

    /// Returns the raw IEEE 754 bit pattern of the value, unlike the encoding used by
    /// `to_u64`.
    fn as_u64(&self) -> u64 {
        f64::to_bits(*self)
    }

    /// Reports `Type::F64`.
    fn to_type() -> Type {
        Type::F64
    }
}
/// Fast field conversions for `bool`: `false` is `0`, `true` is `1`.
impl FastValue for bool {
    /// Any non-zero `val` decodes to `true`; only `0` decodes to `false`.
    fn from_u64(val: u64) -> Self {
        val != 0u64
    }

    /// Encodes `false` as `0` and `true` as `1`.
    fn to_u64(&self) -> u64 {
        u64::from(*self)
    }

    /// Returns the cardinality configured on a `Bool` field, `None` for any other field type.
    fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
        if let FieldType::Bool(options) = field_type {
            options.get_fastfield_cardinality()
        } else {
            None
        }
    }

    /// Plain `as` cast of the flag, yielding `0` or `1`.
    fn as_u64(&self) -> u64 {
        let flag = *self;
        flag as u64
    }

    /// Reports `Type::Bool`.
    fn to_type() -> Type {
        Type::Bool
    }
}
/// Fast field conversions for `DateTime`, stored as a microsecond timestamp.
impl FastValue for DateTime {
    /// Builds a `DateTime` from the `u64` encoding of a timestamp.
    ///
    /// The `u64` is first decoded to an `i64` through the `i64` implementation of
    /// `FastValue`.
    ///
    /// **Note the timestamps is expected to be in microseconds.**
    fn from_u64(timestamp_micros_u64: u64) -> Self {
        Self::from_timestamp_micros(i64::from_u64(timestamp_micros_u64))
    }

    /// Encodes the microsecond timestamp with `common::i64_to_u64`.
    fn to_u64(&self) -> u64 {
        let micros = self.into_timestamp_micros();
        common::i64_to_u64(micros)
    }

    /// Returns the cardinality configured on a `Date` field, `None` for any other field type.
    fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
        if let FieldType::Date(options) = field_type {
            options.get_fastfield_cardinality()
        } else {
            None
        }
    }

    /// Returns `as_u64()` of the microsecond timestamp.
    fn as_u64(&self) -> u64 {
        let micros = self.into_timestamp_micros();
        micros.as_u64()
    }

    /// Reports `Type::Date`.
    fn to_type() -> Type {
        Type::Date
    }
}

View File

@@ -24,20 +24,22 @@ pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveB
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub use self::fast_value::{FastValue, FastValueU128};
pub(crate) use self::gcd::{find_gcd, GCDFastFieldCodec, GCD_CODEC_ID, GCD_DEFAULT};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::reader::{DynamicFastFieldReader, FastFieldReader};
pub use self::reader::{DynamicFastFieldReader, FastFieldReader, FastFieldReaderCodecWrapperU128};
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
pub use self::serializer::{CompositeFastFieldSerializer, FastFieldDataAccess, FastFieldStats};
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
use crate::schema::{Cardinality, FieldType, Type, Value};
use crate::{DateTime, DocId};
use crate::schema::Value;
use crate::DocId;
mod alive_bitset;
mod bytes;
mod error;
mod facet_reader;
mod fast_value;
mod gcd;
mod multivalued;
mod reader;
@@ -57,182 +59,6 @@ pub(crate) const ALL_CODECS: &[FastFieldCodecName; 3] = &[
FastFieldCodecName::BlockwiseLinearInterpol,
];
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
/// for a doc_id
pub trait MultiValueLength {
/// returns the num of values associated to a doc_id
fn get_len(&self, doc_id: DocId) -> u64;
/// returns the sum of num values for all doc_ids
fn get_total_len(&self) -> u64;
}
/// Trait for types that are allowed for fast fields:
/// (u64, i64 and f64, bool, DateTime).
pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static {
/// Converts a value from u64
///
/// Internally all fast field values are encoded as u64.
/// **Note: To be used for converting encoded Term, Posting values.**
fn from_u64(val: u64) -> Self;
/// Converts a value to u64.
///
/// Internally all fast field values are encoded as u64.
fn to_u64(&self) -> u64;
/// Returns the fast field cardinality that can be extracted from the given
/// `FieldType`.
///
/// If the type is not a fast field, `None` is returned.
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality>;
/// Cast value to `u64`.
/// The value is just reinterpreted in memory.
fn as_u64(&self) -> u64;
/// Build a default value. This default value is never used, so the value does not
/// really matter.
fn make_zero() -> Self {
Self::from_u64(0i64.to_u64())
}
/// Returns the `schema::Type` for this FastValue.
fn to_type() -> Type;
}
impl FastValue for u64 {
fn from_u64(val: u64) -> Self {
val
}
fn to_u64(&self) -> u64 {
*self
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::U64(ref integer_options) => integer_options.get_fastfield_cardinality(),
FieldType::Facet(_) => Some(Cardinality::MultiValues),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self
}
fn to_type() -> Type {
Type::U64
}
}
impl FastValue for i64 {
fn from_u64(val: u64) -> Self {
common::u64_to_i64(val)
}
fn to_u64(&self) -> u64 {
common::i64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::I64(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self as u64
}
fn to_type() -> Type {
Type::I64
}
}
impl FastValue for f64 {
fn from_u64(val: u64) -> Self {
common::u64_to_f64(val)
}
fn to_u64(&self) -> u64 {
common::f64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::F64(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
self.to_bits()
}
fn to_type() -> Type {
Type::F64
}
}
impl FastValue for bool {
fn from_u64(val: u64) -> Self {
val != 0u64
}
fn to_u64(&self) -> u64 {
match self {
false => 0,
true => 1,
}
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::Bool(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self as u64
}
fn to_type() -> Type {
Type::Bool
}
}
impl FastValue for DateTime {
/// Converts a timestamp microseconds into DateTime.
///
/// **Note the timestamps is expected to be in microseconds.**
fn from_u64(timestamp_micros_u64: u64) -> Self {
let timestamp_micros = i64::from_u64(timestamp_micros_u64);
Self::from_timestamp_micros(timestamp_micros)
}
fn to_u64(&self) -> u64 {
common::i64_to_u64(self.into_timestamp_micros())
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::Date(ref options) => options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
self.into_timestamp_micros().as_u64()
}
fn to_type() -> Type {
Type::Date
}
}
fn value_to_u64(value: &Value) -> u64 {
match value {
Value::U64(val) => val.to_u64(),
@@ -244,6 +70,15 @@ fn value_to_u64(value: &Value) -> u64 {
}
}
/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data
/// for a doc_id
pub trait MultiValueLength {
/// Returns the number of values associated with `doc_id`.
fn get_len(&self, doc_id: DocId) -> u64;
/// Returns the sum of the number of values over all doc_ids.
fn get_total_len(&self) -> u64;
}
/// The fast field type
pub enum FastFieldType {
/// Numeric type, e.g. f64.
@@ -280,9 +115,9 @@ mod tests {
use super::*;
use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
use crate::merge_policy::NoMergePolicy;
use crate::schema::{Document, Field, Schema, FAST, STRING, TEXT};
use crate::schema::{Cardinality, Document, Field, Schema, FAST, STRING, TEXT};
use crate::time::OffsetDateTime;
use crate::{DateOptions, DatePrecision, Index, SegmentId, SegmentReader};
use crate::{DateOptions, DatePrecision, DateTime, Index, SegmentId, SegmentReader};
pub static SCHEMA: Lazy<Schema> = Lazy::new(|| {
let mut schema_builder = Schema::builder();

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::path::Path;
use fastfield_codecs::bitpacked::{
@@ -11,9 +12,9 @@ use fastfield_codecs::linearinterpol::{
use fastfield_codecs::multilinearinterpol::{
MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
};
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecSerializer};
use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecReaderU128, FastFieldCodecSerializer};
use super::{FastValue, GCDFastFieldCodec, GCD_CODEC_ID};
use super::{FastValue, FastValueU128, GCDFastFieldCodec, GCD_CODEC_ID};
use crate::directory::{CompositeFile, Directory, FileSlice, OwnedBytes, RamDirectory, WritePtr};
use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter};
use crate::schema::{Schema, FAST};
@@ -210,6 +211,47 @@ impl<Item: FastValue> FastFieldReader<Item> for DynamicFastFieldReader<Item> {
}
}
/// Wrapper for accessing a `u128`-encoded fast field.
///
/// Holds the data and the codec used to read the data.
#[derive(Clone)]
pub struct FastFieldReaderCodecWrapperU128<Item: FastValueU128, CodecReader> {
// Codec reader opened from `bytes`.
reader: CodecReader,
// Raw fast field bytes handed to `reader` on every access.
bytes: OwnedBytes,
// Ties the wrapper to the `Item` type that decoded values are converted to.
_phantom: PhantomData<Item>,
}
impl<Item: FastValueU128, C: FastFieldCodecReaderU128> FastFieldReaderCodecWrapperU128<Item, C> {
    /// Opens a fast field given the bytes.
    ///
    /// The codec reader is built from `bytes` and stored together with them. Any error
    /// from `C::open_from_bytes` is propagated.
    pub fn open_from_bytes(bytes: OwnedBytes) -> crate::Result<Self> {
        Ok(Self {
            reader: C::open_from_bytes(bytes.as_slice())?,
            bytes,
            _phantom: PhantomData,
        })
    }

    /// Returns the item for the docid, or `None` if the codec reader returns `None`.
    pub fn get(&self, doc: u64) -> Option<Item> {
        let raw = self.reader.get(doc, self.bytes.as_slice());
        raw.map(Item::from_u128)
    }

    /// Iterates over all elements in the fast field, converting each present value to `Item`.
    pub fn iter(&self) -> impl Iterator<Item = Option<Item>> + '_ {
        let data = self.bytes.as_slice();
        self.reader
            .iter(data)
            .map(|maybe_val| maybe_val.map(Item::from_u128))
    }

    /// Returns all docids which are in the provided range
    pub fn get_range(&self, range: RangeInclusive<u128>) -> Vec<usize> {
        let data = self.bytes.as_slice();
        self.reader.get_range(range, data)
    }
}
/// Wrapper for accessing a fastfield.
///
/// Holds the data and the codec used to read the data.

View File

@@ -1,4 +1,8 @@
use super::reader::DynamicFastFieldReader;
use std::net::IpAddr;
use fastfield_codecs::ip_codec::IntervallDecompressor;
use super::reader::{DynamicFastFieldReader, FastFieldReaderCodecWrapperU128};
use crate::directory::{CompositeFile, FileSlice};
use crate::fastfield::{
BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader,
@@ -137,6 +141,30 @@ impl FastFieldReaders {
self.typed_fast_field_reader(field)
}
/// Returns the `IpAddr` fast field reader associated to `field`.
///
/// The fast field data of `field` is read in full and opened with `IntervallDecompressor`.
/// Errors from locating the data, reading the bytes or opening the codec are returned.
///
/// NOTE(review): no check of the field's type is visible here — confirm that
/// `fast_field_data` rejects fields that are not `u128`-encoded fast fields.
pub fn ip_addr(
    &self,
    field: Field,
) -> crate::Result<FastFieldReaderCodecWrapperU128<IpAddr, IntervallDecompressor>> {
    let bytes = self.fast_field_data(field, 0)?.read_bytes()?;
    FastFieldReaderCodecWrapperU128::open_from_bytes(bytes)
}
/// Returns the `u128` fast field reader associated to `field`.
///
/// The fast field data of `field` is read in full and opened with `IntervallDecompressor`.
/// Errors from locating the data, reading the bytes or opening the codec are returned.
///
/// NOTE(review): no check of the field's type is visible here — confirm that
/// `fast_field_data` rejects fields that are not `u128`-encoded fast fields.
pub fn u128(
    &self,
    field: Field,
) -> crate::Result<FastFieldReaderCodecWrapperU128<u128, IntervallDecompressor>> {
    let bytes = self.fast_field_data(field, 0)?.read_bytes()?;
    FastFieldReaderCodecWrapperU128::open_from_bytes(bytes)
}
/// Returns the `u64` fast field reader reader associated to `field`, regardless of whether the
/// given field is effectively of type `u64` or not.
///

View File

@@ -2,6 +2,7 @@ use std::cmp;
use std::collections::HashMap;
use std::sync::Arc;
use fastfield_codecs::ip_codec::{IntervalCompressor, IntervallDecompressor};
use itertools::Itertools;
use measure_time::debug_time;
use tantivy_bitpacker::minmax;
@@ -11,7 +12,8 @@ use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::{
AliveBitSet, CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldDataAccess,
FastFieldReader, FastFieldStats, MultiValueLength, MultiValuedFastFieldReader,
FastFieldReader, FastFieldReaderCodecWrapperU128, FastFieldStats, MultiValueLength,
MultiValuedFastFieldReader,
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
@@ -323,7 +325,11 @@ impl IndexMerger {
}
FieldType::Ip(options) => {
if options.is_fast() {
// TODO create fast field for merge
self.write_u128_single_fast_field(
field,
fast_field_serializer,
doc_id_mapping,
)?;
}
}
@@ -337,6 +343,50 @@ impl IndexMerger {
Ok(())
}
// Merges a single-valued `u128` fast field from all segment readers.
//
// Works in two passes over the source readers:
// 1. every present value is collected to build the `IntervalCompressor`;
// 2. values are emitted in the order given by `doc_id_mapping`, with the compressor's
//    `null_value` standing in for documents that have no value, and compressed into the
//    field writer.
//
// Panics if a segment has no `u128` reader for `field`.
fn write_u128_single_fast_field(
    &self,
    field: Field,
    fast_field_serializer: &mut CompositeFastFieldSerializer,
    doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
    let segment_readers: Vec<FastFieldReaderCodecWrapperU128<u128, IntervallDecompressor>> =
        self.readers
            .iter()
            .map(|segment_reader| {
                segment_reader.fast_fields().u128(field).expect(
                    "Failed to find a reader for single fast field. This is a tantivy bug and \
                     it should never happen.",
                )
            })
            .collect();

    // First pass: gather all present values; `from_vals` sorts them and trains the compressor.
    let mut all_vals: Vec<u128> = Vec::new();
    for reader in &segment_readers {
        all_vals.extend(reader.iter().flatten());
    }
    let compressor = IntervalCompressor::from_vals(all_vals);

    // Second pass: values in merged doc order, missing ones replaced by the null value.
    let merged_vals = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| {
        let reader = &segment_readers[*reader_ordinal as usize];
        match reader.get((*doc_id) as u64) {
            Some(val) => val,
            None => compressor.null_value,
        }
    });

    let field_write = fast_field_serializer.get_field_writer(field, 0);
    compressor.compress_into(merged_vals, field_write)?;
    Ok(())
}
// used both to merge field norms, `u64/i64` single fast fields.
fn write_single_fast_field(
&self,

View File

@@ -67,9 +67,11 @@ pub enum Type {
Json = b'j',
/// IpAddr
Ip = b'p',
/// U128
U128 = b'1',
}
const ALL_TYPES: [Type; 10] = [
const ALL_TYPES: [Type; 11] = [
Type::Str,
Type::U64,
Type::I64,
@@ -80,6 +82,7 @@ const ALL_TYPES: [Type; 10] = [
Type::Bytes,
Type::Json,
Type::Ip,
Type::U128,
];
impl Type {
@@ -107,6 +110,7 @@ impl Type {
Type::Bytes => "Bytes",
Type::Json => "Json",
Type::Ip => "Ip",
Type::U128 => "U128",
}
}
@@ -124,6 +128,7 @@ impl Type {
b'b' => Some(Type::Bytes),
b'j' => Some(Type::Json),
b'p' => Some(Type::Ip),
b'1' => Some(Type::U128),
_ => None,
}
}

View File

@@ -419,6 +419,10 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re
let s = as_str(bytes); // TODO: change when serialization changes
write_opt(f, s)?;
}
Type::U128 => {
let s = as_str(bytes); // TODO: change when serialization changes
write_opt(f, s)?;
}
}
Ok(())
}