add ip field

add u128 multivalue reader and writer
add ip to schema
add ip writers, handle merge
This commit is contained in:
Pascal Seitz
2022-09-20 16:17:34 +08:00
parent 5f565e77de
commit 400a20b7af
23 changed files with 966 additions and 33 deletions

View File

@@ -60,6 +60,7 @@ measure_time = "0.8.2"
ciborium = { version = "0.2", optional = true}
async-trait = "0.1.53"
arc-swap = "1.5.0"
roaring = "0.10.1"
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"

View File

@@ -102,7 +102,7 @@ mod tests {
let mut out = vec![];
serialize_u128(VecColumn::from(&data), &mut out).unwrap();
let out = OwnedBytes::new(out);
open_u128(out).unwrap()
open_u128::<u128>(out).unwrap()
}
#[bench]

View File

@@ -604,7 +604,7 @@ mod tests {
];
let mut out = Vec::new();
serialize_u128(VecColumn::from(vals), &mut out).unwrap();
let decomp = open_u128(OwnedBytes::new(out)).unwrap();
let decomp = open_u128::<u128>(OwnedBytes::new(out)).unwrap();
assert_eq!(decomp.get_between_vals(199..=200), vec![0]);
assert_eq!(decomp.get_between_vals(199..=201), vec![0, 1]);

View File

