diff --git a/Cargo.toml b/Cargo.toml index e9ee6bd3e..6ad58696d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ serde_cbor = { version = "0.11.2", optional = true } async-trait = "0.1.53" arc-swap = "1.5.0" gcd = "2.1.0" +roaring = "0.9.0" [target.'cfg(windows)'.dependencies] winapi = "0.3.9" diff --git a/fastfield_codecs/src/ip_codec.rs b/fastfield_codecs/src/ip_codec.rs index 6ecbd0897..93347a309 100644 --- a/fastfield_codecs/src/ip_codec.rs +++ b/fastfield_codecs/src/ip_codec.rs @@ -14,12 +14,15 @@ use std::{ cmp::Ordering, collections::BinaryHeap, - io, + io::{self, Write}, net::{IpAddr, Ipv6Addr}, + ops::RangeInclusive, }; use tantivy_bitpacker::{self, BitPacker, BitUnpacker}; +use crate::FastFieldCodecReaderU128; + pub fn ip_to_u128(ip_addr: IpAddr) -> u128 { let ip_addr_v6: Ipv6Addr = match ip_addr { IpAddr::V4(v4) => v4.to_ipv6_mapped(), @@ -34,9 +37,10 @@ const INTERVALL_COST_IN_BITS: usize = 64; pub struct IntervalEncoding(); pub struct IntervalCompressor { + pub null_value: u128, min_value: u128, max_value: u128, - ranges_and_compact_start: CompactSpace, + compact_space: CompactSpace, pub num_bits: u8, } @@ -125,7 +129,7 @@ fn get_deltas(ip_addrs_sorted: &[u128]) -> BinaryHeap { /// Will find blanks if it will affect the number of bits used on the compact space. /// Returns the new amplitude and the positions of blanks -fn get_blanks(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> (u128, CompactSpace) { +fn get_compact_space(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> CompactSpace { let mut deltas = get_deltas(ip_addrs_sorted); let mut amplitude_compact_space = *ip_addrs_sorted.last().unwrap() + 1; let mut amplitude_bits: u8 = (amplitude_compact_space as f64).log2().ceil() as u8; @@ -140,7 +144,8 @@ fn get_blanks(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> (u128, Comp let pos = ip_addr_and_pos.pos; staged_blanks.push((delta, pos)); let staged_spaces_sum: u128 = staged_blanks.iter().map(|(delta, _)| delta - 1).sum(); - let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum; + // +1 for later added null value + let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1; let amplitude_new_bits = (amplitude_new_compact_space as f64).log2().ceil() as u8; if amplitude_bits == amplitude_new_bits { continue; @@ -169,7 +174,7 @@ fn get_blanks(ip_addrs_sorted: &[u128], cost_per_interval: usize) -> (u128, Comp } compact_space.add_hole(*ip_addrs_sorted.last().unwrap() + 1..=u128::MAX); - (amplitude_compact_space, compact_space.finish()) + compact_space.finish() } #[test] @@ -178,8 +183,11 @@ fn get_blanks_test() { let ips = vec![ 2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260, ]; - let (amplitude, ranges_and_compact_start) = get_blanks(&ips, 11); - assert_eq!(amplitude, 19); + let ranges_and_compact_start = get_compact_space(&ips, 11); + let null_value = ranges_and_compact_start.null_value; + let amplitude = ranges_and_compact_start.amplitude_compact_space(); + assert_eq!(null_value, 5); + assert_eq!(amplitude, 20); assert_eq!(2, ranges_and_compact_start.to_compact(2).unwrap()); assert_eq!(ranges_and_compact_start.to_compact(100).unwrap_err(), 0); @@ -197,6 +205,11 @@ impl CompactSpaceBuilder { } } + fn assign_and_return_null(&mut self) -> u128 { + self.covered_space[0] = *self.covered_space[0].start()..=*self.covered_space[0].end() + 1; + *self.covered_space[0].end() + } + // Assumes that repeated add_hole calls don't overlap. fn add_hole(&mut self, hole: std::ops::RangeInclusive) { let position = self @@ -222,7 +235,9 @@ impl CompactSpaceBuilder { self.covered_space.insert(position, new_range_start); } } - fn finish(self) -> CompactSpace { + fn finish(mut self) -> CompactSpace { + let null_value = self.assign_and_return_null(); + let mut compact_start: u64 = 0; let mut ranges_and_compact_start = vec![]; for cov in self.covered_space { @@ -232,6 +247,7 @@ impl CompactSpaceBuilder { } CompactSpace { ranges_and_compact_start, + null_value, } } } @@ -239,15 +255,19 @@ impl CompactSpaceBuilder { #[derive(Debug, Clone, Eq, PartialEq)] struct CompactSpace { ranges_and_compact_start: Vec<(std::ops::RangeInclusive, u64)>, + pub null_value: u128, } impl CompactSpace { - fn len(&self) -> usize { - self.ranges_and_compact_start.len() + fn amplitude_compact_space(&self) -> u128 { + let last_range = &self.ranges_and_compact_start[self.ranges_and_compact_start.len() - 1]; + last_range.1 as u128 + (last_range.0.end() - last_range.0.start()) + 1 } - fn get_range(&self, pos: usize) -> &(std::ops::RangeInclusive, u64) { + + fn get_range_and_compact_start(&self, pos: usize) -> &(std::ops::RangeInclusive, u64) { &self.ranges_and_compact_start[pos] } fn serialize(&self, output: &mut Vec) { + serialize_vint(self.null_value as u128, output); serialize_vint(self.ranges_and_compact_start.len() as u128, output); let mut prev_ip = 0; for (ip_range, _compact) in &self.ranges_and_compact_start { @@ -261,9 +281,9 @@ impl CompactSpace { } } - fn deserialize(mut data: &[u8]) -> io::Result<(&[u8], Self)> { - let (num_ip_addrs, new_data) = deserialize_vint(data)?; - data = new_data; + fn deserialize(data: &[u8]) -> io::Result<(&[u8], Self)> { + let (null_value, data) = deserialize_vint(data)?; + let (num_ip_addrs, mut data) = deserialize_vint(data)?; let mut ip_addr = 0u128; let mut compact = 0u64; let mut ranges_and_compact_start: Vec<(std::ops::RangeInclusive, u64)> = vec![]; @@ -286,6 +306,7 @@ impl CompactSpace { Ok(( data, Self { + null_value, ranges_and_compact_start, }, )) @@ -331,7 +352,8 @@ fn ranges_and_compact_start_test() { let ips = vec![ 2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260, ]; - let (_amplitude, ranges_and_compact_start) = get_blanks(&ips, 11); + let ranges_and_compact_start = get_compact_space(&ips, 11); + assert_eq!(ranges_and_compact_start.null_value, 5); let mut output = vec![]; ranges_and_compact_start.serialize(&mut output); @@ -348,59 +370,90 @@ fn ranges_and_compact_start_test() { } pub fn train(ip_addrs_sorted: &[u128]) -> IntervalCompressor { - let (amplitude, ranges_and_compact_start) = get_blanks(ip_addrs_sorted, INTERVALL_COST_IN_BITS); + let ranges_and_compact_start = get_compact_space(ip_addrs_sorted, INTERVALL_COST_IN_BITS); + let null_value = ranges_and_compact_start.null_value; + let amplitude_compact_space = ranges_and_compact_start.amplitude_compact_space(); - assert!(amplitude <= u64::MAX as u128, "case unsupported."); + assert!( + amplitude_compact_space <= u64::MAX as u128, + "case unsupported." + ); - let num_bits = tantivy_bitpacker::compute_num_bits(amplitude as u64); + let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64); let min_value = ip_addrs_sorted[0]; let max_value = ip_addrs_sorted[ip_addrs_sorted.len() - 1]; let compressor = IntervalCompressor { + null_value, min_value, max_value, - ranges_and_compact_start, + compact_space: ranges_and_compact_start, num_bits, }; + + let max_value = *ip_addrs_sorted.last().unwrap().max(&null_value); assert_eq!( - compressor.to_compact(*ip_addrs_sorted.last().unwrap()) + 1, - amplitude as u64 + compressor.to_compact(max_value) + 1, + amplitude_compact_space as u64 ); compressor } impl IntervalCompressor { - fn to_compact(&self, ip_addr: u128) -> u64 { - self.ranges_and_compact_start.to_compact(ip_addr).unwrap() + pub fn from_vals(mut vals: Vec) -> Self { + vals.sort(); + train(&vals) } - fn write_header(&self, output: &mut Vec) { - assert!(output.is_empty()); + fn to_compact(&self, ip_addr: u128) -> u64 { + self.compact_space.to_compact(ip_addr).unwrap() + } + + fn write_footer(&self, write: &mut impl Write, num_vals: u128) -> io::Result<()> { + let mut footer = vec![]; // header flags to for future optional dictionary encoding let header_flags = 0u64; - output.extend_from_slice(&header_flags.to_le_bytes()); + footer.extend_from_slice(&header_flags.to_le_bytes()); - serialize_vint(self.min_value, output); - serialize_vint(self.max_value, output); + let null_value = self + .compact_space + .to_compact(self.null_value) + .expect("could not convert null to compact space"); + serialize_vint(null_value as u128, &mut footer); + serialize_vint(self.min_value, &mut footer); + serialize_vint(self.max_value, &mut footer); - self.ranges_and_compact_start.serialize(output); - output.push(self.num_bits); + self.compact_space.serialize(&mut footer); + + footer.push(self.num_bits); + serialize_vint(num_vals as u128, &mut footer); + + write.write_all(&footer)?; + let footer_len = footer.len() as u32; + write.write_all(&footer_len.to_le_bytes())?; + Ok(()) } - pub fn compress(&self, vals: &[u128]) -> Vec { + pub fn compress(&self, vals: &[u128]) -> io::Result> { let mut output = vec![]; - self.compress_into(vals, &mut output); - output + self.compress_into(vals.iter().cloned(), &mut output)?; + Ok(output) } - pub fn compress_into(&self, vals: &[u128], output: &mut Vec) { - self.write_header(output); - serialize_vint(vals.len() as u128, output); + pub fn compress_into( + &self, + vals: impl Iterator, + write: &mut impl Write, + ) -> io::Result<()> { let mut bitpacker = BitPacker::default(); - for &ip_addr in vals { + let mut num_vals = 0; + for ip_addr in vals { let compact = self.to_compact(ip_addr); - bitpacker.write(compact, self.num_bits, output).unwrap(); + bitpacker.write(compact, self.num_bits, write).unwrap(); + num_vals += 1; } - bitpacker.close(output).unwrap(); + bitpacker.close(write).unwrap(); + self.write_footer(write, num_vals as u128)?; + Ok(()) } } @@ -408,22 +461,56 @@ impl IntervalCompressor { pub struct IntervallDecompressor { compact_space: CompactSpace, bit_unpacker: BitUnpacker, + null_compact_space: u64, min_value: u128, max_value: u128, num_vals: usize, } +impl FastFieldCodecReaderU128 for IntervallDecompressor { + fn open_from_bytes(bytes: &[u8]) -> std::io::Result { + Self::open(bytes) + } + + fn get(&self, doc: u64, data: &[u8]) -> Option { + self.get(doc, data) + } + + fn get_range(&self, range: RangeInclusive, data: &[u8]) -> Vec { + self.get_range(range, data) + } + + fn min_value(&self) -> u128 { + self.min_value() + } + + fn max_value(&self) -> u128 { + self.max_value() + } + + /// The computed and assigned number for null values + fn null_value(&self) -> u128 { + self.compact_space.null_value + } +} + impl IntervallDecompressor { - pub fn open(data: &[u8]) -> io::Result<(IntervallDecompressor, &[u8])> { + pub fn open(data: &[u8]) -> io::Result { + let (data, footer_len_bytes) = data.split_at(data.len() - 4); + let footer_len = u32::from_le_bytes(footer_len_bytes.try_into().unwrap()); + + let data = &data[data.len() - footer_len as usize..]; let (_header_flags, data) = data.split_at(8); + let (null_compact_space, data) = deserialize_vint(data)?; let (min_value, data) = deserialize_vint(data)?; let (max_value, data) = deserialize_vint(data)?; let (mut data, compact_space) = CompactSpace::deserialize(data).unwrap(); let num_bits = data[0]; data = &data[1..]; - let (num_vals, data) = deserialize_vint(data)?; + let (num_vals, _data) = deserialize_vint(data)?; let decompressor = IntervallDecompressor { + null_compact_space: null_compact_space as u64, min_value, max_value, compact_space, @@ -431,7 +518,7 @@ impl IntervallDecompressor { bit_unpacker: BitUnpacker::new(num_bits), }; - Ok((decompressor, data)) + Ok(decompressor) } /// Converting to compact space for the decompressor is more complex, since we may get values which are @@ -452,7 +539,9 @@ impl IntervallDecompressor { /// Comparing on compact space: 1.2 GElements/s /// /// Comparing on original space: .06 GElements/s (not completely optimized) - pub fn get_range(&self, from_ip_addr: u128, to_ip_addr: u128, data: &[u8]) -> Vec { + pub fn get_range(&self, range: RangeInclusive, data: &[u8]) -> Vec { + let from_ip_addr = *range.start(); + let to_ip_addr = *range.end(); assert!(to_ip_addr >= from_ip_addr); let compact_from = self.to_compact(from_ip_addr); let compact_to = self.to_compact(to_ip_addr); @@ -464,14 +553,14 @@ impl IntervallDecompressor { } let compact_from = compact_from.unwrap_or_else(|pos| { - let range_and_compact_start = self.compact_space.get_range(pos); + let range_and_compact_start = self.compact_space.get_range_and_compact_start(pos); let compact_end = range_and_compact_start.1 + (range_and_compact_start.0.end() - range_and_compact_start.0.start()) as u64; compact_end + 1 }); // If there is no compact space, we go to the closest upperbound compact space let compact_to = compact_to.unwrap_or_else(|pos| { - let range_and_compact_start = self.compact_space.get_range(pos); + let range_and_compact_start = self.compact_space.get_range_and_compact_start(pos); let compact_end = range_and_compact_start.1 + (range_and_compact_start.0.end() - range_and_compact_start.0.start()) as u64; compact_end @@ -491,20 +580,25 @@ impl IntervallDecompressor { #[inline] pub fn iter_compact<'a>(&'a self, data: &'a [u8]) -> impl Iterator + 'a { - (0..self.num_vals).map(move |idx| self.bit_unpacker.get(idx as u64, data) as u64) + (0..self.num_vals) + .map(move |idx| self.bit_unpacker.get(idx as u64, data) as u64) + .filter(|val| *val != self.null_compact_space) } #[inline] pub fn iter<'a>(&'a self, data: &'a [u8]) -> impl Iterator + 'a { // TODO: Performance. It would be better to iterate on the ranges and check existence via the bit_unpacker. - (0..self.num_vals) - .map(move |idx| self.bit_unpacker.get(idx as u64, data) as u64) + self.iter_compact(data) .map(|compact| self.compact_to_ip_addr(compact)) } - pub fn get(&self, idx: usize, data: &[u8]) -> u128 { - let base = self.bit_unpacker.get(idx as u64, data); - self.compact_to_ip_addr(base) + pub fn get(&self, idx: u64, data: &[u8]) -> Option { + let compact = self.bit_unpacker.get(idx, data); + if compact == self.null_compact_space { + None + } else { + Some(self.compact_to_ip_addr(compact)) + } } pub fn min_value(&self) -> u128 { @@ -522,12 +616,13 @@ impl IntervalEncoding { train(&vals) } + // TODO move to test pub fn encode(&self, vals: &[u128]) -> Vec { if vals.is_empty() { return Vec::new(); } let compressor = self.train(vals.to_vec()); - compressor.compress(&vals) + compressor.compress(&vals).unwrap() } } @@ -537,11 +632,13 @@ mod tests { use super::*; fn decode_all(data: &[u8]) -> Vec { - let (decompressor, data) = IntervallDecompressor::open(data).unwrap(); + let decompressor = IntervallDecompressor::open(data).unwrap(); let mut u128_vals = Vec::new(); for idx in 0..decompressor.num_vals as usize { - let val = decompressor.get(idx, data); - u128_vals.push(val); + let val = decompressor.get(idx as u64, data); + if let Some(val) = val { + u128_vals.push(val); + } } u128_vals } @@ -554,7 +651,7 @@ mod tests { } #[test] - fn test_compress() { + fn test_range_1() { let vals = &[ 1u128, 100u128, @@ -568,28 +665,50 @@ mod tests { ]; let interval_encoding = IntervalEncoding::default(); let data = test_aux_vals(&interval_encoding, vals); - let (decomp, data) = IntervallDecompressor::open(&data).unwrap(); - let positions = decomp.get_range(0, 1, data); + let decomp = IntervallDecompressor::open(&data).unwrap(); + let positions = decomp.get_range(0..=1, &data); assert_eq!(positions, vec![0]); - let positions = decomp.get_range(0, 2, data); + let positions = decomp.get_range(0..=2, &data); assert_eq!(positions, vec![0]); - let positions = decomp.get_range(0, 3, data); + let positions = decomp.get_range(0..=3, &data); assert_eq!(positions, vec![0, 2]); - assert_eq!(decomp.get_range(99999u128, 99999u128, data), vec![3]); - assert_eq!(decomp.get_range(99998u128, 100000u128, data), vec![3, 4]); - assert_eq!(decomp.get_range(99998u128, 99999u128, data), vec![3]); - assert_eq!(decomp.get_range(99998u128, 99998u128, data), vec![]); - assert_eq!(decomp.get_range(333u128, 333u128, data), vec![8]); - assert_eq!(decomp.get_range(332u128, 333u128, data), vec![8]); - assert_eq!(decomp.get_range(332u128, 334u128, data), vec![8]); - assert_eq!(decomp.get_range(333u128, 334u128, data), vec![8]); + assert_eq!(decomp.get_range(99999u128..=99999u128, &data), vec![3]); + assert_eq!(decomp.get_range(99998u128..=100000u128, &data), vec![3, 4]); + assert_eq!(decomp.get_range(99998u128..=99999u128, &data), vec![3]); + assert_eq!(decomp.get_range(99998u128..=99998u128, &data), vec![]); + assert_eq!(decomp.get_range(333u128..=333u128, &data), vec![8]); + assert_eq!(decomp.get_range(332u128..=333u128, &data), vec![8]); + assert_eq!(decomp.get_range(332u128..=334u128, &data), vec![8]); + assert_eq!(decomp.get_range(333u128..=334u128, &data), vec![8]); assert_eq!( - decomp.get_range(4_000_211_221u128, 5_000_000_000u128, data), + decomp.get_range(4_000_211_221u128..=5_000_000_000u128, &data), vec![6, 7] ); } + #[test] + fn test_range_2() { + let vals = &[ + 100u128, + 99999u128, + 100000u128, + 100001u128, + 4_000_211_221u128, + 4_000_211_222u128, + 333u128, + ]; + let interval_encoding = IntervalEncoding::default(); + let data = test_aux_vals(&interval_encoding, vals); + let decomp = IntervallDecompressor::open(&data).unwrap(); + let positions = decomp.get_range(0..=5, &data); + assert_eq!(positions, vec![]); + let positions = decomp.get_range(0..=100, &data); + assert_eq!(positions, vec![0]); + let positions = decomp.get_range(0..=105, &data); + assert_eq!(positions, vec![0]); + } + #[test] fn test_first_large_gaps() { let vals = &[1_000_000_000u128; 100]; diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 857fa066d..5742e9285 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -4,6 +4,7 @@ extern crate more_asserts; use std::io; use std::io::Write; +use std::ops::RangeInclusive; pub mod bitpacked; pub mod ip_codec; @@ -20,10 +21,27 @@ pub trait FastFieldCodecReader: Sized { fn max_value(&self) -> u64; } +pub trait FastFieldCodecReaderU128: Sized { + /// reads the metadata and returns the CodecReader + fn open_from_bytes(bytes: &[u8]) -> std::io::Result; + + /// Get value for doc + fn get(&self, doc: u64, data: &[u8]) -> Option; + + /// Get docs for value range + fn get_range(&self, range: RangeInclusive, data: &[u8]) -> Vec; + + /// The computed and assigned number value for null values + fn null_value(&self) -> u128; + + fn min_value(&self) -> u128; + fn max_value(&self) -> u128; +} + /// The FastFieldSerializerEstimate trait is required on all variants /// of fast field compressions, to decide which one to choose. pub trait FastFieldCodecSerializer { - /// A codex needs to provide a unique name and id, which is + /// A codec needs to provide a unique name and id, which is /// used for debugging and de/serialization. const NAME: &'static str; const ID: u8; diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs index 24b253477..f00307a41 100644 --- a/fastfield_codecs/src/main.rs +++ b/fastfield_codecs/src/main.rs @@ -94,12 +94,12 @@ fn bench_ip() { let encoding = IntervalEncoding(); let dataset = ip_dataset(); print_set_stats(&dataset); - let enc = encoding.encode(&dataset); - let (decompressor, data) = IntervallDecompressor::open(&enc).unwrap(); + let data = encoding.encode(&dataset); + let decompressor = IntervallDecompressor::open(&data).unwrap(); for i in 11100..11150 { print_time!("get range"); - let doc_values = decompressor.get_range(dataset[i], dataset[i], data); + let doc_values = decompressor.get_range(dataset[i]..=dataset[i], &data); println!("{:?}", doc_values.len()); } } diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index 2ab4f22e6..d41da3777 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -327,6 +327,14 @@ impl CompositeFastFieldSerializer { FastBytesFieldSerializer { write: field_write } } + /// Closes the serializer + /// + /// After this call the data must be persistently saved on disk. + pub fn get_field_writer(&mut self, field: Field, idx: usize) -> &mut impl Write { + let field_write = self.composite_write.for_field_with_idx(field, idx); + field_write + } + /// Closes the serializer /// /// After this call the data must be persistently saved on disk. diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 688427724..8cf20384a 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -2,7 +2,9 @@ use std::collections::HashMap; use std::io; use common; +use fastfield_codecs::ip_codec::{ip_to_u128, IntervalCompressor, IntervalEncoding}; use fnv::FnvHashMap; +use roaring::RoaringBitmap; use tantivy_bitpacker::BlockedBitpacker; use super::multivalued::MultiValuedFastFieldWriter; @@ -35,6 +37,7 @@ fn fast_field_default_value(field_entry: &FieldEntry) -> u64 { impl FastFieldsWriter { /// Create all `FastFieldWriter` required by the schema. pub fn from_schema(schema: &Schema) -> FastFieldsWriter { + let mut u128_value_writers = Vec::new(); let mut single_value_writers = Vec::new(); let mut term_id_writers = Vec::new(); let mut multi_values_writers = Vec::new(); @@ -103,6 +106,7 @@ impl FastFieldsWriter { } } FastFieldsWriter { + u128_value_writers, term_id_writers, single_value_writers, multi_values_writers, @@ -192,8 +196,7 @@ impl FastFieldsWriter { .iter_mut() .find(|field_writer| field_writer.field() == field) } -bytes_value_writers - /// Indexes all of the fastfields of a new document. + // Indexes all of the fastfields of a new document. pub fn add_document(&mut self, doc: &Document) { for field_writer in &mut self.term_id_writers { field_writer.add_document(doc); @@ -236,6 +239,101 @@ bytes_value_writers } } +/// Fast field writer for u128 values. +/// The fast field writer just keeps the values in memory. +/// +/// Only when the segment writer can be closed and +/// persisted on disc, the fast field writer is +/// sent to a `FastFieldSerializer` via the `.serialize(...)` +/// method. +/// +/// We cannot serialize earlier as the values are +/// bitpacked and the number of bits required for bitpacking +/// can only been known once we have seen all of the values. +/// +/// Both u64, i64 and f64 use the same writer. +pub struct U128FastFieldWriter { + field: Field, + vals: Vec, + val_count: u32, + + null_values: RoaringBitmap, + //val_if_missing: u64, + //val_min: u128, + //val_max: u128, +} + +impl U128FastFieldWriter { + /// Creates a new `IntFastFieldWriter` + pub fn new(field: Field) -> Self { + Self { + field, + vals: vec![], + val_count: 0, + null_values: RoaringBitmap::new(), + //val_min: u64::MAX, + //val_max: 0, + } + } + + /// The memory used (inclusive childs) + pub fn mem_usage(&self) -> usize { + self.vals.len() * 16 + } + + /// Returns the field that this writer is targeting. + pub fn field(&self) -> Field { + self.field + } + + /// Records a new value. + /// + /// The n-th value being recorded is implicitely + /// associated to the document with the `DocId` n. + /// (Well, `n-1` actually because of 0-indexing) + pub fn add_val(&mut self, val: u128) { + self.vals.push(val); + } + + /// Extract the fast field value from the document + /// (or use the default value) and records it. + /// + /// Extract the value associated to the fast field for + /// this document. + /// + pub fn add_document(&mut self, doc: &Document) { + match doc.get_first(self.field) { + Some(v) => { + let ip_addr = v.as_ip().unwrap(); + let value = ip_to_u128(ip_addr); + self.add_val(value); + } + None => { + self.null_values.insert(self.val_count as u32); + } + }; + self.val_count += 1; + } + + /// Push the fast fields value to the `FastFieldWriter`. + pub fn serialize( + &self, + serializer: &mut CompositeFastFieldSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> io::Result<()> { + //let field_write = serializer.get_field_writer(self.field, 0); + //let compressor = IntervalCompressor::from_vals(self.vals.to_vec()); + //let vals = (0..self.val_count).map(|idx| + //if self.null_values.contains(idx as u32) { + //self.comp + + //} + //) + + unimplemented!() + } +} + /// Fast field writer for ints. /// The fast field writer just keeps the values in memory. ///