From 400a20b7af9b569892b38ec586f309198b0a4561 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 20 Sep 2022 16:17:34 +0800 Subject: [PATCH] add ip field add u128 multivalue reader and writer add ip to schema add ip writers, handle merge --- Cargo.toml | 1 + fastfield_codecs/benches/bench.rs | 2 +- fastfield_codecs/src/compact_space/mod.rs | 2 +- fastfield_codecs/src/lib.rs | 10 +- fastfield_codecs/src/main.rs | 2 +- src/fastfield/mod.rs | 5 +- src/fastfield/multivalued/mod.rs | 4 +- src/fastfield/multivalued/reader.rs | 151 +++++++++++++++- src/fastfield/multivalued/writer.rs | 142 ++++++++++++++- src/fastfield/readers.rs | 58 ++++++- src/fastfield/serializer/mod.rs | 5 + src/fastfield/writer.rs | 200 +++++++++++++++++++++- src/indexer/index_writer.rs | 134 +++++++++++++-- src/indexer/merger.rs | 161 ++++++++++++++++- src/indexer/segment_writer.rs | 7 + src/postings/per_field_postings_writer.rs | 1 + src/postings/postings_writer.rs | 1 + src/query/query_parser/query_parser.rs | 2 + src/schema/field_entry.rs | 7 + src/schema/field_type.rs | 35 +++- src/schema/schema.rs | 23 +++ src/schema/term.rs | 8 + src/schema/value.rs | 38 +++- 23 files changed, 966 insertions(+), 33 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 330d96362..1bbe0220b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ measure_time = "0.8.2" ciborium = { version = "0.2", optional = true} async-trait = "0.1.53" arc-swap = "1.5.0" +roaring = "0.10.1" [target.'cfg(windows)'.dependencies] winapi = "0.3.9" diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs index 526036d4a..0bf46ae6e 100644 --- a/fastfield_codecs/benches/bench.rs +++ b/fastfield_codecs/benches/bench.rs @@ -102,7 +102,7 @@ mod tests { let mut out = vec![]; serialize_u128(VecColumn::from(&data), &mut out).unwrap(); let out = OwnedBytes::new(out); - open_u128(out).unwrap() + open_u128::(out).unwrap() } #[bench] diff --git a/fastfield_codecs/src/compact_space/mod.rs b/fastfield_codecs/src/compact_space/mod.rs index 389bccf6e..72283bb48 100644 --- a/fastfield_codecs/src/compact_space/mod.rs +++ b/fastfield_codecs/src/compact_space/mod.rs @@ -604,7 +604,7 @@ mod tests { ]; let mut out = Vec::new(); serialize_u128(VecColumn::from(vals), &mut out).unwrap(); - let decomp = open_u128(OwnedBytes::new(out)).unwrap(); + let decomp = open_u128::(OwnedBytes::new(out)).unwrap(); assert_eq!(decomp.get_between_vals(199..=200), vec![0]); assert_eq!(decomp.get_between_vals(199..=201), vec![0, 1]); diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 1f66a27e9..286564a86 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -22,6 +22,7 @@ mod compact_space; mod line; mod linear; mod monotonic_mapping; +mod monotonic_mapping_u128; mod column; mod gcd; @@ -32,6 +33,7 @@ use self::blockwise_linear::BlockwiseLinearCodec; pub use self::column::{monotonic_map_column, Column, VecColumn}; use self::linear::LinearCodec; pub use self::monotonic_mapping::MonotonicallyMappableToU64; +pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128; pub use self::serialize::{ estimate, serialize, serialize_and_load, serialize_u128, NormalizedHeader, }; @@ -73,8 +75,12 @@ impl FastFieldCodecType { } /// Returns the correct codec reader wrapped in the `Arc` for the data. -pub fn open_u128(bytes: OwnedBytes) -> io::Result>> { - Ok(Arc::new(CompactSpaceDecompressor::open(bytes)?)) +pub fn open_u128( + bytes: OwnedBytes, +) -> io::Result>> { + let monotonic_mapping = move |val: u128| Item::from_u128(val); + let reader = CompactSpaceDecompressor::open(bytes)?; + Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping))) } /// Returns the correct codec reader wrapped in the `Arc` for the data. diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs index d3d9c06f8..7b963dc12 100644 --- a/fastfield_codecs/src/main.rs +++ b/fastfield_codecs/src/main.rs @@ -110,7 +110,7 @@ fn bench_ip() { (data.len() * 8) as f32 / dataset.len() as f32 ); - let decompressor = open_u128(OwnedBytes::new(data)).unwrap(); + let decompressor = open_u128::(OwnedBytes::new(data)).unwrap(); // Sample some ranges for value in dataset.iter().take(1110).skip(1100).cloned() { print_time!("get range"); diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 3fca75fce..c825ee85c 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -27,7 +27,10 @@ pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; pub use self::error::{FastFieldNotAvailableError, Result}; pub use self::facet_reader::FacetReader; pub(crate) use self::multivalued::{get_fastfield_codecs_for_multivalue, MultivalueStartIndex}; -pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter}; +pub use self::multivalued::{ + MultiValueU128FastFieldWriter, MultiValuedFastFieldReader, MultiValuedFastFieldWriter, + MultiValuedU128FastFieldReader, +}; pub use self::readers::FastFieldReaders; pub(crate) use self::readers::{type_and_cardinality, FastType}; pub use self::serializer::{Column, CompositeFastFieldSerializer}; diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 26b49abd7..c625a2e76 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -3,9 +3,9 @@ mod writer; use fastfield_codecs::FastFieldCodecType; -pub use self::reader::MultiValuedFastFieldReader; -pub use self::writer::MultiValuedFastFieldWriter; +pub use self::reader::{MultiValuedFastFieldReader, MultiValuedU128FastFieldReader}; pub(crate) use self::writer::MultivalueStartIndex; +pub use self::writer::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter}; /// The valid codecs for multivalue values excludes the linear interpolation codec. /// diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs index f8e41f2e1..994c03c7e 100644 --- a/src/fastfield/multivalued/reader.rs +++ b/src/fastfield/multivalued/reader.rs @@ -1,7 +1,7 @@ -use std::ops::Range; +use std::ops::{Range, RangeInclusive}; use std::sync::Arc; -use fastfield_codecs::Column; +use fastfield_codecs::{Column, MonotonicallyMappableToU128}; use crate::fastfield::{FastValue, MultiValueLength}; use crate::DocId; @@ -99,6 +99,153 @@ impl MultiValueLength for MultiValuedFastFieldReader { self.total_num_vals() as u64 } } + +/// Reader for a multivalued `u128` fast field. +/// +/// The reader is implemented as a `u64` fast field for the index and a `u128` fast field. +/// +/// The `vals_reader` will access the concatenated list of all +/// values for all reader. +/// The `idx_reader` associated, for each document, the index of its first value. +#[derive(Clone)] +pub struct MultiValuedU128FastFieldReader { + idx_reader: Arc>, + vals_reader: Arc>, +} + +impl MultiValuedU128FastFieldReader { + pub(crate) fn open( + idx_reader: Arc>, + vals_reader: Arc>, + ) -> MultiValuedU128FastFieldReader { + Self { + idx_reader, + vals_reader, + } + } + + /// Returns `[start, end)`, such that the values associated + /// to the given document are `start..end`. + #[inline] + fn range(&self, doc: DocId) -> Range { + let start = self.idx_reader.get_val(doc as u64); + let end = self.idx_reader.get_val(doc as u64 + 1); + start..end + } + + /// Returns the array of values associated to the given `doc`. + #[inline] + pub fn get_first_val(&self, doc: DocId) -> Option { + let range = self.range(doc); + if range.is_empty() { + return None; + } + Some(self.vals_reader.get_val(range.start)) + } + + /// Returns the array of values associated to the given `doc`. + #[inline] + fn get_vals_for_range(&self, range: Range, vals: &mut Vec) { + let len = (range.end - range.start) as usize; + vals.resize(len, T::from_u128(0)); + self.vals_reader.get_range(range.start, &mut vals[..]); + } + + /// Returns the array of values associated to the given `doc`. + #[inline] + pub fn get_vals(&self, doc: DocId, vals: &mut Vec) { + let range = self.range(doc); + self.get_vals_for_range(range, vals); + } + + /// Returns all docids which are in the provided value range + pub fn get_between_vals(&self, range: RangeInclusive) -> Vec { + let positions = self.vals_reader.get_between_vals(range); + + positions_to_docids(&positions, self) + } + + /// Iterates over all elements in the fast field + pub fn iter(&self) -> impl Iterator + '_ { + self.vals_reader.iter() + } + + /// Returns the minimum value for this fast field. + /// + /// The min value does not take in account of possible + /// deleted document, and should be considered as a lower bound + /// of the actual mimimum value. + pub fn min_value(&self) -> T { + self.vals_reader.min_value() + } + + /// Returns the maximum value for this fast field. + /// + /// The max value does not take in account of possible + /// deleted document, and should be considered as an upper bound + /// of the actual maximum value. + pub fn max_value(&self) -> T { + self.vals_reader.max_value() + } + + /// Returns the number of values associated with the document `DocId`. + #[inline] + pub fn num_vals(&self, doc: DocId) -> usize { + let range = self.range(doc); + (range.end - range.start) as usize + } + + /// Returns the overall number of values in this field. + #[inline] + pub fn total_num_vals(&self) -> u64 { + self.idx_reader.max_value() + } +} + +impl MultiValueLength for MultiValuedU128FastFieldReader { + fn get_range(&self, doc_id: DocId) -> std::ops::Range { + self.range(doc_id) + } + fn get_len(&self, doc_id: DocId) -> u64 { + self.num_vals(doc_id) as u64 + } + fn get_total_len(&self) -> u64 { + self.total_num_vals() as u64 + } +} + +/// Converts a list of positions of values in a 1:n index to the corresponding list of DocIds. +/// +/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the index. +/// +/// Correctness: positions needs to be sorted. +/// +/// TODO: Instead of a linear scan we can employ a binary search to match a docid to its value +/// position. +fn positions_to_docids(positions: &[u64], multival_idx: &T) -> Vec { + let mut docs = vec![]; + let mut cur_doc = 0u32; + let mut last_doc = None; + + for pos in positions { + loop { + let range = multival_idx.get_range(cur_doc); + if range.contains(pos) { + // avoid duplicates + if Some(cur_doc) == last_doc { + break; + } + docs.push(cur_doc); + last_doc = Some(cur_doc); + break; + } + cur_doc += 1; + } + } + + docs +} + #[cfg(test)] mod tests { diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index 0fb30caf6..c5012911e 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -1,6 +1,8 @@ use std::io; -use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn}; +use fastfield_codecs::{ + serialize_u128, Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn, +}; use fnv::FnvHashMap; use super::get_fastfield_codecs_for_multivalue; @@ -264,6 +266,144 @@ fn iter_remapped_multivalue_index<'a, C: Column>( })) } +/// Writer for multi-valued (as in, more than one value per document) +/// int fast field. +/// +/// This `Writer` is only useful for advanced users. +/// The normal way to get your multivalued int in your index +/// is to +/// - declare your field with fast set to `Cardinality::MultiValues` +/// in your schema +/// - add your document simply by calling `.add_document(...)`. +/// +/// The `MultiValuedFastFieldWriter` can be acquired from the + +pub struct MultiValueU128FastFieldWriter { + field: Field, + vals: Vec, + doc_index: Vec, +} + +impl MultiValueU128FastFieldWriter { + /// Creates a new `U128MultiValueFastFieldWriter` + pub(crate) fn new(field: Field) -> Self { + MultiValueU128FastFieldWriter { + field, + vals: Vec::new(), + doc_index: Vec::new(), + } + } + + /// The memory used (inclusive childs) + pub fn mem_usage(&self) -> usize { + self.vals.capacity() * std::mem::size_of::() + + self.doc_index.capacity() * std::mem::size_of::() + } + + /// Finalize the current document. + pub(crate) fn next_doc(&mut self) { + self.doc_index.push(self.vals.len() as u64); + } + + /// Pushes a new value to the current document. + pub(crate) fn add_val(&mut self, val: u128) { + self.vals.push(val); + } + + /// Shift to the next document and adds + /// all of the matching field values present in the document. + pub fn add_document(&mut self, doc: &Document) { + self.next_doc(); + for field_value in doc.field_values() { + if field_value.field == self.field { + let value = field_value.value(); + let ip_addr = value.as_ip().unwrap(); + let value = ip_addr.to_u128(); + self.add_val(value); + } + } + } + + /// Returns an iterator over values per doc_id in ascending doc_id order. + /// + /// Normally the order is simply iterating self.doc_id_index. + /// With doc_id_map it accounts for the new mapping, returning values in the order of the + /// new doc_ids. + fn get_ordered_values<'a: 'b, 'b>( + &'a self, + doc_id_map: Option<&'b DocIdMapping>, + ) -> impl Iterator { + get_ordered_values(&self.vals, &self.doc_index, doc_id_map) + } + + /// Serializes fast field values. + pub fn serialize( + mut self, + serializer: &mut CompositeFastFieldSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> io::Result<()> { + { + // writing the offset index + // + self.doc_index.push(self.vals.len() as u64); + let col = VecColumn::from(&self.doc_index[..]); + if let Some(doc_id_map) = doc_id_map { + let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map); + serializer.create_auto_detect_u64_fast_field_with_idx( + self.field, + multi_value_start_index, + 0, + )?; + } else { + serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 0)?; + } + } + { + let field_write = serializer.get_field_writer(self.field, 1); + + let mut values = Vec::with_capacity(self.vals.len()); + for vals in self.get_ordered_values(doc_id_map) { + for &val in vals { + values.push(val); + } + } + let col = VecColumn::from(&values[..]); + + serialize_u128(col, field_write)?; + } + Ok(()) + } +} + +/// Returns an iterator over values per doc_id in ascending doc_id order. +/// +/// Normally the order is simply iterating self.doc_id_index. +/// With doc_id_map it accounts for the new mapping, returning values in the order of the +/// new doc_ids. +fn get_ordered_values<'a: 'b, 'b, T>( + vals: &'a [T], + doc_index: &'a [u64], + doc_id_map: Option<&'b DocIdMapping>, +) -> impl Iterator { + let doc_id_iter: Box> = if let Some(doc_id_map) = doc_id_map { + Box::new(doc_id_map.iter_old_doc_ids()) + } else { + let max_doc = doc_index.len() as DocId; + Box::new(0..max_doc) + }; + doc_id_iter.map(move |doc_id| get_values_for_doc_id(doc_id, vals, doc_index)) +} + +/// returns all values for a doc_id +fn get_values_for_doc_id<'a, T>(doc_id: u32, vals: &'a [T], doc_index: &'a [u64]) -> &'a [T] { + let start_pos = doc_index[doc_id as usize] as usize; + let end_pos = doc_index + .get(doc_id as usize + 1) + .cloned() + .unwrap_or(vals.len() as u64) as usize; // special case, last doc_id has no offset information + &vals[start_pos..end_pos] +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index 68f9a811f..e4dbbd858 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -1,7 +1,9 @@ +use std::net::IpAddr; use std::sync::Arc; -use fastfield_codecs::{open, Column}; +use fastfield_codecs::{open, open_u128, Column}; +use super::multivalued::MultiValuedU128FastFieldReader; use crate::directory::{CompositeFile, FileSlice}; use crate::fastfield::{ BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader, @@ -23,6 +25,7 @@ pub struct FastFieldReaders { pub(crate) enum FastType { I64, U64, + U128, F64, Bool, Date, @@ -49,6 +52,9 @@ pub(crate) fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, FieldType::Str(options) if options.is_fast() => { Some((FastType::U64, Cardinality::MultiValues)) } + FieldType::Ip(options) => options + .get_fastfield_cardinality() + .map(|cardinality| (FastType::U128, cardinality)), _ => None, } } @@ -143,6 +149,56 @@ impl FastFieldReaders { self.typed_fast_field_reader(field) } + /// Returns the `ip` fast field reader reader associated to `field`. + /// + /// If `field` is not a u128 fast field, this method returns an Error. + pub fn ip_addr(&self, field: Field) -> crate::Result>> { + self.check_type(field, FastType::U128, Cardinality::SingleValue)?; + let bytes = self.fast_field_data(field, 0)?.read_bytes()?; + Ok(open_u128::(bytes)?) + } + + /// Returns the `ip` fast field reader reader associated to `field`. + /// + /// If `field` is not a u128 fast field, this method returns an Error. + pub fn ip_addrs(&self, field: Field) -> crate::Result> { + self.check_type(field, FastType::U128, Cardinality::MultiValues)?; + let idx_reader: Arc> = self.typed_fast_field_reader(field)?; + + let bytes = self.fast_field_data(field, 1)?.read_bytes()?; + let vals_reader = open_u128::(bytes)?; + + Ok(MultiValuedU128FastFieldReader::open( + idx_reader, + vals_reader, + )) + } + + /// Returns the `u128` fast field reader reader associated to `field`. + /// + /// If `field` is not a u128 fast field, this method returns an Error. + pub fn u128(&self, field: Field) -> crate::Result>> { + self.check_type(field, FastType::U128, Cardinality::SingleValue)?; + let bytes = self.fast_field_data(field, 0)?.read_bytes()?; + Ok(open_u128::(bytes)?) + } + + /// Returns the `u128` multi-valued fast field reader reader associated to `field`. + /// + /// If `field` is not a u128 multi-valued fast field, this method returns an Error. + pub fn u128s(&self, field: Field) -> crate::Result> { + self.check_type(field, FastType::U128, Cardinality::MultiValues)?; + let idx_reader: Arc> = self.typed_fast_field_reader(field)?; + + let bytes = self.fast_field_data(field, 1)?.read_bytes()?; + let vals_reader = open_u128::(bytes)?; + + Ok(MultiValuedU128FastFieldReader::open( + idx_reader, + vals_reader, + )) + } + /// Returns the `u64` fast field reader reader associated with `field`, regardless of whether /// the given field is effectively of type `u64` or not. /// diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index 6ca292931..f58f28a12 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -93,6 +93,11 @@ impl CompositeFastFieldSerializer { self.composite_write.for_field_with_idx(field, 1) } + /// Gets the underlying writer + pub fn get_field_writer(&mut self, field: Field, idx: usize) -> &mut impl Write { + self.composite_write.for_field_with_idx(field, idx) + } + /// Closes the serializer /// /// After this call the data must be persistently saved on disk. diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 5d1a0810e..972e0dde2 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -2,11 +2,14 @@ use std::collections::HashMap; use std::io; use common; -use fastfield_codecs::{Column, MonotonicallyMappableToU64}; +use fastfield_codecs::{ + serialize_u128, Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64, +}; use fnv::FnvHashMap; +use roaring::RoaringBitmap; use tantivy_bitpacker::BlockedBitpacker; -use super::multivalued::MultiValuedFastFieldWriter; +use super::multivalued::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter}; use super::FastFieldType; use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer}; use crate::indexer::doc_id_mapping::DocIdMapping; @@ -19,6 +22,8 @@ use crate::DatePrecision; pub struct FastFieldsWriter { term_id_writers: Vec, single_value_writers: Vec, + u128_value_writers: Vec, + u128_multi_value_writers: Vec, multi_values_writers: Vec, bytes_value_writers: Vec, } @@ -34,6 +39,8 @@ fn fast_field_default_value(field_entry: &FieldEntry) -> u64 { impl FastFieldsWriter { /// Create all `FastFieldWriter` required by the schema. pub fn from_schema(schema: &Schema) -> FastFieldsWriter { + let mut u128_value_writers = Vec::new(); + let mut u128_multi_value_writers = Vec::new(); let mut single_value_writers = Vec::new(); let mut term_id_writers = Vec::new(); let mut multi_values_writers = Vec::new(); @@ -97,10 +104,27 @@ impl FastFieldsWriter { bytes_value_writers.push(fast_field_writer); } } + FieldType::Ip(opt) => { + if opt.is_fast() { + match opt.get_fastfield_cardinality() { + Some(Cardinality::SingleValue) => { + let fast_field_writer = U128FastFieldWriter::new(field); + u128_value_writers.push(fast_field_writer); + } + Some(Cardinality::MultiValues) => { + let fast_field_writer = MultiValueU128FastFieldWriter::new(field); + u128_multi_value_writers.push(fast_field_writer); + } + None => {} + } + } + } FieldType::Str(_) | FieldType::JsonObject(_) => {} } } FastFieldsWriter { + u128_value_writers, + u128_multi_value_writers, term_id_writers, single_value_writers, multi_values_writers, @@ -129,6 +153,16 @@ impl FastFieldsWriter { .iter() .map(|w| w.mem_usage()) .sum::() + + self + .u128_value_writers + .iter() + .map(|w| w.mem_usage()) + .sum::() + + self + .u128_multi_value_writers + .iter() + .map(|w| w.mem_usage()) + .sum::() } /// Get the `FastFieldWriter` associated with a field. @@ -190,7 +224,6 @@ impl FastFieldsWriter { .iter_mut() .find(|field_writer| field_writer.field() == field) } - /// Indexes all of the fastfields of a new document. pub fn add_document(&mut self, doc: &Document) { for field_writer in &mut self.term_id_writers { @@ -205,6 +238,12 @@ impl FastFieldsWriter { for field_writer in &mut self.bytes_value_writers { field_writer.add_document(doc); } + for field_writer in &mut self.u128_value_writers { + field_writer.add_document(doc); + } + for field_writer in &mut self.u128_multi_value_writers { + field_writer.add_document(doc); + } } /// Serializes all of the `FastFieldWriter`s by pushing them in @@ -230,6 +269,161 @@ impl FastFieldsWriter { for field_writer in self.bytes_value_writers { field_writer.serialize(serializer, doc_id_map)?; } + for field_writer in self.u128_value_writers { + field_writer.serialize(serializer, doc_id_map)?; + } + for field_writer in self.u128_multi_value_writers { + field_writer.serialize(serializer, doc_id_map)?; + } + + Ok(()) + } +} + +/// Fast field writer for u128 values. +/// The fast field writer just keeps the values in memory. +/// +/// Only when the segment writer can be closed and +/// persisted on disc, the fast field writer is +/// sent to a `FastFieldSerializer` via the `.serialize(...)` +/// method. +/// +/// We cannot serialize earlier as the values are +/// compressed to a compact number space and the number of +/// bits required for bitpacking can only been known once +/// we have seen all of the values. +pub struct U128FastFieldWriter { + field: Field, + vals: Vec, + val_count: u32, + + null_values: RoaringBitmap, +} + +impl U128FastFieldWriter { + /// Creates a new `IntFastFieldWriter` + pub fn new(field: Field) -> Self { + Self { + field, + vals: vec![], + val_count: 0, + null_values: RoaringBitmap::new(), + } + } + + /// The memory used (inclusive childs) + pub fn mem_usage(&self) -> usize { + self.vals.len() * 16 + } + + /// Records a new value. + /// + /// The n-th value being recorded is implicitely + /// associated to the document with the `DocId` n. + /// (Well, `n-1` actually because of 0-indexing) + pub fn add_val(&mut self, val: u128) { + self.vals.push(val); + } + + /// Extract the fast field value from the document + /// (or use the default value) and records it. + /// + /// Extract the value associated to the fast field for + /// this document. + pub fn add_document(&mut self, doc: &Document) { + match doc.get_first(self.field) { + Some(v) => { + let ip_addr = v.as_ip().unwrap(); + let value = ip_addr.to_u128(); + self.add_val(value); + } + None => { + self.null_values.insert(self.val_count as u32); + } + }; + self.val_count += 1; + } + + /// Push the fast fields value to the `FastFieldWriter`. + pub fn serialize( + &self, + serializer: &mut CompositeFastFieldSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> io::Result<()> { + // To get the actual value, we could materialize the vec with u128 including nulls, but + // that could cost a lot of memory. Instead we just compute the index for of + // the values + let mut idx_to_val_idx = vec![]; + idx_to_val_idx.resize(self.val_count as usize, 0); + + let mut val_idx = 0; + for idx in 0..self.val_count { + if !self.null_values.contains(idx as u32) { + idx_to_val_idx[idx as usize] = val_idx as u32; + val_idx += 1; + } + } + + struct RemappedFFWriter<'a> { + doc_id_map: Option<&'a DocIdMapping>, + null_values: &'a RoaringBitmap, + vals: &'a [u128], + idx_to_val_idx: Vec, + val_count: u32, + } + impl<'a> Column for RemappedFFWriter<'a> { + fn get_val(&self, _idx: u64) -> u128 { + // unused by codec + unreachable!() + } + + fn min_value(&self) -> u128 { + // unused by codec + unreachable!() + } + + fn max_value(&self) -> u128 { + // unused by codec + unreachable!() + } + + fn num_vals(&self) -> u64 { + self.val_count as u64 + } + fn iter(&self) -> Box + '_> { + if let Some(doc_id_map) = self.doc_id_map { + let iter = doc_id_map.iter_old_doc_ids().map(|idx| { + if self.null_values.contains(idx as u32) { + 0 // TODO properly handle nulls + } else { + self.vals[self.idx_to_val_idx[idx as usize] as usize] + } + }); + Box::new(iter) + } else { + let iter = (0..self.val_count).map(|idx| { + if self.null_values.contains(idx as u32) { + 0 // TODO properly handle nulls + } else { + self.vals[self.idx_to_val_idx[idx as usize] as usize] + } + }); + Box::new(iter) + } + } + } + + let column = RemappedFFWriter { + doc_id_map, + null_values: &self.null_values, + vals: &self.vals, + idx_to_val_idx, + val_count: self.val_count, + }; + + let field_write = serializer.get_field_writer(self.field, 0); + serialize_u128(column, field_write)?; + Ok(()) } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 3caa0f4aa..9b3b6bfc9 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -803,7 +803,9 @@ impl Drop for IndexWriter { #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; + use std::net::IpAddr; + use fastfield_codecs::MonotonicallyMappableToU128; use proptest::prelude::*; use proptest::prop_oneof; use proptest::strategy::Strategy; @@ -815,7 +817,7 @@ mod tests { use crate::indexer::NoMergePolicy; use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery}; use crate::schema::{ - self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions, + self, Cardinality, Facet, FacetOptions, IndexRecordOption, IpOptions, NumericOptions, TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT, }; use crate::store::DOCSTORE_CACHE_CAPACITY; @@ -1593,6 +1595,11 @@ mod tests { force_end_merge: bool, ) -> crate::Result<()> { let mut schema_builder = schema::Schema::builder(); + let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED); + let ips_field = schema_builder.add_ip_field( + "ips", + IpOptions::default().set_fast(Cardinality::MultiValues), + ); let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED); let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED); let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED); @@ -1648,17 +1655,37 @@ mod tests { match op { IndexingOp::AddDoc { id } => { let facet = Facet::from(&("/cola/".to_string() + &id.to_string())); - index_writer.add_document(doc!(id_field=>id, - bytes_field => id.to_le_bytes().as_slice(), - multi_numbers=> id, - multi_numbers => id, - bool_field => (id % 2u64) != 0, - multi_bools => (id % 2u64) != 0, - multi_bools => (id % 2u64) == 0, - text_field => id.to_string(), - facet_field => facet, - large_text_field=> LOREM - ))?; + let ip_from_id = IpAddr::from_u128(id as u128); + + if id % 3 == 0 { + // every 3rd doc has no ip field + index_writer.add_document(doc!(id_field=>id, + bytes_field => id.to_le_bytes().as_slice(), + multi_numbers=> id, + multi_numbers => id, + bool_field => (id % 2u64) != 0, + multi_bools => (id % 2u64) != 0, + multi_bools => (id % 2u64) == 0, + text_field => id.to_string(), + facet_field => facet, + large_text_field=> LOREM + ))?; + } else { + index_writer.add_document(doc!(id_field=>id, + bytes_field => id.to_le_bytes().as_slice(), + ip_field => ip_from_id, + ips_field => ip_from_id, + ips_field => ip_from_id, + multi_numbers=> id, + multi_numbers => id, + bool_field => (id % 2u64) != 0, + multi_bools => (id % 2u64) != 0, + multi_bools => (id % 2u64) == 0, + text_field => id.to_string(), + facet_field => facet, + large_text_field=> LOREM + ))?; + } } IndexingOp::DeleteDoc { id } => { index_writer.delete_term(Term::from_field_u64(id_field, id)); @@ -1744,6 +1771,59 @@ mod tests { .collect::>() ); + // Load all ips addr + let ips: HashSet = searcher + .segment_readers() + .iter() + .flat_map(|segment_reader| { + let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap(); + segment_reader.doc_ids_alive().flat_map(move |doc| { + let val = ff_reader.get_val(doc as u64); + if val == IpAddr::from_u128(0) { + None + } else { + Some(val) + } + }) + }) + .collect(); + + let expected_ips = expected_ids_and_num_occurrences + .keys() + .flat_map(|id| { + if id % 3 == 0 { + None + } else { + Some(IpAddr::from_u128(*id as u128)) + } + }) + .collect::>(); + assert_eq!(ips, expected_ips); + + let expected_ips = expected_ids_and_num_occurrences + .keys() + .filter_map(|id| { + if id % 3 == 0 { + None + } else { + Some(IpAddr::from_u128(*id as u128)) + } + }) + .collect::>(); + let ips: HashSet = searcher + .segment_readers() + .iter() + .flat_map(|segment_reader| { + let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap(); + segment_reader.doc_ids_alive().flat_map(move |doc| { + let mut vals = vec![]; + ff_reader.get_vals(doc, &mut vals); + vals.into_iter().filter(|val| val.to_u128() != 0) + }) + }) + .collect(); + assert_eq!(ips, expected_ips); + // multivalue fast field tests for segment_reader in searcher.segment_readers().iter() { let id_reader = segment_reader.fast_fields().u64(id_field).unwrap(); @@ -1847,6 +1927,36 @@ mod tests { Ok(()) } + #[test] + fn test_minimal() { + assert!(test_operation_strategy( + &[ + IndexingOp::AddDoc { id: 23 }, + IndexingOp::AddDoc { id: 13 }, + IndexingOp::DeleteDoc { id: 13 } + ], + true, + false + ) + .is_ok()); + + assert!(test_operation_strategy( + &[ + IndexingOp::AddDoc { id: 23 }, + IndexingOp::AddDoc { id: 13 }, + IndexingOp::DeleteDoc { id: 13 } + ], + false, + false + ) + .is_ok()); + } + + #[test] + fn test_minimal_sort_merge() { + assert!(test_operation_strategy(&[IndexingOp::AddDoc { id: 3 },], true, true).is_ok()); + } + proptest! { #![proptest_config(ProptestConfig::with_cases(20))] #[test] diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 0ed47a915..84762091f 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::Write; use std::sync::Arc; -use fastfield_codecs::VecColumn; +use fastfield_codecs::{serialize_u128, VecColumn}; use itertools::Itertools; use measure_time::debug_time; @@ -11,8 +11,8 @@ use crate::core::{Segment, SegmentReader}; use crate::docset::{DocSet, TERMINATED}; use crate::error::DataCorruption; use crate::fastfield::{ - get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer, - MultiValueLength, MultiValuedFastFieldReader, + get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer, MultiValueLength, + MultiValuedFastFieldReader, MultiValuedU128FastFieldReader, }; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping}; @@ -295,6 +295,24 @@ impl IndexMerger { self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?; } } + FieldType::Ip(options) => match options.get_fastfield_cardinality() { + Some(Cardinality::SingleValue) => { + self.write_u128_single_fast_field( + field, + fast_field_serializer, + doc_id_mapping, + )?; + } + Some(Cardinality::MultiValues) => { + self.write_u128_multi_fast_field( + field, + fast_field_serializer, + doc_id_mapping, + )?; + } + None => {} + }, + FieldType::JsonObject(_) | FieldType::Facet(_) | FieldType::Str(_) => { // We don't handle json fast field for the moment // They can be implemented using what is done @@ -305,6 +323,143 @@ impl IndexMerger { Ok(()) } + // used to merge `u128` single fast fields. + fn write_u128_multi_fast_field( + &self, + field: Field, + fast_field_serializer: &mut CompositeFastFieldSerializer, + doc_id_mapping: &SegmentDocIdMapping, + ) -> crate::Result<()> { + let segment_and_ff_readers = self + .readers + .iter() + .map(|segment_reader| { + let ff_reader: MultiValuedU128FastFieldReader = + segment_reader.fast_fields().u128s(field).expect( + "Failed to find index for multivalued field. This is a bug in tantivy, \ + please report.", + ); + (segment_reader, ff_reader) + }) + .collect::>(); + + Self::write_1_n_fast_field_idx_generic( + field, + fast_field_serializer, + doc_id_mapping, + &segment_and_ff_readers, + )?; + + let fast_field_readers = segment_and_ff_readers + .into_iter() + .map(|(_, ff_reader)| ff_reader) + .collect::>(); + + struct RemappedFFReader<'a> { + doc_id_mapping: &'a SegmentDocIdMapping, + fast_field_readers: Vec>, + } + impl<'a> Column for RemappedFFReader<'a> { + fn get_val(&self, _idx: u64) -> u128 { + // unused by codec + unreachable!() + } + + fn min_value(&self) -> u128 { + // unused by codec + unreachable!() + } + + fn max_value(&self) -> u128 { + // unused by codec + unreachable!() + } + + fn num_vals(&self) -> u64 { + self.doc_id_mapping.len() as u64 + } + fn iter<'b>(&'b self) -> Box + 'b> { + Box::new( + self.doc_id_mapping + .iter_old_doc_addrs() + .flat_map(|doc_addr| { + let fast_field_reader = + &self.fast_field_readers[doc_addr.segment_ord as usize]; + let mut out = vec![]; + fast_field_reader.get_vals(doc_addr.doc_id, &mut out); + out.into_iter() + }), + ) + } + } + let column = RemappedFFReader { + doc_id_mapping, + fast_field_readers, + }; + let field_write = fast_field_serializer.get_field_writer(field, 1); + serialize_u128(column, field_write)?; + + Ok(()) + } + + // used to merge `u128` single fast fields. + fn write_u128_single_fast_field( + &self, + field: Field, + fast_field_serializer: &mut CompositeFastFieldSerializer, + doc_id_mapping: &SegmentDocIdMapping, + ) -> crate::Result<()> { + let fast_field_readers = self + .readers + .iter() + .map(|reader| { + let u128_reader: Arc> = reader.fast_fields().u128(field).expect( + "Failed to find a reader for single fast field. This is a tantivy bug and it \ + should never happen.", + ); + u128_reader + }) + .collect::>(); + + struct RemappedFFReader<'a> { + doc_id_mapping: &'a SegmentDocIdMapping, + fast_field_readers: Vec>>, + } + impl<'a> Column for RemappedFFReader<'a> { + fn get_val(&self, _idx: u64) -> u128 { + // unused by codec + unreachable!() + } + + fn min_value(&self) -> u128 { + // unused by codec + unreachable!() + } + + fn max_value(&self) -> u128 { + // unused by codec + unreachable!() + } + + fn num_vals(&self) -> u64 { + self.doc_id_mapping.len() as u64 + } + fn iter<'b>(&'b self) -> Box + 'b> { + Box::new(self.doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| { + let fast_field_reader = &self.fast_field_readers[doc_addr.segment_ord as usize]; + fast_field_reader.get_val(doc_addr.doc_id as u64) + })) + } + } + let column = RemappedFFReader { + doc_id_mapping, + fast_field_readers, + }; + let field_write = fast_field_serializer.get_field_writer(field, 0); + serialize_u128(column, field_write)?; + Ok(()) + } + // used both to merge field norms, `u64/i64` single fast fields. fn write_single_fast_field( &self, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 415378752..36348518e 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -294,6 +294,13 @@ impl SegmentWriter { ctx, )?; } + FieldType::Ip(_) => { + for value in values { + let ip_val = value.as_ip().ok_or_else(make_schema_error)?; + term_buffer.set_text(&ip_val.to_string()); + postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx); + } + } } } Ok(()) diff --git a/src/postings/per_field_postings_writer.rs b/src/postings/per_field_postings_writer.rs index 61d02752f..d0ca89b11 100644 --- a/src/postings/per_field_postings_writer.rs +++ b/src/postings/per_field_postings_writer.rs @@ -50,6 +50,7 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box Box::new(SpecializedPostingsWriter::::default()), FieldType::JsonObject(ref json_object_options) => { if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() { diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 84c95739e..ed2bd2434 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -89,6 +89,7 @@ pub(crate) fn serialize_postings( | FieldType::Bool(_) => {} FieldType::Bytes(_) => {} FieldType::JsonObject(_) => {} + FieldType::Ip(_) => {} // TODO check } let postings_writer = per_field_postings_writers.get_for_field(field); diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs index f9e032f7a..497bb80e6 100644 --- a/src/query/query_parser/query_parser.rs +++ b/src/query/query_parser/query_parser.rs @@ -400,6 +400,7 @@ impl QueryParser { let bytes = base64::decode(phrase).map_err(QueryParserError::ExpectedBase64)?; Ok(Term::from_field_bytes(field, &bytes)) } + FieldType::Ip(_) => Ok(Term::from_field_text(field, phrase)), } } @@ -506,6 +507,7 @@ impl QueryParser { let bytes_term = Term::from_field_bytes(field, &bytes); Ok(vec![LogicalLiteral::Term(bytes_term)]) } + FieldType::Ip(_) => Err(QueryParserError::FieldNotIndexed(field_name.to_string())), } } diff --git a/src/schema/field_entry.rs b/src/schema/field_entry.rs index 997fbd256..e3c23687e 100644 --- a/src/schema/field_entry.rs +++ b/src/schema/field_entry.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; +use super::ip_options::IpOptions; use crate::schema::bytes_options::BytesOptions; use crate::schema::{ is_valid_field_name, DateOptions, FacetOptions, FieldType, JsonObjectOptions, NumericOptions, @@ -60,6 +61,11 @@ impl FieldEntry { Self::new(field_name, FieldType::Date(date_options)) } + /// Creates a new ip field entry. + pub fn new_ip(field_name: String, ip_options: IpOptions) -> FieldEntry { + Self::new(field_name, FieldType::Ip(ip_options)) + } + /// Creates a field entry for a facet. pub fn new_facet(field_name: String, facet_options: FacetOptions) -> FieldEntry { Self::new(field_name, FieldType::Facet(facet_options)) @@ -114,6 +120,7 @@ impl FieldEntry { FieldType::Facet(ref options) => options.is_stored(), FieldType::Bytes(ref options) => options.is_stored(), FieldType::JsonObject(ref options) => options.is_stored(), + FieldType::Ip(ref options) => options.is_stored(), } } } diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs index 3a631697e..9c51f9a6f 100644 --- a/src/schema/field_type.rs +++ b/src/schema/field_type.rs @@ -1,8 +1,12 @@ +use std::net::IpAddr; +use std::str::FromStr; + use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use thiserror::Error; use super::Cardinality; +use super::ip_options::IpOptions; use crate::schema::bytes_options::BytesOptions; use crate::schema::facet_options::FacetOptions; use crate::schema::{ @@ -62,9 +66,13 @@ pub enum Type { Bytes = b'b', /// Leaf in a Json object. Json = b'j', + /// IpAddr + Ip = b'p', + /// IpAddr + U128 = b'1', } -const ALL_TYPES: [Type; 9] = [ +const ALL_TYPES: [Type; 11] = [ Type::Str, Type::U64, Type::I64, @@ -74,6 +82,8 @@ const ALL_TYPES: [Type; 9] = [ Type::Facet, Type::Bytes, Type::Json, + Type::Ip, + Type::U128, ]; impl Type { @@ -100,6 +110,8 @@ impl Type { Type::Facet => "Facet", Type::Bytes => "Bytes", Type::Json => "Json", + Type::Ip => "Ip", + Type::U128 => "U128", } } @@ -116,6 +128,8 @@ impl Type { b'h' => Some(Type::Facet), b'b' => Some(Type::Bytes), b'j' => Some(Type::Json), + b'p' => Some(Type::Ip), + b'1' => Some(Type::U128), _ => None, } } @@ -146,6 +160,8 @@ pub enum FieldType { Bytes(BytesOptions), /// Json object JsonObject(JsonObjectOptions), + /// IpAddr field + Ip(IpOptions), } impl FieldType { @@ -161,6 +177,7 @@ impl FieldType { FieldType::Facet(_) => Type::Facet, FieldType::Bytes(_) => Type::Bytes, FieldType::JsonObject(_) => Type::Json, + FieldType::Ip(_) => Type::Ip, } } @@ -176,6 +193,7 @@ impl FieldType { FieldType::Facet(ref _facet_options) => true, FieldType::Bytes(ref bytes_options) => bytes_options.is_indexed(), FieldType::JsonObject(ref json_object_options) => json_object_options.is_indexed(), + FieldType::Ip(_) => false, } } @@ -210,6 +228,7 @@ impl FieldType { | FieldType::F64(ref int_options) | FieldType::Bool(ref int_options) => int_options.is_fast(), FieldType::Date(ref date_options) => date_options.is_fast(), + FieldType::Ip(ref options) => options.is_fast(), FieldType::Facet(_) => true, FieldType::JsonObject(_) => false, } @@ -250,6 +269,7 @@ impl FieldType { FieldType::Facet(_) => false, FieldType::Bytes(ref bytes_options) => bytes_options.fieldnorms(), FieldType::JsonObject(ref _json_object_options) => false, + FieldType::Ip(_) => false, } } @@ -294,6 +314,7 @@ impl FieldType { FieldType::JsonObject(ref json_obj_options) => json_obj_options .get_text_indexing_options() .map(TextFieldIndexing::index_option), + FieldType::Ip(_) => None, } } @@ -333,6 +354,14 @@ impl FieldType { expected: "a json object", json: JsonValue::String(field_text), }), + FieldType::Ip(_) => { + Ok(Value::Ip(IpAddr::from_str(&field_text).map_err(|err| { + ValueParsingError::ParseError { + error: err.to_string(), + json: JsonValue::String(field_text), + } + })?)) + } } } JsonValue::Number(field_val_num) => match self { @@ -380,6 +409,10 @@ impl FieldType { expected: "a json object", json: JsonValue::Number(field_val_num), }), + FieldType::Ip(_) => Err(ValueParsingError::TypeError { + expected: "a string with an ip addr", + json: JsonValue::Number(field_val_num), + }), }, JsonValue::Object(json_map) => match self { FieldType::Str(_) => { diff --git a/src/schema/schema.rs b/src/schema/schema.rs index 783ce11fe..e85c9d5ae 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -7,6 +7,7 @@ use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_json::{self, Value as JsonValue}; +use super::ip_options::IpOptions; use super::*; use crate::schema::bytes_options::BytesOptions; use crate::schema::field_type::ValueParsingError; @@ -144,6 +145,28 @@ impl SchemaBuilder { self.add_field(field_entry) } + /// Adds a ip field. + /// Returns the associated field handle + /// Internally, Tantivy simply stores ips as u64, + /// while the user supplies IpAddr values for convenience. + /// + /// # Caution + /// + /// Appending two fields with the same name + /// will result in the shadowing of the first + /// by the second one. + /// The first field will get a field id + /// but only the second one will be indexed + pub fn add_ip_field>( + &mut self, + field_name_str: &str, + field_options: T, + ) -> Field { + let field_name = String::from(field_name_str); + let field_entry = FieldEntry::new_ip(field_name, field_options.into()); + self.add_field(field_entry) + } + /// Adds a new text field. /// Returns the associated field handle /// diff --git a/src/schema/term.rs b/src/schema/term.rs index 99f3e5ed5..79546d1dd 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -415,6 +415,14 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re debug_value_bytes(typ, bytes, f)?; } } + Type::Ip => { + let s = as_str(bytes); // TODO: change when serialization changes + write_opt(f, s)?; + } + Type::U128 => { + let s = as_str(bytes); // TODO: change when serialization changes + write_opt(f, s)?; + } } Ok(()) } diff --git a/src/schema/value.rs b/src/schema/value.rs index bcfcfb74b..55cdc4dd7 100644 --- a/src/schema/value.rs +++ b/src/schema/value.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::net::IpAddr; use serde::de::Visitor; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -32,6 +33,8 @@ pub enum Value { Bytes(Vec), /// Json object value. JsonObject(serde_json::Map), + /// Ip + Ip(IpAddr), } impl Eq for Value {} @@ -50,6 +53,7 @@ impl Serialize for Value { Value::Facet(ref facet) => facet.serialize(serializer), Value::Bytes(ref bytes) => serializer.serialize_bytes(bytes), Value::JsonObject(ref obj) => obj.serialize(serializer), + Value::Ip(ref obj) => obj.serialize(serializer), // TODO check serialization } } } @@ -201,6 +205,16 @@ impl Value { None } } + + /// Returns the ip addr, provided the value is of the `Ip` type. + /// (Returns None if the value is not of the `Ip` type) + pub fn as_ip(&self) -> Option { + if let Value::Ip(val) = self { + Some(*val) + } else { + None + } + } } impl From for Value { @@ -209,6 +223,12 @@ impl From for Value { } } +impl From for Value { + fn from(v: IpAddr) -> Value { + Value::Ip(v) + } +} + impl From for Value { fn from(v: u64) -> Value { Value::U64(v) @@ -287,7 +307,9 @@ impl From for Value { } mod binary_serialize { - use std::io::{self, Read, Write}; + use std::io::{self, ErrorKind, Read, Write}; + use std::net::IpAddr; + use std::str::FromStr; use common::{f64_to_u64, u64_to_f64, BinarySerializable}; @@ -306,6 +328,7 @@ mod binary_serialize { const EXT_CODE: u8 = 7; const JSON_OBJ_CODE: u8 = 8; const BOOL_CODE: u8 = 9; + const IP_CODE: u8 = 10; // extended types @@ -366,6 +389,10 @@ mod binary_serialize { serde_json::to_writer(writer, &map)?; Ok(()) } + Value::Ip(ref ip) => { + IP_CODE.serialize(writer)?; + ip.to_string().serialize(writer) // TODO Check best format + } } } @@ -418,7 +445,7 @@ mod binary_serialize { _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!( - "No extended field type is associated with code {:?}", + "No extened field type is associated with code {:?}", ext_type_code ), )), @@ -436,6 +463,13 @@ mod binary_serialize { let json_map = as serde::Deserialize>::deserialize(&mut de)?; Ok(Value::JsonObject(json_map)) } + IP_CODE => { + let text = String::deserialize(reader)?; + Ok(Value::Ip(IpAddr::from_str(&text).map_err(|err| { + io::Error::new(ErrorKind::Other, err.to_string()) + })?)) + } + _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("No field type is associated with code {:?}", type_code),