@@ -22,6 +22,7 @@ mod compact_space;
mod line;
mod linear;
mod monotonic_mapping;
mod monotonic_mapping_u128;
mod column;
mod gcd;
@@ -32,6 +33,7 @@ use self::blockwise_linear::BlockwiseLinearCodec;
pub use self::column::{monotonic_map_column, Column, VecColumn};
use self::linear::LinearCodec;
pub use self::monotonic_mapping::MonotonicallyMappableToU64;
pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128;
pub use self::serialize::{
estimate, serialize, serialize_and_load, serialize_u128, NormalizedHeader,
};
@@ -73,8 +75,12 @@ impl FastFieldCodecType {
}
/// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open_u128(bytes: OwnedBytes) -> io::Result<Arc<dyn Column<u128>>> {
Ok(Arc::new(CompactSpaceDecompressor::open(bytes)?))
pub fn open_u128<Item: MonotonicallyMappableToU128>(
bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<Item>>> {
let monotonic_mapping = move |val: u128| Item::from_u128(val);
let reader = CompactSpaceDecompressor::open(bytes)?;
Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
}
/// Returns the correct codec reader wrapped in the `Arc` for the data.

View File

@@ -110,7 +110,7 @@ fn bench_ip() {
(data.len() * 8) as f32 / dataset.len() as f32
);
let decompressor = open_u128(OwnedBytes::new(data)).unwrap();
let decompressor = open_u128::<u128>(OwnedBytes::new(data)).unwrap();
// Sample some ranges
for value in dataset.iter().take(1110).skip(1100).cloned() {
print_time!("get range");

View File

@@ -27,7 +27,10 @@ pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub(crate) use self::multivalued::{get_fastfield_codecs_for_multivalue, MultivalueStartIndex};
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::multivalued::{
MultiValueU128FastFieldWriter, MultiValuedFastFieldReader, MultiValuedFastFieldWriter,
MultiValuedU128FastFieldReader,
};
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
pub use self::serializer::{Column, CompositeFastFieldSerializer};

View File

@@ -3,9 +3,9 @@ mod writer;
use fastfield_codecs::FastFieldCodecType;
pub use self::reader::MultiValuedFastFieldReader;
pub use self::writer::MultiValuedFastFieldWriter;
pub use self::reader::{MultiValuedFastFieldReader, MultiValuedU128FastFieldReader};
pub(crate) use self::writer::MultivalueStartIndex;
pub use self::writer::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter};
/// The valid codecs for multivalue values excludes the linear interpolation codec.
///

View File

@@ -1,7 +1,7 @@
use std::ops::Range;
use std::ops::{Range, RangeInclusive};
use std::sync::Arc;
use fastfield_codecs::Column;
use fastfield_codecs::{Column, MonotonicallyMappableToU128};
use crate::fastfield::{FastValue, MultiValueLength};
use crate::DocId;
@@ -99,6 +99,153 @@ impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
self.total_num_vals() as u64
}
}
/// Reader for a multivalued `u128` fast field.
///
/// The reader is implemented as a `u64` fast field for the index and a `u128` fast field.
///
/// The `vals_reader` will access the concatenated list of all
/// values for all reader.
/// The `idx_reader` associated, for each document, the index of its first value.
#[derive(Clone)]
pub struct MultiValuedU128FastFieldReader<T: MonotonicallyMappableToU128> {
idx_reader: Arc<dyn Column<u64>>,
vals_reader: Arc<dyn Column<T>>,
}
impl<T: MonotonicallyMappableToU128> MultiValuedU128FastFieldReader<T> {
pub(crate) fn open(
idx_reader: Arc<dyn Column<u64>>,
vals_reader: Arc<dyn Column<T>>,
) -> MultiValuedU128FastFieldReader<T> {
Self {
idx_reader,
vals_reader,
}
}
/// Returns `[start, end)`, such that the values associated
/// to the given document are `start..end`.
#[inline]
fn range(&self, doc: DocId) -> Range<u64> {
let start = self.idx_reader.get_val(doc as u64);
let end = self.idx_reader.get_val(doc as u64 + 1);
start..end
}
/// Returns the array of values associated to the given `doc`.
#[inline]
pub fn get_first_val(&self, doc: DocId) -> Option<T> {
let range = self.range(doc);
if range.is_empty() {
return None;
}
Some(self.vals_reader.get_val(range.start))
}
/// Returns the array of values associated to the given `doc`.
#[inline]
fn get_vals_for_range(&self, range: Range<u64>, vals: &mut Vec<T>) {
let len = (range.end - range.start) as usize;
vals.resize(len, T::from_u128(0));
self.vals_reader.get_range(range.start, &mut vals[..]);
}
/// Returns the array of values associated to the given `doc`.
#[inline]
pub fn get_vals(&self, doc: DocId, vals: &mut Vec<T>) {
let range = self.range(doc);
self.get_vals_for_range(range, vals);
}
/// Returns all docids which are in the provided value range
pub fn get_between_vals(&self, range: RangeInclusive<T>) -> Vec<DocId> {
let positions = self.vals_reader.get_between_vals(range);
positions_to_docids(&positions, self)
}
/// Iterates over all elements in the fast field
pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
self.vals_reader.iter()
}
/// Returns the minimum value for this fast field.
///
/// The min value does not take in account of possible
/// deleted document, and should be considered as a lower bound
/// of the actual mimimum value.
pub fn min_value(&self) -> T {
self.vals_reader.min_value()
}
/// Returns the maximum value for this fast field.
///
/// The max value does not take in account of possible
/// deleted document, and should be considered as an upper bound
/// of the actual maximum value.
pub fn max_value(&self) -> T {
self.vals_reader.max_value()
}
/// Returns the number of values associated with the document `DocId`.
#[inline]
pub fn num_vals(&self, doc: DocId) -> usize {
let range = self.range(doc);
(range.end - range.start) as usize
}
/// Returns the overall number of values in this field.
#[inline]
pub fn total_num_vals(&self) -> u64 {
self.idx_reader.max_value()
}
}
impl<T: MonotonicallyMappableToU128> MultiValueLength for MultiValuedU128FastFieldReader<T> {
fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
self.range(doc_id)
}
fn get_len(&self, doc_id: DocId) -> u64 {
self.num_vals(doc_id) as u64
}
fn get_total_len(&self) -> u64 {
self.total_num_vals() as u64
}
}
/// Converts a list of positions of values in a 1:n index to the corresponding list of DocIds.
///
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the index.
///
/// Correctness: positions needs to be sorted.
///
/// TODO: Instead of a linear scan we can employ a binary search to match a docid to its value
/// position.
fn positions_to_docids<T: MultiValueLength>(positions: &[u64], multival_idx: &T) -> Vec<DocId> {
let mut docs = vec![];
let mut cur_doc = 0u32;
let mut last_doc = None;
for pos in positions {
loop {
let range = multival_idx.get_range(cur_doc);
if range.contains(pos) {
// avoid duplicates
if Some(cur_doc) == last_doc {
break;
}
docs.push(cur_doc);
last_doc = Some(cur_doc);
break;
}
cur_doc += 1;
}
}
docs
}
#[cfg(test)]
mod tests {

View File

@@ -1,6 +1,8 @@
use std::io;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fastfield_codecs::{
serialize_u128, Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
};
use fnv::FnvHashMap;
use super::get_fastfield_codecs_for_multivalue;
@@ -264,6 +266,144 @@ fn iter_remapped_multivalue_index<'a, C: Column>(
}))
}
/// Writer for multi-valued (as in, more than one value per document)
/// int fast field.
///
/// This `Writer` is only useful for advanced users.
/// The normal way to get your multivalued int in your index
/// is to
/// - declare your field with fast set to `Cardinality::MultiValues`
/// in your schema
/// - add your document simply by calling `.add_document(...)`.
///
/// The `MultiValuedFastFieldWriter` can be acquired from the
pub struct MultiValueU128FastFieldWriter {
field: Field,
vals: Vec<u128>,
doc_index: Vec<u64>,
}
impl MultiValueU128FastFieldWriter {
/// Creates a new `U128MultiValueFastFieldWriter`
pub(crate) fn new(field: Field) -> Self {
MultiValueU128FastFieldWriter {
field,
vals: Vec::new(),
doc_index: Vec::new(),
}
}
/// The memory used (inclusive childs)
pub fn mem_usage(&self) -> usize {
self.vals.capacity() * std::mem::size_of::<UnorderedTermId>()
+ self.doc_index.capacity() * std::mem::size_of::<u64>()
}
/// Finalize the current document.
pub(crate) fn next_doc(&mut self) {
self.doc_index.push(self.vals.len() as u64);
}
/// Pushes a new value to the current document.
pub(crate) fn add_val(&mut self, val: u128) {
self.vals.push(val);
}
/// Shift to the next document and adds
/// all of the matching field values present in the document.
pub fn add_document(&mut self, doc: &Document) {
self.next_doc();
for field_value in doc.field_values() {
if field_value.field == self.field {
let value = field_value.value();
let ip_addr = value.as_ip().unwrap();
let value = ip_addr.to_u128();
self.add_val(value);
}
}
}
/// Returns an iterator over values per doc_id in ascending doc_id order.
///
/// Normally the order is simply iterating self.doc_id_index.
/// With doc_id_map it accounts for the new mapping, returning values in the order of the
/// new doc_ids.
fn get_ordered_values<'a: 'b, 'b>(
&'a self,
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [u128]> {
get_ordered_values(&self.vals, &self.doc_index, doc_id_map)
}
/// Serializes fast field values.
pub fn serialize(
mut self,
serializer: &mut CompositeFastFieldSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
{
// writing the offset index
//
self.doc_index.push(self.vals.len() as u64);
let col = VecColumn::from(&self.doc_index[..]);
if let Some(doc_id_map) = doc_id_map {
let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
serializer.create_auto_detect_u64_fast_field_with_idx(
self.field,
multi_value_start_index,
0,
)?;
} else {
serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 0)?;
}
}
{
let field_write = serializer.get_field_writer(self.field, 1);
let mut values = Vec::with_capacity(self.vals.len());
for vals in self.get_ordered_values(doc_id_map) {
for &val in vals {
values.push(val);
}
}
let col = VecColumn::from(&values[..]);
serialize_u128(col, field_write)?;
}
Ok(())
}
}
/// Returns an iterator over values per doc_id in ascending doc_id order.
///
/// Normally the order is simply iterating self.doc_id_index.
/// With doc_id_map it accounts for the new mapping, returning values in the order of the
/// new doc_ids.
fn get_ordered_values<'a: 'b, 'b, T>(
vals: &'a [T],
doc_index: &'a [u64],
doc_id_map: Option<&'b DocIdMapping>,
) -> impl Iterator<Item = &'b [T]> {
let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
Box::new(doc_id_map.iter_old_doc_ids())
} else {
let max_doc = doc_index.len() as DocId;
Box::new(0..max_doc)
};
doc_id_iter.map(move |doc_id| get_values_for_doc_id(doc_id, vals, doc_index))
}
/// returns all values for a doc_id
fn get_values_for_doc_id<'a, T>(doc_id: u32, vals: &'a [T], doc_index: &'a [u64]) -> &'a [T] {
let start_pos = doc_index[doc_id as usize] as usize;
let end_pos = doc_index
.get(doc_id as usize + 1)
.cloned()
.unwrap_or(vals.len() as u64) as usize; // special case, last doc_id has no offset information
&vals[start_pos..end_pos]
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,7 +1,9 @@
use std::net::IpAddr;
use std::sync::Arc;
use fastfield_codecs::{open, Column};
use fastfield_codecs::{open, open_u128, Column};
use super::multivalued::MultiValuedU128FastFieldReader;
use crate::directory::{CompositeFile, FileSlice};
use crate::fastfield::{
BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader,
@@ -23,6 +25,7 @@ pub struct FastFieldReaders {
pub(crate) enum FastType {
I64,
U64,
U128,
F64,
Bool,
Date,
@@ -49,6 +52,9 @@ pub(crate) fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType,
FieldType::Str(options) if options.is_fast() => {
Some((FastType::U64, Cardinality::MultiValues))
}
FieldType::Ip(options) => options
.get_fastfield_cardinality()
.map(|cardinality| (FastType::U128, cardinality)),
_ => None,
}
}
@@ -143,6 +149,56 @@ impl FastFieldReaders {
self.typed_fast_field_reader(field)
}
/// Returns the `ip` fast field reader reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn ip_addr(&self, field: Field) -> crate::Result<Arc<dyn Column<IpAddr>>> {
self.check_type(field, FastType::U128, Cardinality::SingleValue)?;
let bytes = self.fast_field_data(field, 0)?.read_bytes()?;
Ok(open_u128::<IpAddr>(bytes)?)
}
/// Returns the `ip` fast field reader reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn ip_addrs(&self, field: Field) -> crate::Result<MultiValuedU128FastFieldReader<IpAddr>> {
self.check_type(field, FastType::U128, Cardinality::MultiValues)?;
let idx_reader: Arc<dyn Column<u64>> = self.typed_fast_field_reader(field)?;
let bytes = self.fast_field_data(field, 1)?.read_bytes()?;
let vals_reader = open_u128::<IpAddr>(bytes)?;
Ok(MultiValuedU128FastFieldReader::open(
idx_reader,
vals_reader,
))
}
/// Returns the `u128` fast field reader reader associated to `field`.
///
/// If `field` is not a u128 fast field, this method returns an Error.
pub fn u128(&self, field: Field) -> crate::Result<Arc<dyn Column<u128>>> {
self.check_type(field, FastType::U128, Cardinality::SingleValue)?;
let bytes = self.fast_field_data(field, 0)?.read_bytes()?;
Ok(open_u128::<u128>(bytes)?)
}
/// Returns the `u128` multi-valued fast field reader reader associated to `field`.
///
/// If `field` is not a u128 multi-valued fast field, this method returns an Error.
pub fn u128s(&self, field: Field) -> crate::Result<MultiValuedU128FastFieldReader<u128>> {
self.check_type(field, FastType::U128, Cardinality::MultiValues)?;
let idx_reader: Arc<dyn Column<u64>> = self.typed_fast_field_reader(field)?;
let bytes = self.fast_field_data(field, 1)?.read_bytes()?;
let vals_reader = open_u128::<u128>(bytes)?;
Ok(MultiValuedU128FastFieldReader::open(
idx_reader,
vals_reader,
))
}
/// Returns the `u64` fast field reader reader associated with `field`, regardless of whether
/// the given field is effectively of type `u64` or not.
///

