diff --git a/fastfield_codecs/src/ip_codec.rs b/fastfield_codecs/src/ip_codec.rs index 862230cdc..f964a76c6 100644 --- a/fastfield_codecs/src/ip_codec.rs +++ b/fastfield_codecs/src/ip_codec.rs @@ -126,11 +126,12 @@ fn get_deltas(ip_addrs_sorted: &[u128]) -> BinaryHeap { deltas } -/// Will find blanks if it will affect the number of bits used on the compact space. -/// Returns the new amplitude and the positions of blanks +/// Will collect blanks and add them to compact space if it will affect the number of bits used on +/// the compact space. fn get_compact_space(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> CompactSpace { + let max_val = *ip_addrs_sorted.last().unwrap_or(&0u128) + 1; let mut deltas = get_deltas(ip_addrs_sorted); - let mut amplitude_compact_space = *ip_addrs_sorted.last().unwrap() + 1; + let mut amplitude_compact_space = max_val; let mut amplitude_bits: u8 = (amplitude_compact_space as f64).log2().ceil() as u8; let mut staged_blanks = vec![]; @@ -172,13 +173,13 @@ fn get_compact_space(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> Comp } } } - compact_space.add_hole(*ip_addrs_sorted.last().unwrap() + 1..=u128::MAX); + compact_space.add_hole(max_val..=u128::MAX); compact_space.finish() } #[test] -fn get_blanks_test() { +fn compact_space_test() { // small ranges are ignored here let ips = vec![ 2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260, @@ -205,6 +206,7 @@ impl CompactSpaceBuilder { } } + // Will extend the first range and add a null value to it. 
fn assign_and_return_null(&mut self) -> u128 { self.covered_space[0] = *self.covered_space[0].start()..=*self.covered_space[0].end() + 1; *self.covered_space[0].end() @@ -380,8 +382,8 @@ pub fn train(ip_addrs_sorted: &[u128]) -> IntervalCompressor { ); let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64); - let min_value = ip_addrs_sorted[0]; - let max_value = ip_addrs_sorted[ip_addrs_sorted.len() - 1]; + let min_value = *ip_addrs_sorted.first().unwrap_or(&0); + let max_value = *ip_addrs_sorted.last().unwrap_or(&0); let compressor = IntervalCompressor { null_value, min_value, @@ -390,7 +392,7 @@ pub fn train(ip_addrs_sorted: &[u128]) -> IntervalCompressor { num_bits, }; - let max_value = *ip_addrs_sorted.last().unwrap().max(&null_value); + let max_value = *ip_addrs_sorted.last().unwrap_or(&0u128).max(&null_value); assert_eq!( compressor.to_compact(max_value) + 1, amplitude_compact_space as u64 @@ -415,7 +417,7 @@ impl IntervalCompressor { fn write_footer(&self, write: &mut impl Write, num_vals: u128) -> io::Result<()> { let mut footer = vec![]; - // header flags to for future optional dictionary encoding + // header flags for future optional dictionary encoding let header_flags = 0u64; footer.extend_from_slice(&header_flags.to_le_bytes()); @@ -632,9 +634,6 @@ impl IntervalEncoding { // TODO move to test pub fn encode(&self, vals: &[u128]) -> Vec { - if vals.is_empty() { - return Vec::new(); - } let compressor = self.train(vals.to_vec()); compressor.compress(vals).unwrap() } @@ -701,6 +700,14 @@ mod tests { ); } + #[test] + fn test_empty() { + let vals = &[]; + let interval_encoding = IntervalEncoding::default(); + let data = test_aux_vals(&interval_encoding, vals); + let _decomp = IntervallDecompressor::open(&data).unwrap(); + } + #[test] fn test_range_2() { let vals = &[ diff --git a/src/directory/composite_file.rs b/src/directory/composite_file.rs index 7743620e1..ccd697295 100644 --- a/src/directory/composite_file.rs +++ 
b/src/directory/composite_file.rs @@ -38,7 +38,7 @@ impl BinarySerializable for FileAddr { /// A `CompositeWrite` is used to write a `CompositeFile`. pub struct CompositeWrite { write: CountingWriter, - offsets: HashMap, + offsets: Vec<(FileAddr, u64)>, } impl CompositeWrite { @@ -47,7 +47,7 @@ impl CompositeWrite { pub fn wrap(w: W) -> CompositeWrite { CompositeWrite { write: CountingWriter::wrap(w), - offsets: HashMap::new(), + offsets: Vec::new(), } } @@ -60,8 +60,8 @@ impl CompositeWrite { pub fn for_field_with_idx(&mut self, field: Field, idx: usize) -> &mut CountingWriter { let offset = self.write.written_bytes(); let file_addr = FileAddr::new(field, idx); - assert!(!self.offsets.contains_key(&file_addr)); - self.offsets.insert(file_addr, offset); + assert!(!self.offsets.iter().any(|el| el.0 == file_addr)); + self.offsets.push((file_addr, offset)); &mut self.write } @@ -73,16 +73,8 @@ impl CompositeWrite { let footer_offset = self.write.written_bytes(); VInt(self.offsets.len() as u64).serialize(&mut self.write)?; - let mut offset_fields: Vec<_> = self - .offsets - .iter() - .map(|(file_addr, offset)| (*offset, *file_addr)) - .collect(); - - offset_fields.sort(); - let mut prev_offset = 0; - for (offset, file_addr) in offset_fields { + for (file_addr, offset) in self.offsets { VInt((offset - prev_offset) as u64).serialize(&mut self.write)?; file_addr.serialize(&mut self.write)?; prev_offset = offset; @@ -106,6 +98,14 @@ pub struct CompositeFile { offsets_index: HashMap>, } +impl std::fmt::Debug for CompositeFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CompositeFile") + .field("offsets_index", &self.offsets_index) + .finish() + } +} + impl CompositeFile { /// Opens a composite file stored in a given /// `FileSlice`. 
diff --git a/src/fastfield/bytes/writer.rs b/src/fastfield/bytes/writer.rs index 2a98c1c08..3d3785984 100644 --- a/src/fastfield/bytes/writer.rs +++ b/src/fastfield/bytes/writer.rs @@ -129,6 +129,7 @@ impl BytesFastFieldWriter { } else { value_serializer.write_all(&self.vals)?; } + value_serializer.flush()?; Ok(()) } } diff --git a/src/fastfield/fast_value.rs b/src/fastfield/fast_value.rs index 980ba8b53..f748c1be9 100644 --- a/src/fastfield/fast_value.rs +++ b/src/fastfield/fast_value.rs @@ -51,7 +51,7 @@ impl FastValueU128 for u128 { impl FastValueU128 for IpAddr { fn from_u128(val: u128) -> Self { - IpAddr::from(val.to_le_bytes()) + IpAddr::from(val.to_be_bytes()) } fn to_u128(&self) -> u128 { diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 31d762c73..5f9bb418b 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -103,6 +103,7 @@ impl FastFieldType { mod tests { use std::collections::HashMap; + use std::net::IpAddr; use std::ops::Range; use std::path::Path; @@ -115,7 +116,9 @@ mod tests { use super::*; use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr}; use crate::merge_policy::NoMergePolicy; - use crate::schema::{Cardinality, Document, Field, Schema, FAST, STRING, TEXT}; + use crate::schema::{ + self, Cardinality, Document, Field, Schema, FAST, INDEXED, STORED, STRING, TEXT, + }; use crate::time::OffsetDateTime; use crate::{DateOptions, DatePrecision, DateTime, Index, SegmentId, SegmentReader}; @@ -143,7 +146,7 @@ mod tests { } #[test] - pub fn test_fastfield_i64_u64() { + pub fn test_datetime_conversion() { let datetime = DateTime::from_utc(OffsetDateTime::UNIX_EPOCH); assert_eq!(i64::from_u64(datetime.to_u64()), 0i64); } @@ -452,6 +455,38 @@ mod tests { all } + #[test] + fn test_ip_fastfield_minimal() -> crate::Result<()> { + let mut schema_builder = schema::Schema::builder(); + let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED); + let schema = schema_builder.build(); + + let index = 
Index::create_in_ram(schema); + + let mut index_writer = index.writer_for_tests()?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + index_writer.add_document(doc!())?; + index_writer.add_document(doc!( + ip_field => IpAddr::from((2_u128).to_le_bytes()) + ))?; + index_writer.commit()?; + + let reader = index.reader()?; + let searcher = reader.searcher(); + assert_eq!(searcher.segment_readers().len(), 1); + let segment_reader = searcher.segment_reader(0); + let fast_fields = segment_reader.fast_fields(); + let ip_addr_fast_field = fast_fields.ip_addr(ip_field).unwrap(); + + assert_eq!(ip_addr_fast_field.get(0), None); + assert_eq!( + ip_addr_fast_field.get(1), + Some(IpAddr::from((2_u128).to_le_bytes())) + ); + + Ok(()) + } + #[test] fn test_text_fastfield() -> crate::Result<()> { let mut schema_builder = Schema::builder(); diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index de5a1b00a..0a8c9ee96 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -233,9 +233,9 @@ impl FastFieldReaderCodecWrapp } /// Returns the item for the docid - pub fn get(&self, doc: u64) -> Option { + pub fn get(&self, doc: DocId) -> Option { self.reader - .get(doc, self.bytes.as_slice()) + .get(doc as u64, self.bytes.as_slice()) .map(|el| Item::from_u128(el)) } diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index 4636261a8..db61ec238 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -24,6 +24,7 @@ pub struct FastFieldReaders { pub(crate) enum FastType { I64, U64, + U128, F64, Bool, Date, @@ -50,6 +51,9 @@ pub(crate) fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, FieldType::Str(options) if options.is_fast() => { Some((FastType::U64, Cardinality::MultiValues)) } + FieldType::Ip(options) if options.is_fast() => { + Some((FastType::U128, Cardinality::SingleValue)) + } _ => None, } } @@ -148,6 +152,7 @@ impl FastFieldReaders { &self, field: Field, ) -> crate::Result> { + 
self.check_type(field, FastType::U128, Cardinality::SingleValue)?; let fast_field_slice = self.fast_field_data(field, 0)?; let bytes = fast_field_slice.read_bytes()?; FastFieldReaderCodecWrapperU128::::open_from_bytes(bytes) diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index c4c550b7d..de3cc2dbc 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -327,9 +327,7 @@ impl CompositeFastFieldSerializer { FastBytesFieldSerializer { write: field_write } } - /// Closes the serializer - /// - /// After this call the data must be persistently saved on disk. + /// Gets the underlying writer pub fn get_field_writer(&mut self, field: Field, idx: usize) -> &mut impl Write { self.composite_write.for_field_with_idx(field, idx) } diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 5612526a7..a0080a943 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -326,7 +326,7 @@ impl U128FastFieldWriter { serializer: &mut CompositeFastFieldSerializer, doc_id_map: Option<&DocIdMapping>, ) -> io::Result<()> { - let field_write = serializer.get_field_writer(self.field, 0); + let mut field_write = serializer.get_field_writer(self.field, 0); let compressor = IntervalCompressor::from_vals(self.vals.to_vec()); let mut val_idx = 0; @@ -341,11 +341,31 @@ impl U128FastFieldWriter { }; if let Some(doc_id_map) = doc_id_map { - let iter = doc_id_map.iter_old_doc_ids().map(&mut get_val); - compressor.compress_into(iter, field_write)?; + // To get the actual value, we could materialize the vec with u128 including nulls, but + // that could cost a lot of memory. 
Instead we just compute the index of + // the values + let mut idx_to_val_idx = vec![]; + idx_to_val_idx.resize(self.val_count as usize, 0); + + let mut val_idx = 0; + for idx in 0..self.val_count { + if !self.null_values.contains(idx as u32) { + idx_to_val_idx[idx as usize] = val_idx as u32; + val_idx += 1; + } + } + + let iter = doc_id_map.iter_old_doc_ids().map(|idx| { + if self.null_values.contains(idx as u32) { + compressor.null_value + } else { + self.vals[idx_to_val_idx[idx as usize] as usize] + } + }); + compressor.compress_into(iter, &mut field_write)?; } else { let iter = (0..self.val_count).map(&mut get_val); - compressor.compress_into(iter, field_write)?; + compressor.compress_into(iter, &mut field_write)?; } Ok(()) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 487ab88d6..d7809a31a 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -776,6 +776,7 @@ impl Drop for IndexWriter { #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; + use std::net::IpAddr; use proptest::prelude::*; use proptest::prop_oneof; @@ -1384,6 +1385,7 @@ mod tests { force_end_merge: bool, ) -> crate::Result<()> { let mut schema_builder = schema::Schema::builder(); + let ip_field = schema_builder.add_ip_field("ip", FAST | INDEXED | STORED); let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED); let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED); let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED); @@ -1439,17 +1441,35 @@ mod tests { match op { IndexingOp::AddDoc { id } => { let facet = Facet::from(&("/cola/".to_string() + &id.to_string())); - index_writer.add_document(doc!(id_field=>id, - bytes_field => id.to_le_bytes().as_slice(), - multi_numbers=> id, - multi_numbers => id, - bool_field => (id % 2u64) != 0, - multi_bools => (id % 2u64) != 0, - multi_bools => (id % 2u64) == 0, - text_field => id.to_string(), - facet_field => 
facet, - large_text_field=> LOREM - ))?; + let ip_from_id = IpAddr::from((id as u128).to_be_bytes()); + + if id % 3 == 0 { + // every 3rd doc has no ip field + index_writer.add_document(doc!(id_field=>id, + bytes_field => id.to_le_bytes().as_slice(), + multi_numbers=> id, + multi_numbers => id, + bool_field => (id % 2u64) != 0, + multi_bools => (id % 2u64) != 0, + multi_bools => (id % 2u64) == 0, + text_field => id.to_string(), + facet_field => facet, + large_text_field=> LOREM + ))?; + } else { + index_writer.add_document(doc!(id_field=>id, + bytes_field => id.to_le_bytes().as_slice(), + ip_field => ip_from_id, + multi_numbers=> id, + multi_numbers => id, + bool_field => (id % 2u64) != 0, + multi_bools => (id % 2u64) != 0, + multi_bools => (id % 2u64) == 0, + text_field => id.to_string(), + facet_field => facet, + large_text_field=> LOREM + ))?; + } } IndexingOp::DeleteDoc { id } => { index_writer.delete_term(Term::from_field_u64(id_field, id)); @@ -1530,6 +1550,32 @@ mod tests { .collect::>() ); + // Check ip addr + let ips: HashSet> = searcher + .segment_readers() + .iter() + .flat_map(|segment_reader| { + let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap(); + segment_reader + .doc_ids_alive() + .map(move |doc| ff_reader.get(doc)) + }) + .collect(); + + assert_eq!( + ips, + expected_ids_and_num_occurrences + .keys() + .map(|id| { + if id % 3 == 0 { + None + } else { + Some(IpAddr::from((*id as u128).to_be_bytes())) + } + }) + .collect::>() + ); + // multivalue fast field tests for segment_reader in searcher.segment_readers().iter() { let ff_reader = segment_reader.fast_fields().u64s(multi_numbers).unwrap(); @@ -1631,6 +1677,31 @@ mod tests { Ok(()) } + #[test] + fn test_minimal() { + assert!(test_operation_strategy( + &[ + IndexingOp::AddDoc { id: 23 }, + IndexingOp::AddDoc { id: 13 }, + IndexingOp::DeleteDoc { id: 13 } + ], + true, + false + ) + .is_ok()); + + assert!(test_operation_strategy( + &[ + IndexingOp::AddDoc { id: 23 }, + 
IndexingOp::AddDoc { id: 13 }, + IndexingOp::DeleteDoc { id: 13 } + ], + false, + false + ) + .is_ok()); + } + proptest! { #![proptest_config(ProptestConfig::with_cases(20))] #[test] diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 899e07e77..9c362e8e6 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -376,7 +376,7 @@ impl IndexMerger { let iter = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| { let fast_field_reader = &fast_field_readers[*reader_ordinal as usize]; fast_field_reader - .get((*doc_id) as u64) + .get(*doc_id) .unwrap_or(compressor.null_value) }); diff --git a/src/schema/ip_options.rs b/src/schema/ip_options.rs index a71a38f45..4793509f5 100644 --- a/src/schema/ip_options.rs +++ b/src/schema/ip_options.rs @@ -1,41 +1,21 @@ +use std::ops::BitOr; + use serde::{Deserialize, Serialize}; -/// Express whether a field is single-value or multi-valued. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)] -pub enum Cardinality { - /// The document must have exactly one value associated to the document. - #[serde(rename = "single")] - SingleValue, - /// The document can have any number of values associated to the document. - /// This is more memory and CPU expensive than the SingleValue solution. - #[serde(rename = "multi")] - MultiValues, -} +use super::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag}; -/// Define how an u64, i64, of f64 field should be handled by tantivy. +/// Define how an ip field should be handled by tantivy. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct IpOptions { indexed: bool, - fast: Option, + fast: bool, stored: bool, } impl IpOptions { /// Returns true iff the value is a fast field. pub fn is_fast(&self) -> bool { - self.fast.is_some() - } - - /// Set the field as a single-valued fast field. - /// - /// Fast fields are designed for random access. - /// Access time are similar to a random lookup in an array. 
- /// If more than one value is associated to a fast field, only the last one is - /// kept. - #[must_use] - pub fn set_fast(mut self, cardinality: Cardinality) -> Self { - self.fast = Some(cardinality); - self + self.fast } /// Returns `true` if the json object should be stored. @@ -66,4 +46,76 @@ impl IpOptions { self.stored = true; self } + + /// Set the field as a single-valued fast field. + /// + /// Fast fields are designed for random access. + /// Access times are similar to a random lookup in an array. + /// If more than one value is associated to a fast field, only the last one is + /// kept. + #[must_use] + pub fn set_fast(mut self) -> Self { + self.fast = true; + self + } +} + +impl From<()> for IpOptions { + fn from(_: ()) -> IpOptions { + IpOptions::default() + } +} + +impl From for IpOptions { + fn from(_: FastFlag) -> Self { + IpOptions { + indexed: false, + stored: false, + fast: true, + } + } +} + +impl From for IpOptions { + fn from(_: StoredFlag) -> Self { + IpOptions { + indexed: false, + stored: true, + fast: false, + } + } +} + +impl From for IpOptions { + fn from(_: IndexedFlag) -> Self { + IpOptions { + indexed: true, + stored: false, + fast: false, + } + } +} + +impl> BitOr for IpOptions { + type Output = IpOptions; + + fn bitor(self, other: T) -> IpOptions { + let other = other.into(); + IpOptions { + indexed: self.indexed | other.indexed, + stored: self.stored | other.stored, + fast: self.fast | other.fast, + } + } +} + +impl From> for IpOptions +where + Head: Clone, + Tail: Clone, + Self: BitOr + From + From, +{ + fn from(head_tail: SchemaFlagList) -> Self { + Self::from(head_tail.head) | Self::from(head_tail.tail) + } } diff --git a/src/schema/value.rs b/src/schema/value.rs index 5b338793c..e7496acec 100644 --- a/src/schema/value.rs +++ b/src/schema/value.rs @@ -223,6 +223,12 @@ impl From for Value { } } +impl From for Value { + fn from(v: IpAddr) -> Value { + Value::Ip(v) + } +} + impl From for Value { fn from(v: u64) -> Value { 
Value::U64(v)