View File

@@ -93,6 +93,11 @@ impl CompositeFastFieldSerializer {
self.composite_write.for_field_with_idx(field, 1)
}
/// Gets the underlying writer
pub fn get_field_writer(&mut self, field: Field, idx: usize) -> &mut impl Write {
self.composite_write.for_field_with_idx(field, idx)
}
/// Closes the serializer
///
/// After this call the data must be persistently saved on disk.

View File

@@ -2,11 +2,14 @@ use std::collections::HashMap;
use std::io;
use common;
use fastfield_codecs::{Column, MonotonicallyMappableToU64};
use fastfield_codecs::{
serialize_u128, Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
};
use fnv::FnvHashMap;
use roaring::RoaringBitmap;
use tantivy_bitpacker::BlockedBitpacker;
use super::multivalued::MultiValuedFastFieldWriter;
use super::multivalued::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter};
use super::FastFieldType;
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -19,6 +22,8 @@ use crate::DatePrecision;
pub struct FastFieldsWriter {
term_id_writers: Vec<MultiValuedFastFieldWriter>,
single_value_writers: Vec<IntFastFieldWriter>,
u128_value_writers: Vec<U128FastFieldWriter>,
u128_multi_value_writers: Vec<MultiValueU128FastFieldWriter>,
multi_values_writers: Vec<MultiValuedFastFieldWriter>,
bytes_value_writers: Vec<BytesFastFieldWriter>,
}
@@ -34,6 +39,8 @@ fn fast_field_default_value(field_entry: &FieldEntry) -> u64 {
impl FastFieldsWriter {
/// Create all `FastFieldWriter` required by the schema.
pub fn from_schema(schema: &Schema) -> FastFieldsWriter {
let mut u128_value_writers = Vec::new();
let mut u128_multi_value_writers = Vec::new();
let mut single_value_writers = Vec::new();
let mut term_id_writers = Vec::new();
let mut multi_values_writers = Vec::new();
@@ -97,10 +104,27 @@ impl FastFieldsWriter {
bytes_value_writers.push(fast_field_writer);
}
}
FieldType::Ip(opt) => {
if opt.is_fast() {
match opt.get_fastfield_cardinality() {
Some(Cardinality::SingleValue) => {
let fast_field_writer = U128FastFieldWriter::new(field);
u128_value_writers.push(fast_field_writer);
}
Some(Cardinality::MultiValues) => {
let fast_field_writer = MultiValueU128FastFieldWriter::new(field);
u128_multi_value_writers.push(fast_field_writer);
}
None => {}
}
}
}
FieldType::Str(_) | FieldType::JsonObject(_) => {}
}
}
FastFieldsWriter {
u128_value_writers,
u128_multi_value_writers,
term_id_writers,
single_value_writers,
multi_values_writers,
@@ -129,6 +153,16 @@ impl FastFieldsWriter {
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
+ self
.u128_value_writers
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
+ self
.u128_multi_value_writers
.iter()
.map(|w| w.mem_usage())
.sum::<usize>()
}
/// Get the `FastFieldWriter` associated with a field.
@@ -190,7 +224,6 @@ impl FastFieldsWriter {
.iter_mut()
.find(|field_writer| field_writer.field() == field)
}
/// Indexes all of the fastfields of a new document.
pub fn add_document(&mut self, doc: &Document) {
for field_writer in &mut self.term_id_writers {
@@ -205,6 +238,12 @@ impl FastFieldsWriter {
for field_writer in &mut self.bytes_value_writers {
field_writer.add_document(doc);
}
for field_writer in &mut self.u128_value_writers {
field_writer.add_document(doc);
}
for field_writer in &mut self.u128_multi_value_writers {
field_writer.add_document(doc);
}
}
/// Serializes all of the `FastFieldWriter`s by pushing them in
@@ -230,6 +269,161 @@ impl FastFieldsWriter {
for field_writer in self.bytes_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
for field_writer in self.u128_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
for field_writer in self.u128_multi_value_writers {
field_writer.serialize(serializer, doc_id_map)?;
}
Ok(())
}
}
/// Fast field writer for u128 values.
/// The fast field writer just keeps the values in memory.
///
/// Only when the segment writer can be closed and
/// persisted on disc, the fast field writer is
/// sent to a `FastFieldSerializer` via the `.serialize(...)`
/// method.
///
/// We cannot serialize earlier as the values are
/// compressed to a compact number space and the number of
/// bits required for bitpacking can only been known once
/// we have seen all of the values.
pub struct U128FastFieldWriter {
field: Field,
vals: Vec<u128>,
val_count: u32,
null_values: RoaringBitmap,
}
impl U128FastFieldWriter {
/// Creates a new `IntFastFieldWriter`
pub fn new(field: Field) -> Self {
Self {
field,
vals: vec![],
val_count: 0,
null_values: RoaringBitmap::new(),
}
}
/// The memory used (inclusive childs)
pub fn mem_usage(&self) -> usize {
self.vals.len() * 16
}
/// Records a new value.
///
/// The n-th value being recorded is implicitely
/// associated to the document with the `DocId` n.
/// (Well, `n-1` actually because of 0-indexing)
pub fn add_val(&mut self, val: u128) {
self.vals.push(val);
}
/// Extract the fast field value from the document
/// (or use the default value) and records it.
///
/// Extract the value associated to the fast field for
/// this document.
pub fn add_document(&mut self, doc: &Document) {
match doc.get_first(self.field) {
Some(v) => {
let ip_addr = v.as_ip().unwrap();
let value = ip_addr.to_u128();
self.add_val(value);
}
None => {
self.null_values.insert(self.val_count as u32);
}
};
self.val_count += 1;
}
/// Push the fast fields value to the `FastFieldWriter`.
pub fn serialize(
&self,
serializer: &mut CompositeFastFieldSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
// To get the actual value, we could materialize the vec with u128 including nulls, but
// that could cost a lot of memory. Instead we just compute the index for of
// the values
let mut idx_to_val_idx = vec![];
idx_to_val_idx.resize(self.val_count as usize, 0);
let mut val_idx = 0;
for idx in 0..self.val_count {
if !self.null_values.contains(idx as u32) {
idx_to_val_idx[idx as usize] = val_idx as u32;
val_idx += 1;
}
}
struct RemappedFFWriter<'a> {
doc_id_map: Option<&'a DocIdMapping>,
null_values: &'a RoaringBitmap,
vals: &'a [u128],
idx_to_val_idx: Vec<u32>,
val_count: u32,
}
impl<'a> Column<u128> for RemappedFFWriter<'a> {
fn get_val(&self, _idx: u64) -> u128 {
// unused by codec
unreachable!()
}
fn min_value(&self) -> u128 {
// unused by codec
unreachable!()
}
fn max_value(&self) -> u128 {
// unused by codec
unreachable!()
}
fn num_vals(&self) -> u64 {
self.val_count as u64
}
fn iter(&self) -> Box<dyn Iterator<Item = u128> + '_> {
if let Some(doc_id_map) = self.doc_id_map {
let iter = doc_id_map.iter_old_doc_ids().map(|idx| {
if self.null_values.contains(idx as u32) {
0 // TODO properly handle nulls
} else {
self.vals[self.idx_to_val_idx[idx as usize] as usize]
}
});
Box::new(iter)
} else {
let iter = (0..self.val_count).map(|idx| {
if self.null_values.contains(idx as u32) {
0 // TODO properly handle nulls
} else {
self.vals[self.idx_to_val_idx[idx as usize] as usize]
}
});
Box::new(iter)
}
}
}
let column = RemappedFFWriter {
doc_id_map,
null_values: &self.null_values,
vals: &self.vals,
idx_to_val_idx,
val_count: self.val_count,
};
let field_write = serializer.get_field_writer(self.field, 0);
serialize_u128(column, field_write)?;
Ok(())
}
}

View File

@@ -803,7 +803,9 @@ impl Drop for IndexWriter {
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use fastfield_codecs::MonotonicallyMappableToU128;
use proptest::prelude::*;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
@@ -815,7 +817,7 @@ mod tests {
use crate::indexer::NoMergePolicy;
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
self, Cardinality, Facet, FacetOptions, IndexRecordOption, IpOptions, NumericOptions,
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
};
use crate::store::DOCSTORE_CACHE_CAPACITY;
@@ -1593,6 +1595,11 @@ mod tests {
force_end_merge: bool,
) -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED);
let ips_field = schema_builder.add_ip_field(
"ips",
IpOptions::default().set_fast(Cardinality::MultiValues),
);
let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED);
let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED);
let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED);
@@ -1648,17 +1655,37 @@ mod tests {
match op {
IndexingOp::AddDoc { id } => {
let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
let ip_from_id = IpAddr::from_u128(id as u128);
if id % 3 == 0 {
// every 3rd doc has no ip field
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
} else {
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
ip_field => ip_from_id,
ips_field => ip_from_id,
ips_field => ip_from_id,
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field=> LOREM
))?;
}
}
IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id));
@@ -1744,6 +1771,59 @@ mod tests {
.collect::<HashSet<_>>()
);
// Load all ips addr
let ips: HashSet<IpAddr> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap();
segment_reader.doc_ids_alive().flat_map(move |doc| {
let val = ff_reader.get_val(doc as u64);
if val == IpAddr::from_u128(0) {
None
} else {
Some(val)
}
})
})
.collect();
let expected_ips = expected_ids_and_num_occurrences
.keys()
.flat_map(|id| {
if id % 3 == 0 {
None
} else {
Some(IpAddr::from_u128(*id as u128))
}
})
.collect::<HashSet<_>>();
assert_eq!(ips, expected_ips);
let expected_ips = expected_ids_and_num_occurrences
.keys()
.filter_map(|id| {
if id % 3 == 0 {
None
} else {
Some(IpAddr::from_u128(*id as u128))
}
})
.collect::<HashSet<_>>();
let ips: HashSet<IpAddr> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
segment_reader.doc_ids_alive().flat_map(move |doc| {
let mut vals = vec![];
ff_reader.get_vals(doc, &mut vals);
vals.into_iter().filter(|val| val.to_u128() != 0)
})
})
.collect();
assert_eq!(ips, expected_ips);
// multivalue fast field tests
for segment_reader in searcher.segment_readers().iter() {
let id_reader = segment_reader.fast_fields().u64(id_field).unwrap();
@@ -1847,6 +1927,36 @@ mod tests {
Ok(())
}
#[test]
fn test_minimal() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 23 },
IndexingOp::AddDoc { id: 13 },
IndexingOp::DeleteDoc { id: 13 }
],
true,
false
)
.is_ok());
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 23 },
IndexingOp::AddDoc { id: 13 },
IndexingOp::DeleteDoc { id: 13 }
],
false,
false
)
.is_ok());
}
#[test]
fn test_minimal_sort_merge() {
assert!(test_operation_strategy(&[IndexingOp::AddDoc { id: 3 },], true, true).is_ok());
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
#[test]

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::io::Write;
use std::sync::Arc;
use fastfield_codecs::VecColumn;
use fastfield_codecs::{serialize_u128, VecColumn};
use itertools::Itertools;
use measure_time::debug_time;
@@ -11,8 +11,8 @@ use crate::core::{Segment, SegmentReader};
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::{
get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer,
MultiValueLength, MultiValuedFastFieldReader,
get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer, MultiValueLength,
MultiValuedFastFieldReader, MultiValuedU128FastFieldReader,
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
@@ -295,6 +295,24 @@ impl IndexMerger {
self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?;
}
}
FieldType::Ip(options) => match options.get_fastfield_cardinality() {
Some(Cardinality::SingleValue) => {
self.write_u128_single_fast_field(
field,
fast_field_serializer,
doc_id_mapping,
)?;
}
Some(Cardinality::MultiValues) => {
self.write_u128_multi_fast_field(
field,
fast_field_serializer,
doc_id_mapping,
)?;
}
None => {}
},
FieldType::JsonObject(_) | FieldType::Facet(_) | FieldType::Str(_) => {
// We don't handle json fast field for the moment
// They can be implemented using what is done
@@ -305,6 +323,143 @@ impl IndexMerger {
Ok(())
}
// used to merge `u128` single fast fields.
fn write_u128_multi_fast_field(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let segment_and_ff_readers = self
.readers
.iter()
.map(|segment_reader| {
let ff_reader: MultiValuedU128FastFieldReader<u128> =
segment_reader.fast_fields().u128s(field).expect(
"Failed to find index for multivalued field. This is a bug in tantivy, \
please report.",
);
(segment_reader, ff_reader)
})
.collect::<Vec<_>>();
Self::write_1_n_fast_field_idx_generic(
field,
fast_field_serializer,
doc_id_mapping,
&segment_and_ff_readers,
)?;
let fast_field_readers = segment_and_ff_readers
.into_iter()
.map(|(_, ff_reader)| ff_reader)
.collect::<Vec<_>>();
struct RemappedFFReader<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
fast_field_readers: Vec<MultiValuedU128FastFieldReader<u128>>,
}
impl<'a> Column<u128> for RemappedFFReader<'a> {
fn get_val(&self, _idx: u64) -> u128 {
// unused by codec
unreachable!()
}
fn min_value(&self) -> u128 {
// unused by codec
unreachable!()
}
fn max_value(&self) -> u128 {
// unused by codec
unreachable!()
}
fn num_vals(&self) -> u64 {
self.doc_id_mapping.len() as u64
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u128> + 'b> {
Box::new(
self.doc_id_mapping
.iter_old_doc_addrs()
.flat_map(|doc_addr| {
let fast_field_reader =
&self.fast_field_readers[doc_addr.segment_ord as usize];
let mut out = vec![];
fast_field_reader.get_vals(doc_addr.doc_id, &mut out);
out.into_iter()
}),
)
}
}
let column = RemappedFFReader {
doc_id_mapping,
fast_field_readers,
};
let field_write = fast_field_serializer.get_field_writer(field, 1);
serialize_u128(column, field_write)?;
Ok(())
}
// used to merge `u128` single fast fields.
fn write_u128_single_fast_field(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let fast_field_readers = self
.readers
.iter()
.map(|reader| {
let u128_reader: Arc<dyn Column<u128>> = reader.fast_fields().u128(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and it \
should never happen.",
);
u128_reader
})
.collect::<Vec<_>>();
struct RemappedFFReader<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
fast_field_readers: Vec<Arc<dyn Column<u128>>>,
}
impl<'a> Column<u128> for RemappedFFReader<'a> {
fn get_val(&self, _idx: u64) -> u128 {
// unused by codec
unreachable!()
}
fn min_value(&self) -> u128 {
// unused by codec
unreachable!()
}
fn max_value(&self) -> u128 {
// unused by codec
unreachable!()
}
fn num_vals(&self) -> u64 {
self.doc_id_mapping.len() as u64
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u128> + 'b> {
Box::new(self.doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
let fast_field_reader = &self.fast_field_readers[doc_addr.segment_ord as usize];
fast_field_reader.get_val(doc_addr.doc_id as u64)
}))
}
}
let column = RemappedFFReader {
doc_id_mapping,
fast_field_readers,
};
let field_write = fast_field_serializer.get_field_writer(field, 0);
serialize_u128(column, field_write)?;
Ok(())
}
// used both to merge field norms, `u64/i64` single fast fields.
fn write_single_fast_field(
&self,

View File

@@ -294,6 +294,13 @@ impl SegmentWriter {
ctx,
)?;
}
FieldType::Ip(_) => {
for value in values {
let ip_val = value.as_ip().ok_or_else(make_schema_error)?;
term_buffer.set_text(&ip_val.to_string());
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
}
}
}
}
Ok(())

View File

@@ -50,6 +50,7 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn Postings
| FieldType::Bool(_)
| FieldType::Date(_)
| FieldType::Bytes(_)
| FieldType::Ip(_)
| FieldType::Facet(_) => Box::new(SpecializedPostingsWriter::<NothingRecorder>::default()),
FieldType::JsonObject(ref json_object_options) => {
if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() {

View File

@@ -89,6 +89,7 @@ pub(crate) fn serialize_postings(
| FieldType::Bool(_) => {}
FieldType::Bytes(_) => {}
FieldType::JsonObject(_) => {}
FieldType::Ip(_) => {} // TODO check
}
let postings_writer = per_field_postings_writers.get_for_field(field);

View File

@@ -400,6 +400,7 @@ impl QueryParser {
let bytes = base64::decode(phrase).map_err(QueryParserError::ExpectedBase64)?;
Ok(Term::from_field_bytes(field, &bytes))
}
FieldType::Ip(_) => Ok(Term::from_field_text(field, phrase)),
}
}
@@ -506,6 +507,7 @@ impl QueryParser {
let bytes_term = Term::from_field_bytes(field, &bytes);
Ok(vec![LogicalLiteral::Term(bytes_term)])
}
FieldType::Ip(_) => Err(QueryParserError::FieldNotIndexed(field_name.to_string())),
}
}

View File

@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
use super::ip_options::IpOptions;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::{
is_valid_field_name, DateOptions, FacetOptions, FieldType, JsonObjectOptions, NumericOptions,
@@ -60,6 +61,11 @@ impl FieldEntry {
Self::new(field_name, FieldType::Date(date_options))
}
/// Creates a new ip field entry.
pub fn new_ip(field_name: String, ip_options: IpOptions) -> FieldEntry {
Self::new(field_name, FieldType::Ip(ip_options))
}
/// Creates a field entry for a facet.
pub fn new_facet(field_name: String, facet_options: FacetOptions) -> FieldEntry {
Self::new(field_name, FieldType::Facet(facet_options))
@@ -114,6 +120,7 @@ impl FieldEntry {
FieldType::Facet(ref options) => options.is_stored(),
FieldType::Bytes(ref options) => options.is_stored(),
FieldType::JsonObject(ref options) => options.is_stored(),
FieldType::Ip(ref options) => options.is_stored(),
}
}
}

View File

@@ -1,8 +1,12 @@
use std::net::IpAddr;
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use thiserror::Error;
use super::Cardinality;
use super::ip_options::IpOptions;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::facet_options::FacetOptions;
use crate::schema::{
@@ -62,9 +66,13 @@ pub enum Type {
Bytes = b'b',
/// Leaf in a Json object.
Json = b'j',
/// IpAddr
Ip = b'p',
/// IpAddr
U128 = b'1',
}
const ALL_TYPES: [Type; 9] = [
const ALL_TYPES: [Type; 11] = [
Type::Str,
Type::U64,
Type::I64,
@@ -74,6 +82,8 @@ const ALL_TYPES: [Type; 9] = [
Type::Facet,
Type::Bytes,
Type::Json,
Type::Ip,
Type::U128,
];
impl Type {
@@ -100,6 +110,8 @@ impl Type {
Type::Facet => "Facet",
Type::Bytes => "Bytes",
Type::Json => "Json",
Type::Ip => "Ip",
Type::U128 => "U128",
}
}
@@ -116,6 +128,8 @@ impl Type {
b'h' => Some(Type::Facet),
b'b' => Some(Type::Bytes),
b'j' => Some(Type::Json),
b'p' => Some(Type::Ip),
b'1' => Some(Type::U128),
_ => None,
}
}
@@ -146,6 +160,8 @@ pub enum FieldType {
Bytes(BytesOptions),
/// Json object
JsonObject(JsonObjectOptions),
/// IpAddr field
Ip(IpOptions),
}
impl FieldType {
@@ -161,6 +177,7 @@ impl FieldType {
FieldType::Facet(_) => Type::Facet,
FieldType::Bytes(_) => Type::Bytes,
FieldType::JsonObject(_) => Type::Json,
FieldType::Ip(_) => Type::Ip,
}
}
@@ -176,6 +193,7 @@ impl FieldType {
FieldType::Facet(ref _facet_options) => true,
FieldType::Bytes(ref bytes_options) => bytes_options.is_indexed(),
FieldType::JsonObject(ref json_object_options) => json_object_options.is_indexed(),
FieldType::Ip(_) => false,
}
}
@@ -210,6 +228,7 @@ impl FieldType {
| FieldType::F64(ref int_options)
| FieldType::Bool(ref int_options) => int_options.is_fast(),
FieldType::Date(ref date_options) => date_options.is_fast(),
FieldType::Ip(ref options) => options.is_fast(),
FieldType::Facet(_) => true,
FieldType::JsonObject(_) => false,
}
@@ -250,6 +269,7 @@ impl FieldType {
FieldType::Facet(_) => false,
FieldType::Bytes(ref bytes_options) => bytes_options.fieldnorms(),
FieldType::JsonObject(ref _json_object_options) => false,
FieldType::Ip(_) => false,
}
}
@@ -294,6 +314,7 @@ impl FieldType {
FieldType::JsonObject(ref json_obj_options) => json_obj_options
.get_text_indexing_options()
.map(TextFieldIndexing::index_option),
FieldType::Ip(_) => None,
}
}
@@ -333,6 +354,14 @@ impl FieldType {
expected: "a json object",
json: JsonValue::String(field_text),
}),
FieldType::Ip(_) => {
Ok(Value::Ip(IpAddr::from_str(&field_text).map_err(|err| {
ValueParsingError::ParseError {
error: err.to_string(),
json: JsonValue::String(field_text),
}
})?))
}
}
}
JsonValue::Number(field_val_num) => match self {
@@ -380,6 +409,10 @@ impl FieldType {
expected: "a json object",
json: JsonValue::Number(field_val_num),
}),
FieldType::Ip(_) => Err(ValueParsingError::TypeError {
expected: "a string with an ip addr",
json: JsonValue::Number(field_val_num),
}),
},
JsonValue::Object(json_map) => match self {
FieldType::Str(_) => {

View File

@@ -7,6 +7,7 @@ use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{self, Value as JsonValue};
use super::ip_options::IpOptions;
use super::*;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::field_type::ValueParsingError;
@@ -144,6 +145,28 @@ impl SchemaBuilder {
self.add_field(field_entry)
}
/// Adds a ip field.
/// Returns the associated field handle
/// Internally, Tantivy simply stores ips as u64,
/// while the user supplies IpAddr values for convenience.
///
/// # Caution
///
/// Appending two fields with the same name
/// will result in the shadowing of the first
/// by the second one.
/// The first field will get a field id
/// but only the second one will be indexed
pub fn add_ip_field<T: Into<IpOptions>>(
&mut self,
field_name_str: &str,
field_options: T,
) -> Field {
let field_name = String::from(field_name_str);
let field_entry = FieldEntry::new_ip(field_name, field_options.into());
self.add_field(field_entry)
}
/// Adds a new text field.
/// Returns the associated field handle
///

View File

@@ -415,6 +415,14 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re
debug_value_bytes(typ, bytes, f)?;
}
}
Type::Ip => {
let s = as_str(bytes); // TODO: change when serialization changes
write_opt(f, s)?;
}
Type::U128 => {
let s = as_str(bytes); // TODO: change when serialization changes
write_opt(f, s)?;
}
}
Ok(())
}

View File

@@ -1,4 +1,5 @@
use std::fmt;
use std::net::IpAddr;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -32,6 +33,8 @@ pub enum Value {
Bytes(Vec<u8>),
/// Json object value.
JsonObject(serde_json::Map<String, serde_json::Value>),
/// Ip
Ip(IpAddr),
}
impl Eq for Value {}
@@ -50,6 +53,7 @@ impl Serialize for Value {
Value::Facet(ref facet) => facet.serialize(serializer),
Value::Bytes(ref bytes) => serializer.serialize_bytes(bytes),
Value::JsonObject(ref obj) => obj.serialize(serializer),
Value::Ip(ref obj) => obj.serialize(serializer), // TODO check serialization
}
}
}
@@ -201,6 +205,16 @@ impl Value {
None
}
}
/// Returns the ip addr, provided the value is of the `Ip` type.
/// (Returns None if the value is not of the `Ip` type)
pub fn as_ip(&self) -> Option<IpAddr> {
if let Value::Ip(val) = self {
Some(*val)
} else {
None
}
}
}
impl From<String> for Value {
@@ -209,6 +223,12 @@ impl From<String> for Value {
}
}
impl From<IpAddr> for Value {
fn from(v: IpAddr) -> Value {
Value::Ip(v)
}
}
impl From<u64> for Value {
fn from(v: u64) -> Value {
Value::U64(v)
@@ -287,7 +307,9 @@ impl From<serde_json::Value> for Value {
}
mod binary_serialize {
use std::io::{self, Read, Write};
use std::io::{self, ErrorKind, Read, Write};
use std::net::IpAddr;
use std::str::FromStr;
use common::{f64_to_u64, u64_to_f64, BinarySerializable};
@@ -306,6 +328,7 @@ mod binary_serialize {
const EXT_CODE: u8 = 7;
const JSON_OBJ_CODE: u8 = 8;
const BOOL_CODE: u8 = 9;
const IP_CODE: u8 = 10;
// extended types
@@ -366,6 +389,10 @@ mod binary_serialize {
serde_json::to_writer(writer, &map)?;
Ok(())
}
Value::Ip(ref ip) => {
IP_CODE.serialize(writer)?;
ip.to_string().serialize(writer) // TODO Check best format
}
}
}
@@ -418,7 +445,7 @@ mod binary_serialize {
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"No extended field type is associated with code {:?}",
"No extened field type is associated with code {:?}",
ext_type_code
),
)),
@@ -436,6 +463,13 @@ mod binary_serialize {
let json_map = <serde_json::Map::<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)?;
Ok(Value::JsonObject(json_map))
}
IP_CODE => {
let text = String::deserialize(reader)?;
Ok(Value::Ip(IpAddr::from_str(&text).map_err(|err| {
io::Error::new(ErrorKind::Other, err.to_string())
})?))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("No field type is associated with code {:?}", type_code),