From 9370427ae2863a4e2bb7ade4d224626b6adf6a1e Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 12 Feb 2018 10:24:58 +0900 Subject: [PATCH] Terminfo blocks (#244) * Using u64 key in the store * Using Option<> for the next element, as opposed to u64 * Code simplification. * Added TermInfoStoreWriter. * Added a TermInfoStore * Added FixedSized for BinarySerialized. --- src/collector/facet_collector.rs | 2 +- src/common/bitpacker.rs | 103 ++---- src/common/mod.rs | 62 +++- src/common/serialize.rs | 104 ++++-- src/common/vint.rs | 4 + .../pack/compression_pack_nosimd.rs | 10 +- src/datastruct/skip/mod.rs | 41 ++- src/datastruct/skip/skiplist.rs | 67 ++-- src/datastruct/skip/skiplist_builder.rs | 46 ++- src/fastfield/mod.rs | 4 +- src/fastfield/reader.rs | 11 +- src/fastfield/serializer.rs | 15 +- src/postings/term_info.rs | 25 +- src/store/reader.rs | 5 +- src/store/writer.rs | 10 +- src/termdict/fstdict/mod.rs | 2 + src/termdict/fstdict/term_info_store.rs | 318 ++++++++++++++++++ src/termdict/fstdict/termdict.rs | 42 +-- src/termdict/mod.rs | 5 +- 19 files changed, 646 insertions(+), 230 deletions(-) create mode 100644 src/termdict/fstdict/term_info_store.rs diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index 13d50e161..b9efd2660 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -330,7 +330,7 @@ impl FacetCollector { fn finalize_segment(&mut self) { if self.ff_reader.is_some() { self.segment_counters.push(SegmentFacetCounter { - facet_reader: unsafe { self.ff_reader.take().unwrap().into_inner() }, + facet_reader: self.ff_reader.take().unwrap().into_inner(), facet_ords: mem::replace(&mut self.current_collapse_facet_ords, Vec::new()), facet_counts: mem::replace(&mut self.current_segment_counts, Vec::new()), }); diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs index b78a32746..992e2d1db 100644 --- a/src/common/bitpacker.rs +++ b/src/common/bitpacker.rs @@ -4,64 +4,31 @@ use common::serialize::BinarySerializable; use std::mem; use std::ops::Deref; -/// Computes the number of bits that will be used for bitpacking. -/// -/// In general the target is the minimum number of bits -/// required to express the amplitude given in argument. -/// -/// e.g. If the amplitude is 10, we can store all ints on simply 4bits. -/// -/// The logic is slightly more convoluted here as for optimization -/// reasons, we want to ensure that a value spawns over at most 8 bytes -/// of aligns bytes. -/// -/// Spanning over 9 bytes is possible for instance, if we do -/// bitpacking with an amplitude of 63 bits. -/// In this case, the second int will start on bit -/// 63 (which belongs to byte 7) and ends at byte 15; -/// Hence 9 bytes (from byte 7 to byte 15 included). -/// -/// To avoid this, we force the number of bits to 64bits -/// when the result is greater than `64-8 = 56 bits`. -/// -/// Note that this only affects rare use cases spawning over -/// a very large range of values. Even in this case, it results -/// in an extra cost of at most 12% compared to the optimal -/// number of bits. 
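
As an aside on the hunk above: this patch reworks BitPacker so that the bit width is no longer stored in the struct but is passed to every write() call, while compute_num_bits (whose doc comment is being moved) lands in common/mod.rs. A minimal crate-internal usage sketch follows, using only APIs that appear in this patch; pack_block and unpack_at are illustrative helpers, not part of the change.

    use std::io;
    use common::bitpacker::{BitPacker, BitUnpacker};
    use common::compute_num_bits;

    // Pack a slice of u64 values with a single width chosen from their maximum.
    fn pack_block(vals: &[u64]) -> io::Result<(Vec<u8>, u8)> {
        let max = vals.iter().cloned().max().unwrap_or(0u64);
        let num_bits = compute_num_bits(max);
        let mut out: Vec<u8> = Vec::new();
        let mut packer = BitPacker::new();
        for &val in vals {
            // The width travels with each call instead of living in the packer.
            packer.write(val, num_bits, &mut out)?;
        }
        // Flush the pending bits of the 64-bit mini buffer and pad, as the tests do.
        packer.close(&mut out)?;
        Ok((out, num_bits))
    }

    fn unpack_at(data: Vec<u8>, num_bits: u8, idx: usize) -> u64 {
        BitUnpacker::new(data, num_bits).get(idx)
    }
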
-pub fn compute_num_bits(amplitude: u64) -> u8 { - let amplitude = (64u32 - amplitude.leading_zeros()) as u8; - if amplitude <= 64 - 8 { - amplitude - } else { - 64 - } -} -pub struct BitPacker { +pub(crate) struct BitPacker { mini_buffer: u64, - mini_buffer_written: usize, - num_bits: usize, + mini_buffer_written: usize } impl BitPacker { - pub fn new(num_bits: usize) -> BitPacker { + pub fn new() -> BitPacker { BitPacker { mini_buffer: 0u64, - mini_buffer_written: 0, - num_bits, + mini_buffer_written: 0 } } - pub fn write(&mut self, val: u64, output: &mut TWrite) -> io::Result<()> { + pub fn write(&mut self, val: u64, num_bits: u8, output: &mut TWrite) -> io::Result<()> { let val_u64 = val as u64; - if self.mini_buffer_written + self.num_bits > 64 { + let num_bits = num_bits as usize; + if self.mini_buffer_written + num_bits > 64 { self.mini_buffer |= val_u64.wrapping_shl(self.mini_buffer_written as u32); self.mini_buffer.serialize(output)?; self.mini_buffer = val_u64.wrapping_shr((64 - self.mini_buffer_written) as u32); - self.mini_buffer_written = self.mini_buffer_written + (self.num_bits as usize) - 64; + self.mini_buffer_written = self.mini_buffer_written + num_bits - 64; } else { self.mini_buffer |= val_u64 << self.mini_buffer_written; - self.mini_buffer_written += self.num_bits; + self.mini_buffer_written += num_bits; if self.mini_buffer_written == 64 { self.mini_buffer.serialize(output)?; self.mini_buffer_written = 0; @@ -71,7 +38,7 @@ impl BitPacker { Ok(()) } - pub(crate) fn flush(&mut self, output: &mut TWrite) -> io::Result<()> { + pub fn flush(&mut self, output: &mut TWrite) -> io::Result<()> { if self.mini_buffer_written > 0 { let num_bytes = (self.mini_buffer_written + 7) / 8; let arr: [u8; 8] = unsafe { mem::transmute::(self.mini_buffer) }; @@ -91,8 +58,8 @@ impl BitPacker { #[derive(Clone)] pub struct BitUnpacker -where - Data: Deref, + where + Data: Deref, { num_bits: usize, mask: u64, @@ -100,17 +67,18 @@ where } impl BitUnpacker -where - Data: Deref, + where + Data: Deref, { - pub fn new(data: Data, num_bits: usize) -> BitUnpacker { - let mask: u64 = if num_bits == 64 { - !0u64 - } else { - (1u64 << num_bits) - 1u64 - }; + pub fn new(data: Data, num_bits: u8) -> BitUnpacker { + let mask: u64 = + if num_bits == 64 { + !0u64 + } else { + (1u64 << num_bits) - 1u64 + }; BitUnpacker { - num_bits, + num_bits: num_bits as usize, mask, data, } @@ -148,7 +116,7 @@ where } unsafe { *(buffer[..].as_ptr() as *const u64) } }; - let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64; + let val_shifted = val_unshifted_unmasked >> (bit_shift as u64); (val_shifted & mask) } } @@ -178,37 +146,26 @@ where #[cfg(test)] mod test { - use super::{compute_num_bits, BitPacker, BitUnpacker}; + use super::{BitPacker, BitUnpacker}; - #[test] - fn test_compute_num_bits() { - assert_eq!(compute_num_bits(1), 1u8); - assert_eq!(compute_num_bits(0), 0u8); - assert_eq!(compute_num_bits(2), 2u8); - assert_eq!(compute_num_bits(3), 2u8); - assert_eq!(compute_num_bits(4), 3u8); - assert_eq!(compute_num_bits(255), 8u8); - assert_eq!(compute_num_bits(256), 9u8); - assert_eq!(compute_num_bits(5_000_000_000), 33u8); - } - fn create_fastfield_bitpacker(len: usize, num_bits: usize) -> (BitUnpacker>, Vec) { + fn create_fastfield_bitpacker(len: usize, num_bits: u8) -> (BitUnpacker>, Vec) { let mut data = Vec::new(); - let mut bitpacker = BitPacker::new(num_bits); - let max_val: u64 = (1 << num_bits) - 1; + let mut bitpacker = BitPacker::new(); + let max_val: u64 = (1u64 << num_bits as u64) - 1u64; let 
vals: Vec = (0u64..len as u64) .map(|i| if max_val == 0 { 0 } else { i % max_val }) .collect(); for &val in &vals { - bitpacker.write(val, &mut data).unwrap(); + bitpacker.write(val, num_bits,&mut data).unwrap(); } bitpacker.close(&mut data).unwrap(); - assert_eq!(data.len(), (num_bits * len + 7) / 8 + 7); + assert_eq!(data.len(), ((num_bits as usize)* len + 7) / 8 + 7); let bitunpacker = BitUnpacker::new(data, num_bits); (bitunpacker, vals) } - fn test_bitpacker_util(len: usize, num_bits: usize) { + fn test_bitpacker_util(len: usize, num_bits: u8) { let (bitunpacker, vals) = create_fastfield_bitpacker(len, num_bits); for (i, val) in vals.iter().enumerate() { assert_eq!(bitunpacker.get(i), *val); diff --git a/src/common/mod.rs b/src/common/mod.rs index aceea844d..c103b468d 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -7,7 +7,7 @@ pub mod bitpacker; mod bitset; pub(crate) use self::composite_file::{CompositeFile, CompositeWrite}; -pub use self::serialize::BinarySerializable; +pub use self::serialize::{BinarySerializable, FixedSize}; pub use self::timer::Timing; pub use self::timer::TimerTree; pub use self::timer::OpenTimer; @@ -15,11 +15,50 @@ pub use self::vint::VInt; pub use self::counting_writer::CountingWriter; pub use self::bitset::BitSet; pub(crate) use self::bitset::TinySet; +pub use byteorder::LittleEndian as Endianness; use std::io; +/// Computes the number of bits that will be used for bitpacking. +/// +/// In general the target is the minimum number of bits +/// required to express the amplitude given in argument. +/// +/// e.g. If the amplitude is 10, we can store all ints on simply 4bits. +/// +/// The logic is slightly more convoluted here as for optimization +/// reasons, we want to ensure that a value spawns over at most 8 bytes +/// of aligns bytes. +/// +/// Spanning over 9 bytes is possible for instance, if we do +/// bitpacking with an amplitude of 63 bits. +/// In this case, the second int will start on bit +/// 63 (which belongs to byte 7) and ends at byte 15; +/// Hence 9 bytes (from byte 7 to byte 15 included). +/// +/// To avoid this, we force the number of bits to 64bits +/// when the result is greater than `64-8 = 56 bits`. +/// +/// Note that this only affects rare use cases spawning over +/// a very large range of values. Even in this case, it results +/// in an extra cost of at most 12% compared to the optimal +/// number of bits. +pub(crate) fn compute_num_bits(n: u64) -> u8 { + let amplitude = (64u32 - n.leading_zeros()) as u8; + if amplitude <= 64 - 8 { + amplitude + } else { + 64 + } +} + + +pub(crate) fn is_power_of_2(n: usize) -> bool { + (n > 0) && (n & (n - 1) == 0) +} + /// Create a default io error given a string. 
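
A note on the new is_power_of_2 helper added to common/mod.rs above: it exists to support the skip-list change further down, where SkipListBuilder's period must be a power of two so that "every period-th insert emits a skip entry" can be tested with a mask instead of a countdown. A small sketch of that arithmetic, with period and the collected values purely illustrative:

    fn is_power_of_2(n: usize) -> bool {
        (n > 0) && (n & (n - 1) == 0)
    }

    fn main() {
        let period = 4usize;
        assert!(is_power_of_2(period));
        let period_mask = period - 1; // 0b011
        // Every 4th inserted element triggers a skip entry, matching
        // `(self.period_mask & self.len) == 0` in LayerBuilder::insert.
        let emitting: Vec<usize> = (1..=12).filter(|len| len & period_mask == 0).collect();
        assert_eq!(emitting, vec![4, 8, 12]);
    }
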
-pub fn make_io_err(msg: String) -> io::Error { +pub(crate) fn make_io_err(msg: String) -> io::Error { io::Error::new(io::ErrorKind::Other, msg) } @@ -68,9 +107,10 @@ pub fn u64_to_i64(val: u64) -> i64 { } #[cfg(test)] -mod test { +pub(crate) mod test { - use super::{i64_to_u64, u64_to_i64}; + use super::{compute_num_bits, i64_to_u64, u64_to_i64}; + pub use super::serialize::test::fixed_size_test; fn test_i64_converter_helper(val: i64) { assert_eq!(u64_to_i64(i64_to_u64(val)), val); @@ -87,4 +127,18 @@ mod test { test_i64_converter_helper(i); } } + + + #[test] + fn test_compute_num_bits() { + assert_eq!(compute_num_bits(1), 1u8); + assert_eq!(compute_num_bits(0), 0u8); + assert_eq!(compute_num_bits(2), 2u8); + assert_eq!(compute_num_bits(3), 2u8); + assert_eq!(compute_num_bits(4), 3u8); + assert_eq!(compute_num_bits(255), 8u8); + assert_eq!(compute_num_bits(256), 9u8); + assert_eq!(compute_num_bits(5_000_000_000), 33u8); + } } + diff --git a/src/common/serialize.rs b/src/common/serialize.rs index f66c02b13..9012c0eb2 100644 --- a/src/common/serialize.rs +++ b/src/common/serialize.rs @@ -1,16 +1,26 @@ use byteorder::{ReadBytesExt, WriteBytesExt}; -use byteorder::LittleEndian as Endianness; +use common::Endianness; use std::fmt; use std::io::Write; use std::io::Read; use std::io; use common::VInt; +/// Trait for a simple binary serialization. pub trait BinarySerializable: fmt::Debug + Sized { + /// Serialize fn serialize(&self, writer: &mut W) -> io::Result<()>; + /// Deserialize fn deserialize(reader: &mut R) -> io::Result; } + +/// `FixedSize` marks a `BinarySerializable` as +/// always serializing to the same size. +pub trait FixedSize: BinarySerializable { + const SIZE_IN_BYTES: usize; +} + impl BinarySerializable for () { fn serialize(&self, _: &mut W) -> io::Result<()> { Ok(()) @@ -20,6 +30,10 @@ impl BinarySerializable for () { } } +impl FixedSize for () { + const SIZE_IN_BYTES: usize = 0; +} + impl BinarySerializable for Vec { fn serialize(&self, writer: &mut W) -> io::Result<()> { VInt(self.len() as u64).serialize(writer)?; @@ -59,6 +73,10 @@ impl BinarySerializable for u32 { } } +impl FixedSize for u32 { + const SIZE_IN_BYTES: usize = 4; +} + impl BinarySerializable for u64 { fn serialize(&self, writer: &mut W) -> io::Result<()> { writer.write_u64::(*self) @@ -68,6 +86,10 @@ impl BinarySerializable for u64 { } } +impl FixedSize for u64 { + const SIZE_IN_BYTES: usize = 8; +} + impl BinarySerializable for i64 { fn serialize(&self, writer: &mut W) -> io::Result<()> { writer.write_i64::(*self) @@ -77,6 +99,11 @@ impl BinarySerializable for i64 { } } +impl FixedSize for i64 { + const SIZE_IN_BYTES: usize = 8; +} + + impl BinarySerializable for u8 { fn serialize(&self, writer: &mut W) -> io::Result<()> { writer.write_u8(*self) @@ -86,6 +113,10 @@ impl BinarySerializable for u8 { } } +impl FixedSize for u8 { + const SIZE_IN_BYTES: usize = 1; +} + impl BinarySerializable for String { fn serialize(&self, writer: &mut W) -> io::Result<()> { let data: &[u8] = self.as_bytes(); @@ -103,64 +134,79 @@ impl BinarySerializable for String { } } + #[cfg(test)] -mod test { +pub mod test { use common::VInt; use super::*; - fn serialize_test(v: T, num_bytes: usize) { + + pub fn fixed_size_test() { + let mut buffer = Vec::new(); + O::default().serialize(&mut buffer).unwrap(); + assert_eq!(buffer.len(), O::SIZE_IN_BYTES); + } + + + fn serialize_test(v: T) -> usize { let mut buffer: Vec = Vec::new(); - if num_bytes != 0 { - v.serialize(&mut buffer).unwrap(); - assert_eq!(buffer.len(), num_bytes); - } else 
{ - v.serialize(&mut buffer).unwrap(); - } + v.serialize(&mut buffer).unwrap(); + let num_bytes = buffer.len(); let mut cursor = &buffer[..]; let deser = T::deserialize(&mut cursor).unwrap(); assert_eq!(deser, v); + num_bytes } #[test] fn test_serialize_u8() { - serialize_test(3u8, 1); - serialize_test(5u8, 1); + fixed_size_test::(); } #[test] fn test_serialize_u32() { - serialize_test(3u32, 4); - serialize_test(5u32, 4); - serialize_test(u32::max_value(), 4); + fixed_size_test::(); + assert_eq!(4, serialize_test(3u32)); + assert_eq!(4, serialize_test(5u32)); + assert_eq!(4, serialize_test(u32::max_value())); + } + + #[test] + fn test_serialize_i64() { + fixed_size_test::(); + } + + #[test] + fn test_serialize_u64() { + fixed_size_test::(); } #[test] fn test_serialize_string() { - serialize_test(String::from(""), 1); - serialize_test(String::from("ぽよぽよ"), 1 + 3 * 4); - serialize_test(String::from("富士さん見える。"), 1 + 3 * 8); + assert_eq!(serialize_test(String::from("")), 1); + assert_eq!(serialize_test(String::from("ぽよぽよ")), 1 + 3 * 4); + assert_eq!(serialize_test(String::from("富士さん見える。")), 1 + 3 * 8); } #[test] fn test_serialize_vec() { - let v: Vec = Vec::new(); - serialize_test(v, 1); - serialize_test(vec![1u32, 3u32], 1 + 4 * 2); + assert_eq!(serialize_test(Vec::::new()), 1); + assert_eq!(serialize_test(vec![1u32, 3u32]), 1 + 4 * 2); } #[test] fn test_serialize_vint() { for i in 0..10_000 { - serialize_test(VInt(i as u64), 0); + serialize_test(VInt(i as u64)); } - serialize_test(VInt(7u64), 1); - serialize_test(VInt(127u64), 1); - serialize_test(VInt(128u64), 2); - serialize_test(VInt(129u64), 2); - serialize_test(VInt(1234u64), 2); - serialize_test(VInt(16_383), 2); - serialize_test(VInt(16_384), 3); - serialize_test(VInt(u64::max_value()), 10); + assert_eq!(serialize_test(VInt(7u64)), 1); + assert_eq!(serialize_test(VInt(127u64)), 1); + assert_eq!(serialize_test(VInt(128u64)), 2); + assert_eq!(serialize_test(VInt(129u64)), 2); + assert_eq!(serialize_test(VInt(1234u64)), 2); + assert_eq!(serialize_test(VInt(16_383u64)), 2); + assert_eq!(serialize_test(VInt(16_384u64)), 3); + assert_eq!(serialize_test(VInt(u64::max_value())), 10); } } diff --git a/src/common/vint.rs b/src/common/vint.rs index 70f673cfc..b0c32d1d3 100644 --- a/src/common/vint.rs +++ b/src/common/vint.rs @@ -11,6 +11,10 @@ impl VInt { pub fn val(&self) -> u64 { self.0 } + + pub fn deserialize_u64(reader: &mut R) -> io::Result { + VInt::deserialize(reader).map(|vint| vint.0) + } } impl BinarySerializable for VInt { diff --git a/src/compression/pack/compression_pack_nosimd.rs b/src/compression/pack/compression_pack_nosimd.rs index 23e010b4c..420cd5dbe 100644 --- a/src/compression/pack/compression_pack_nosimd.rs +++ b/src/compression/pack/compression_pack_nosimd.rs @@ -23,9 +23,9 @@ pub fn compress_sorted(vals: &mut [u32], output: &mut [u8], offset: u32) -> usiz let num_bits = compute_num_bits(max_delta as u64); counting_writer.write_all(&[num_bits]).unwrap(); - let mut bit_packer = BitPacker::new(num_bits as usize); + let mut bit_packer = BitPacker::new(); for val in vals { - bit_packer.write(*val as u64, &mut counting_writer).unwrap(); + bit_packer.write(*val as u64, num_bits,&mut counting_writer).unwrap(); } counting_writer.written_bytes() } @@ -61,13 +61,13 @@ impl BlockEncoder { let num_bits = compute_num_bits(max as u64); let mut counting_writer = CountingWriter::wrap(output); counting_writer.write_all(&[num_bits]).unwrap(); - let mut bit_packer = BitPacker::new(num_bits as usize); + let mut bit_packer = BitPacker::new(); 
for val in vals { - bit_packer.write(*val as u64, &mut counting_writer).unwrap(); + bit_packer.write(*val as u64, num_bits, &mut counting_writer).unwrap(); } for _ in vals.len()..COMPRESSION_BLOCK_SIZE { bit_packer - .write(vals[0] as u64, &mut counting_writer) + .write(vals[0] as u64, num_bits, &mut counting_writer) .unwrap(); } bit_packer.flush(&mut counting_writer).expect( diff --git a/src/datastruct/skip/mod.rs b/src/datastruct/skip/mod.rs index 18268fdd0..260393e72 100644 --- a/src/datastruct/skip/mod.rs +++ b/src/datastruct/skip/mod.rs @@ -9,12 +9,12 @@ pub use self::skiplist::SkipList; #[cfg(test)] mod tests { - use super::*; + use super::{SkipList, SkipListBuilder}; #[test] fn test_skiplist() { let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(10); + let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(8); skip_list_builder.insert(2, &3).unwrap(); skip_list_builder.write::>(&mut output).unwrap(); let mut skip_list: SkipList = SkipList::from(output.as_slice()); @@ -24,7 +24,7 @@ mod tests { #[test] fn test_skiplist2() { let mut output: Vec = Vec::new(); - let skip_list_builder: SkipListBuilder = SkipListBuilder::new(10); + let skip_list_builder: SkipListBuilder = SkipListBuilder::new(8); skip_list_builder.write::>(&mut output).unwrap(); let mut skip_list: SkipList = SkipList::from(output.as_slice()); assert_eq!(skip_list.next(), None); @@ -71,7 +71,7 @@ mod tests { #[test] fn test_skiplist5() { let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(3); + let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4); skip_list_builder.insert(2, &()).unwrap(); skip_list_builder.insert(3, &()).unwrap(); skip_list_builder.insert(5, &()).unwrap(); @@ -103,7 +103,7 @@ mod tests { #[test] fn test_skiplist7() { let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(3); + let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4); for i in 0..1000 { skip_list_builder.insert(i, &()).unwrap(); } @@ -121,35 +121,48 @@ mod tests { #[test] fn test_skiplist8() { let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(10); + let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(8); skip_list_builder.insert(2, &3).unwrap(); skip_list_builder.write::>(&mut output).unwrap(); - assert_eq!(output.len(), 13); + assert_eq!(output.len(), 11); assert_eq!(output[0], 1u8 + 128u8); } #[test] fn test_skiplist9() { let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(3); - for i in 0..9 { + let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(4); + for i in 0..4*4*4 { skip_list_builder.insert(i, &i).unwrap(); } skip_list_builder.write::>(&mut output).unwrap(); - assert_eq!(output.len(), 117); - assert_eq!(output[0], 3u8 + 128u8); + assert_eq!(output.len(), 774); + assert_eq!(output[0], 4u8 + 128u8); } #[test] fn test_skiplist10() { // checking that void gets serialized to nothing. 
let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(3); - for i in 0..9 { + let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4); + for i in 0..((4*4*4) - 1) { skip_list_builder.insert(i, &()).unwrap(); } skip_list_builder.write::>(&mut output).unwrap(); - assert_eq!(output.len(), 81); + assert_eq!(output.len(), 230); + assert_eq!(output[0], 128u8 + 3u8); + } + + #[test] + fn test_skiplist11() { + // checking that void gets serialized to nothing. + let mut output: Vec = Vec::new(); + let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4); + for i in 0..(4*4) { + skip_list_builder.insert(i, &()).unwrap(); + } + skip_list_builder.write::>(&mut output).unwrap(); + assert_eq!(output.len(), 65); assert_eq!(output[0], 128u8 + 3u8); } diff --git a/src/datastruct/skip/skiplist.rs b/src/datastruct/skip/skiplist.rs index 5323dcfcb..ef5491ac0 100644 --- a/src/datastruct/skip/skiplist.rs +++ b/src/datastruct/skip/skiplist.rs @@ -1,6 +1,5 @@ -use common::BinarySerializable; +use common::{BinarySerializable, VInt}; use std::marker::PhantomData; -use DocId; use std::cmp::max; static EMPTY: [u8; 0] = []; @@ -8,21 +7,20 @@ static EMPTY: [u8; 0] = []; struct Layer<'a, T> { data: &'a [u8], cursor: &'a [u8], - next_id: DocId, + next_id: Option, _phantom_: PhantomData, } impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> { - type Item = (DocId, T); + type Item = (u64, T); - fn next(&mut self) -> Option<(DocId, T)> { - if self.next_id == u32::max_value() { - None - } else { + fn next(&mut self) -> Option<(u64, T)> { + if let Some(cur_id) = self.next_id { let cur_val = T::deserialize(&mut self.cursor).unwrap(); - let cur_id = self.next_id; - self.next_id = u32::deserialize(&mut self.cursor).unwrap_or(u32::max_value()); + self.next_id = VInt::deserialize_u64(&mut self.cursor).ok(); Some((cur_id, cur_val)) + } else { + None } } } @@ -30,7 +28,7 @@ impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> { impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> { fn from(data: &'a [u8]) -> Layer<'a, T> { let mut cursor = data; - let next_id = u32::deserialize(&mut cursor).unwrap_or(u32::max_value()); + let next_id = VInt::deserialize_u64(&mut cursor).ok(); Layer { data, cursor, @@ -45,14 +43,14 @@ impl<'a, T: BinarySerializable> Layer<'a, T> { Layer { data: &EMPTY, cursor: &EMPTY, - next_id: DocId::max_value(), + next_id: None, _phantom_: PhantomData, } } fn seek_offset(&mut self, offset: usize) { self.cursor = &self.data[offset..]; - self.next_id = u32::deserialize(&mut self.cursor).unwrap_or(u32::max_value()); + self.next_id = VInt::deserialize_u64(&mut self.cursor).ok(); } // Returns the last element (key, val) @@ -60,54 +58,61 @@ impl<'a, T: BinarySerializable> Layer<'a, T> { // // If there is no such element anymore, // returns None. - fn seek(&mut self, doc_id: DocId) -> Option<(DocId, T)> { - let mut val = None; - while self.next_id < doc_id { - match self.next() { - None => { - break; - } - v => { - val = v; + // + // If the element exists, it will be returned + // at the next call to `.next()`. 
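
The comment above states the contract of the rewritten seek: consume every entry whose key is strictly below the target, return the last one consumed, and leave the entry with key >= target to be yielded by the following next(). An illustration of that contract over a plain vector (the data is made up; only the semantics are taken from the patch):

    fn main() {
        let entries: Vec<(u64, &str)> = vec![(2, "a"), (5, "b"), (9, "c")];
        let mut iter = entries.into_iter().peekable();
        let target = 9u64;
        let mut last_below = None;
        while iter.peek().map_or(false, |&(key, _)| key < target) {
            last_below = iter.next();
        }
        assert_eq!(last_below, Some((5, "b")));  // last element strictly below the target
        assert_eq!(iter.next(), Some((9, "c"))); // the target itself comes from next()
    }
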
+ fn seek(&mut self, key: u64) -> Option<(u64, T)> { + let mut result: Option<(u64, T)> = None; + loop { + if let Some(next_id) = self.next_id { + if next_id < key { + if let Some(v) = self.next() { + result = Some(v); + continue; + } } } + return result; } - val } } pub struct SkipList<'a, T: BinarySerializable> { data_layer: Layer<'a, T>, - skip_layers: Vec>, + skip_layers: Vec>, } impl<'a, T: BinarySerializable> Iterator for SkipList<'a, T> { - type Item = (DocId, T); + type Item = (u64, T); - fn next(&mut self) -> Option<(DocId, T)> { + fn next(&mut self) -> Option<(u64, T)> { self.data_layer.next() } } impl<'a, T: BinarySerializable> SkipList<'a, T> { - pub fn seek(&mut self, doc_id: DocId) -> Option<(DocId, T)> { - let mut next_layer_skip: Option<(DocId, u32)> = None; + pub fn seek(&mut self, key: u64) -> Option<(u64, T)> { + let mut next_layer_skip: Option<(u64, u64)> = None; for skip_layer in &mut self.skip_layers { if let Some((_, offset)) = next_layer_skip { skip_layer.seek_offset(offset as usize); } - next_layer_skip = skip_layer.seek(doc_id); + next_layer_skip = skip_layer.seek(key); } if let Some((_, offset)) = next_layer_skip { self.data_layer.seek_offset(offset as usize); } - self.data_layer.seek(doc_id) + self.data_layer.seek(key) } } impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> { fn from(mut data: &'a [u8]) -> SkipList<'a, T> { - let offsets: Vec = Vec::deserialize(&mut data).unwrap(); + let offsets: Vec = Vec::::deserialize(&mut data) + .unwrap() + .into_iter() + .map(|el| el.0) + .collect(); let num_layers = offsets.len(); let layers_data: &[u8] = data; let data_layer: Layer<'a, T> = if num_layers == 0 { diff --git a/src/datastruct/skip/skiplist_builder.rs b/src/datastruct/skip/skiplist_builder.rs index 166c0bf0a..63aec23dd 100644 --- a/src/datastruct/skip/skiplist_builder.rs +++ b/src/datastruct/skip/skiplist_builder.rs @@ -1,13 +1,12 @@ use std::io::Write; -use common::BinarySerializable; +use common::{is_power_of_2, VInt, BinarySerializable}; use std::marker::PhantomData; -use DocId; use std::io; + struct LayerBuilder { - period: usize, + period_mask: usize, buffer: Vec, - remaining: usize, len: usize, _phantom_: PhantomData, } @@ -23,34 +22,33 @@ impl LayerBuilder { } fn with_period(period: usize) -> LayerBuilder { + assert!(is_power_of_2(period), "The period has to be a power of 2."); LayerBuilder { - period, + period_mask: (period - 1), buffer: Vec::new(), - remaining: period, len: 0, _phantom_: PhantomData, } } - fn insert(&mut self, doc_id: DocId, value: &T) -> io::Result> { - self.remaining -= 1; + fn insert(&mut self, key: u64, value: &T) -> io::Result> { self.len += 1; - let offset = self.written_size() as u32; - doc_id.serialize(&mut self.buffer)?; + let offset = self.written_size() as u64; + VInt(key).serialize(&mut self.buffer)?; value.serialize(&mut self.buffer)?; - Ok(if self.remaining == 0 { - self.remaining = self.period; - Some((doc_id, offset)) + let emit_skip_info = (self.period_mask & self.len) == 0; + if emit_skip_info { + Ok(Some((key, offset))) } else { - None - }) + Ok(None) + } } } pub struct SkipListBuilder { period: usize, data_layer: LayerBuilder, - skip_layers: Vec>, + skip_layers: Vec>, } impl SkipListBuilder { @@ -62,7 +60,7 @@ impl SkipListBuilder { } } - fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder { + fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder { if layer_id == self.skip_layers.len() { let layer_builder = LayerBuilder::with_period(self.period); 
self.skip_layers.push(layer_builder); @@ -70,9 +68,9 @@ impl SkipListBuilder { &mut self.skip_layers[layer_id] } - pub fn insert(&mut self, doc_id: DocId, dest: &T) -> io::Result<()> { + pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> { let mut layer_id = 0; - let mut skip_pointer = self.data_layer.insert(doc_id, dest)?; + let mut skip_pointer = self.data_layer.insert(key, dest)?; loop { skip_pointer = match skip_pointer { Some((skip_doc_id, skip_offset)) => self.get_skip_layer(layer_id) @@ -86,13 +84,11 @@ impl SkipListBuilder { } pub fn write(self, output: &mut W) -> io::Result<()> { - let mut size: u32 = 0; - let mut layer_sizes: Vec = Vec::new(); - size += self.data_layer.buffer.len() as u32; - layer_sizes.push(size); + let mut size: u64 = self.data_layer.buffer.len() as u64; + let mut layer_sizes = vec![VInt(size)]; for layer in self.skip_layers.iter().rev() { - size += layer.buffer.len() as u32; - layer_sizes.push(size); + size += layer.buffer.len() as u64; + layer_sizes.push(VInt(size)); } layer_sizes.serialize(output)?; self.data_layer.write(output)?; diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 6dcf2b480..ffca841b7 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -348,7 +348,7 @@ mod tests { b.iter(|| { let n = test::black_box(7000u32); let mut a = 0u64; - for i in Iterator::step_by((0u32..n), 7) { + for i in Iterator::step_by(0u32..n, 7) { a ^= permutation[i as usize]; } a @@ -394,7 +394,7 @@ mod tests { b.iter(|| { let n = test::black_box(7000u32); let mut a = 0u64; - for i in Iterator::step_by((0u32..n), 7) { + for i in Iterator::step_by(0u32..n, 7) { a ^= fast_field_reader.get(i); } a diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index 1142c25d8..003a75a8e 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -1,6 +1,7 @@ use directory::ReadOnlySource; use common::{self, BinarySerializable}; -use common::bitpacker::{compute_num_bits, BitUnpacker}; +use common::compute_num_bits; +use common::bitpacker::BitUnpacker; use DocId; use schema::SchemaBuilder; use std::path::Path; @@ -117,11 +118,11 @@ impl FastFieldReader for U64FastFieldReader { let max_value = min_value + amplitude; let num_bits = compute_num_bits(amplitude); let owning_ref = OwningRef::new(data).map(|data| &data[16..]); - let bit_unpacker = BitUnpacker::new(owning_ref, num_bits as usize); + let bit_unpacker = BitUnpacker::new(owning_ref, num_bits); U64FastFieldReader { - min_value: min_value, - max_value: max_value, - bit_unpacker: bit_unpacker, + min_value, + max_value, + bit_unpacker, } } } diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs index bde080e0e..8fab68e95 100644 --- a/src/fastfield/serializer.rs +++ b/src/fastfield/serializer.rs @@ -1,7 +1,8 @@ use common::BinarySerializable; use directory::WritePtr; use schema::Field; -use common::bitpacker::{compute_num_bits, BitPacker}; +use common::bitpacker::BitPacker; +use common::compute_num_bits; use common::CountingWriter; use common::CompositeWrite; use std::io::{self, Write}; @@ -74,6 +75,7 @@ pub struct FastSingleFieldSerializer<'a, W: Write + 'a> { bit_packer: BitPacker, write: &'a mut W, min_value: u64, + num_bits: u8, } impl<'a, W: Write> FastSingleFieldSerializer<'a, W> { @@ -86,18 +88,19 @@ impl<'a, W: Write> FastSingleFieldSerializer<'a, W> { let amplitude = max_value - min_value; amplitude.serialize(write)?; let num_bits = compute_num_bits(amplitude); - let bit_packer = BitPacker::new(num_bits as usize); + let bit_packer = BitPacker::new(); 
Ok(FastSingleFieldSerializer { - write: write, - bit_packer: bit_packer, - min_value: min_value, + write, + bit_packer, + min_value, + num_bits }) } /// Pushes a new value to the currently open u64 fast field. pub fn add_val(&mut self, val: u64) -> io::Result<()> { let val_to_write: u64 = val - self.min_value; - self.bit_packer.write(val_to_write, &mut self.write)?; + self.bit_packer.write(val_to_write, self.num_bits,&mut self.write)?; Ok(()) } diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs index 654ba2ab2..a6af45e8a 100644 --- a/src/postings/term_info.rs +++ b/src/postings/term_info.rs @@ -1,4 +1,4 @@ -use common::BinarySerializable; +use common::{BinarySerializable, FixedSize}; use std::io; /// `TermInfo` contains all of the information @@ -23,10 +23,13 @@ pub struct TermInfo { pub positions_inner_offset: u8, } -impl TermInfo { - /// Size required to encode the `TermInfo`. - // TODO make this smaller when positions are unused for instance. - pub(crate) const SIZE_IN_BYTES: usize = 4 + 8 + 8 + 1; +impl FixedSize for TermInfo { + /// Size required for the binary serialization of `TermInfo`. + /// This is large, but in practise, all `TermInfo` but the first one + /// of the block are bitpacked. + /// + /// See `TermInfoStore`. + const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2*u64::SIZE_IN_BYTES + u8::SIZE_IN_BYTES; } impl BinarySerializable for TermInfo { @@ -50,3 +53,15 @@ impl BinarySerializable for TermInfo { }) } } + +#[cfg(test)] +mod tests { + + use super::TermInfo; + use common::test::fixed_size_test; + + #[test] + fn test_fixed_size() { + fixed_size_test::(); + } +} diff --git a/src/store/reader.rs b/src/store/reader.rs index 7f4343f8f..f1d139d6d 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -39,7 +39,10 @@ impl StoreReader { } fn block_offset(&self, doc_id: DocId) -> (DocId, u64) { - self.block_index().seek(doc_id + 1).unwrap_or((0u32, 0u64)) + self.block_index() + .seek(doc_id as u64 + 1) + .map(|(doc, offset)| (doc as DocId, offset)) + .unwrap_or((0u32, 0u64)) } pub(crate) fn block_data(&self) -> &[u8] { diff --git a/src/store/writer.rs b/src/store/writer.rs index 34261c4cb..ad356e870 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -34,7 +34,7 @@ impl StoreWriter { pub fn new(writer: WritePtr) -> StoreWriter { StoreWriter { doc: 0, - offset_index_writer: SkipListBuilder::new(3), + offset_index_writer: SkipListBuilder::new(4), writer: CountingWriter::wrap(writer), intermediary_buffer: Vec::new(), current_block: Vec::new(), @@ -67,7 +67,7 @@ impl StoreWriter { if !self.current_block.is_empty() { self.write_and_compress_block()?; self.offset_index_writer - .insert(self.doc, &(self.writer.written_bytes() as u64))?; + .insert(self.doc as u64, &(self.writer.written_bytes() as u64))?; } let doc_offset = self.doc; let start_offset = self.writer.written_bytes() as u64; @@ -78,9 +78,9 @@ impl StoreWriter { // concatenate the index of the `store_reader`, after translating // its start doc id and its start file offset. 
for (next_doc_id, block_addr) in store_reader.block_index() { - self.doc = doc_offset + next_doc_id; + self.doc = doc_offset + next_doc_id as u32; self.offset_index_writer - .insert(self.doc, &(start_offset + block_addr))?; + .insert(self.doc as u64, &(start_offset + block_addr))?; } Ok(()) } @@ -96,7 +96,7 @@ impl StoreWriter { (self.intermediary_buffer.len() as u32).serialize(&mut self.writer)?; self.writer.write_all(&self.intermediary_buffer)?; self.offset_index_writer - .insert(self.doc, &(self.writer.written_bytes() as u64))?; + .insert(self.doc as u64, &(self.writer.written_bytes() as u64))?; self.current_block.clear(); Ok(()) } diff --git a/src/termdict/fstdict/mod.rs b/src/termdict/fstdict/mod.rs index a244ac2b4..0f31b6e15 100644 --- a/src/termdict/fstdict/mod.rs +++ b/src/termdict/fstdict/mod.rs @@ -16,8 +16,10 @@ Keys (`&[u8]`) in this datastructure are sorted. mod termdict; mod streamer; +mod term_info_store; pub use self::termdict::TermDictionaryImpl; pub use self::termdict::TermDictionaryBuilderImpl; +pub use self::term_info_store::{TermInfoStore, TermInfoStoreWriter}; pub use self::streamer::TermStreamerImpl; pub use self::streamer::TermStreamerBuilderImpl; diff --git a/src/termdict/fstdict/term_info_store.rs b/src/termdict/fstdict/term_info_store.rs new file mode 100644 index 000000000..407b68b00 --- /dev/null +++ b/src/termdict/fstdict/term_info_store.rs @@ -0,0 +1,318 @@ +use std::io; +use std::cmp; +use std::io::{Read, Write}; +use postings::TermInfo; +use common::{BinarySerializable, FixedSize}; +use common::compute_num_bits; +use common::Endianness; +use common::bitpacker::BitPacker; +use directory::ReadOnlySource; +use termdict::TermOrdinal; +use byteorder::ByteOrder; + + +const BLOCK_LEN: usize = 256; + + +#[derive(Debug, Eq, PartialEq, Default)] +struct TermInfoBlockMeta { + offset: u64, + ref_term_info: TermInfo, + doc_freq_nbits: u8, + postings_offset_nbits: u8, + positions_offset_nbits: u8, +} + +impl BinarySerializable for TermInfoBlockMeta { + fn serialize(&self, write: &mut W) -> io::Result<()> { + self.offset.serialize(write)?; + self.ref_term_info.serialize(write)?; + write.write_all(&[self.doc_freq_nbits, + self.postings_offset_nbits, + self.positions_offset_nbits])?; + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let offset = u64::deserialize(reader)?; + let ref_term_info = TermInfo::deserialize(reader)?; + let mut buffer = [0u8; 3]; + reader.read_exact(&mut buffer)?; + Ok(TermInfoBlockMeta { + offset, + ref_term_info, + doc_freq_nbits: buffer[0], + postings_offset_nbits: buffer[1], + positions_offset_nbits: buffer[2] + }) + } +} + +impl FixedSize for TermInfoBlockMeta { + const SIZE_IN_BYTES: usize = u64::SIZE_IN_BYTES + TermInfo::SIZE_IN_BYTES + 3 * u8::SIZE_IN_BYTES; +} + +impl TermInfoBlockMeta { + + fn num_bits(&self) -> u8 { + self.doc_freq_nbits + self.postings_offset_nbits + self.positions_offset_nbits + 7 + } + + fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo { + let num_bits = self.num_bits() as usize; + let mut cursor = num_bits * inner_offset; + + let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32; + cursor += self.doc_freq_nbits as usize; + + let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits); + cursor += self.postings_offset_nbits as usize; + + let positions_offset = extract_bits(data, cursor, self.positions_offset_nbits); + cursor += self.positions_offset_nbits as usize; + + let positions_inner_offset = extract_bits(data, cursor, 7) as u8; + + 
TermInfo { + doc_freq, + postings_offset: postings_offset + self.ref_term_info.postings_offset, + positions_offset: positions_offset + self.ref_term_info.positions_offset, + positions_inner_offset, + } + } +} + + +pub struct TermInfoStore { + num_terms: usize, + block_meta_source: ReadOnlySource, + term_info_source: ReadOnlySource +} + +fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 { + assert!(num_bits <= 56); + let addr_byte = addr_bits / 8; + let bit_shift = (addr_bits % 8) as u64; + let val_unshifted_unmasked: u64 = unsafe { *(data[addr_byte..].as_ptr() as *const u64) }; + let val_shifted_unmasked = val_unshifted_unmasked >> bit_shift; + let mask = (1u64 << (num_bits as u64)) - 1; + val_shifted_unmasked & mask +} + +impl TermInfoStore { + pub fn open(data: ReadOnlySource) -> TermInfoStore { + let buffer = data.as_slice(); + let len = Endianness::read_u64(&buffer[0..8]) as usize; + let num_terms = Endianness::read_u64(&buffer[8..16]) as usize; + let block_meta_source = data.slice(16, 16 + len); + let term_info_source = data.slice_from(16 + len); + TermInfoStore { + num_terms, + block_meta_source, + term_info_source + } + } + + pub fn get(&self, term_ord: TermOrdinal) -> TermInfo { + let block_id = (term_ord as usize) / BLOCK_LEN; + let buffer = self.block_meta_source.as_slice(); + let mut block_data: &[u8] = &buffer[block_id * TermInfoBlockMeta::SIZE_IN_BYTES..]; + let term_info_block_data = TermInfoBlockMeta::deserialize(&mut block_data).expect("Failed to deserialize terminfoblockmeta"); + let inner_offset = (term_ord as usize) % BLOCK_LEN; + if inner_offset == 0 { + term_info_block_data.ref_term_info + } else { + let term_info_data = self.term_info_source.as_slice(); + term_info_block_data.deserialize_term_info(&term_info_data[term_info_block_data.offset as usize..], inner_offset - 1) + } + } + + pub fn num_terms(&self) -> usize { + self.num_terms + } +} + +pub struct TermInfoStoreWriter { + buffer_block_metas: Vec, + buffer_term_infos: Vec, + term_infos: Vec, + num_terms: u64, +} + +fn bitpack_serialize( + write: &mut W, + bit_packer: &mut BitPacker, + term_info_block_meta: &TermInfoBlockMeta, + term_info: &TermInfo) -> io::Result<()> { + bit_packer.write(term_info.doc_freq as u64, term_info_block_meta.doc_freq_nbits, write)?; + bit_packer.write(term_info.postings_offset, term_info_block_meta.postings_offset_nbits, write)?; + bit_packer.write(term_info.positions_offset, term_info_block_meta.positions_offset_nbits, write)?; + bit_packer.write(term_info.positions_inner_offset as u64, 7, write)?; + Ok(()) +} + +impl TermInfoStoreWriter { + pub fn new() -> TermInfoStoreWriter { + TermInfoStoreWriter { + buffer_block_metas: Vec::new(), + buffer_term_infos: Vec::new(), + term_infos: Vec::with_capacity(BLOCK_LEN), + num_terms: 0u64 + } + } + + fn flush_block(&mut self) -> io::Result<()> { + if self.term_infos.is_empty() { + return Ok(()); + } + let mut bit_packer = BitPacker::new(); + let ref_term_info = self.term_infos[0].clone(); + for term_info in &mut self.term_infos[1..] { + term_info.postings_offset -= ref_term_info.postings_offset; + term_info.positions_offset -= ref_term_info.positions_offset; + } + + let mut max_doc_freq: u32 = 0u32; + let mut max_postings_offset: u64 = 0u64; + let mut max_positions_offset: u64 = 0u64; + for term_info in &self.term_infos[1..] 
{ + max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq); + max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset); + max_positions_offset = cmp::max(max_positions_offset, term_info.positions_offset); + } + + let max_doc_freq_nbits: u8 = compute_num_bits(max_doc_freq as u64); + let max_postings_offset_nbits = compute_num_bits(max_postings_offset); + let max_positions_offset_nbits = compute_num_bits(max_positions_offset); + + let term_info_block_meta = TermInfoBlockMeta { + offset: self.buffer_term_infos.len() as u64, + ref_term_info, + doc_freq_nbits: max_doc_freq_nbits, + postings_offset_nbits: max_postings_offset_nbits, + positions_offset_nbits: max_positions_offset_nbits, + }; + + term_info_block_meta.serialize(&mut self.buffer_block_metas)?; + for term_info in self.term_infos[1..].iter().cloned() { + bitpack_serialize( + &mut self.buffer_term_infos, + &mut bit_packer, + &term_info_block_meta, + &term_info + )?; + } + + // Block need end up at the end of a byte. + bit_packer.flush(&mut self.buffer_term_infos)?; + self.term_infos.clear(); + + Ok(()) + } + + pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> { + self.num_terms += 1u64; + self.term_infos.push(term_info.clone()); + if self.term_infos.len() >= BLOCK_LEN { + self.flush_block()?; + } + Ok(()) + } + + pub fn serialize(&mut self, write: &mut W) -> io::Result<()> { + if !self.term_infos.is_empty() { + self.flush_block()?; + } + let len = self.buffer_block_metas.len() as u64; + len.serialize(write)?; + self.num_terms.serialize(write)?; + write.write_all(&self.buffer_block_metas)?; + write.write_all(&self.buffer_term_infos)?; + write.write_all(&[0u8; 7])?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use super::extract_bits; + use common::bitpacker::BitPacker; + use common::BinarySerializable; + use super::TermInfoBlockMeta; + use super::{TermInfoStore, TermInfoStoreWriter}; + use directory::ReadOnlySource; + use postings::TermInfo; + use common::compute_num_bits; + use common; + + #[test] + fn test_term_info_block() { + common::test::fixed_size_test::(); + } + + #[test] + fn test_bitpacked() { + let mut buffer = Vec::new(); + let mut bitpack = BitPacker::new(); + bitpack.write(321u64, 9, &mut buffer).unwrap(); + assert_eq!(compute_num_bits(321u64), 9); + bitpack.write(2u64, 2, &mut buffer).unwrap(); + assert_eq!(compute_num_bits(2u64), 2); + bitpack.write(51, 6, &mut buffer).unwrap(); + assert_eq!(compute_num_bits(51), 6); + bitpack.close(&mut buffer).unwrap(); + assert_eq!(buffer.len(), 3 + 7); + assert_eq!(extract_bits(&buffer[..], 0, 9), 321u64); + assert_eq!(extract_bits(&buffer[..], 9, 2), 2u64); + assert_eq!(extract_bits(&buffer[..], 11, 6), 51u64); + } + + #[test] + fn test_term_info_block_meta_serialization() { + let term_info_block_meta = TermInfoBlockMeta { + offset: 2009, + ref_term_info: TermInfo { + doc_freq: 512, + postings_offset: 51, + positions_offset: 3584, + positions_inner_offset: 0 + }, + doc_freq_nbits: 10, + postings_offset_nbits: 5, + positions_offset_nbits: 11 + }; + let mut buffer: Vec = Vec::new(); + term_info_block_meta.serialize(&mut buffer).unwrap(); + let mut cursor: &[u8] = &buffer[..]; + let term_info_block_meta_serde = TermInfoBlockMeta::deserialize(&mut cursor).unwrap(); + assert_eq!(term_info_block_meta_serde, term_info_block_meta); + } + + #[test] + fn test_pack() { + let mut store_writer = TermInfoStoreWriter::new(); + let mut term_infos = vec!(); + for i in 0..1000 { + let term_info = TermInfo { + doc_freq: i as u32, + postings_offset: 
(i / 10) as u64, + positions_offset: (i * 7) as u64, + positions_inner_offset: (i % 128) as u8, + }; + store_writer.write_term_info(&term_info).unwrap(); + term_infos.push(term_info); + } + let mut buffer = Vec::new(); + store_writer + .serialize(&mut buffer) + .unwrap(); + let term_info_store = TermInfoStore::open(ReadOnlySource::from(buffer)); + for i in 0..1000 { + assert_eq!(term_info_store.get(i as u64), term_infos[i]); + } + } + +} + + diff --git a/src/termdict/fstdict/termdict.rs b/src/termdict/fstdict/termdict.rs index b36be64ae..f2d1dfaa6 100644 --- a/src/termdict/fstdict/termdict.rs +++ b/src/termdict/fstdict/termdict.rs @@ -3,10 +3,11 @@ use fst; use fst::raw::Fst; use directory::ReadOnlySource; use common::BinarySerializable; +use common::CountingWriter; use schema::FieldType; use postings::TermInfo; use termdict::{TermDictionary, TermDictionaryBuilder, TermOrdinal}; -use super::{TermStreamerBuilderImpl, TermStreamerImpl}; +use super::{TermStreamerBuilderImpl, TermStreamerImpl, TermInfoStoreWriter, TermInfoStore}; fn convert_fst_error(e: fst::Error) -> io::Error { io::Error::new(io::ErrorKind::Other, e) @@ -15,7 +16,7 @@ fn convert_fst_error(e: fst::Error) -> io::Error { /// See [`TermDictionaryBuilder`](./trait.TermDictionaryBuilder.html) pub struct TermDictionaryBuilderImpl { fst_builder: fst::MapBuilder, - data: Vec, + term_info_store_writer: TermInfoStoreWriter, term_ord: u64, } @@ -41,8 +42,8 @@ where /// # Warning /// /// Horribly dangerous internal API. See `.insert_key(...)`. - pub(crate) fn insert_value(&mut self, value: &TermInfo) -> io::Result<()> { - value.serialize(&mut self.data)?; + pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { + self.term_info_store_writer.write_term_info(term_info)?; Ok(()) } } @@ -55,7 +56,7 @@ where let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?; Ok(TermDictionaryBuilderImpl { fst_builder, - data: Vec::new(), + term_info_store_writer: TermInfoStoreWriter::new(), term_ord: 0, }) } @@ -67,12 +68,15 @@ where Ok(()) } - fn finish(self) -> io::Result { + fn finish(mut self) -> io::Result { let mut file = self.fst_builder.into_inner().map_err(convert_fst_error)?; - let footer_size = self.data.len() as u32; - file.write_all(&self.data)?; - (footer_size as u32).serialize(&mut file)?; - file.flush()?; + { + let mut counting_writer = CountingWriter::wrap(&mut file); + self.term_info_store_writer.serialize(&mut counting_writer)?; + let footer_size = counting_writer.written_bytes(); + (footer_size as u64).serialize(&mut counting_writer)?; + counting_writer.flush()?; + } Ok(file) } } @@ -92,7 +96,7 @@ fn open_fst_index(source: ReadOnlySource) -> fst::Map { /// See [`TermDictionary`](./trait.TermDictionary.html) pub struct TermDictionaryImpl { fst_index: fst::Map, - values_mmap: ReadOnlySource, + term_info_store: TermInfoStore, } impl<'a> TermDictionary<'a> for TermDictionaryImpl { @@ -102,22 +106,22 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl { fn from_source(source: ReadOnlySource) -> Self { let total_len = source.len(); - let length_offset = total_len - 4; + let length_offset = total_len - 8; let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; - let footer_size = u32::deserialize(&mut split_len_buffer) - .expect("Deserializing 4 bytes should always work") as usize; + let footer_size = u64::deserialize(&mut split_len_buffer) + .expect("Deserializing 8 bytes should always work") as usize; let split_len = length_offset - footer_size; let fst_source = source.slice(0, 
split_len); let values_source = source.slice(split_len, length_offset); let fst_index = open_fst_index(fst_source); TermDictionaryImpl { fst_index, - values_mmap: values_source, + term_info_store: TermInfoStore::open(values_source), } } fn num_terms(&self) -> usize { - self.values_mmap.len() / TermInfo::SIZE_IN_BYTES + self.term_info_store.num_terms() } fn term_ord>(&self, key: K) -> Option { @@ -145,11 +149,7 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl { } fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo { - let buffer = self.values_mmap.as_slice(); - let offset = term_ord as usize * TermInfo::SIZE_IN_BYTES; - let mut cursor = &buffer[offset..]; - TermInfo::deserialize(&mut cursor) - .expect("The fst is corrupted. Failed to deserialize a value.") + self.term_info_store.get(term_ord) } fn get>(&self, key: K) -> Option { diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 0dbc6667d..64ff08732 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -5,7 +5,6 @@ that serves as an address in their respective posting list. The term dictionary API makes it possible to iterate through a range of keys in a sorted manner. -``` # Implementations @@ -471,7 +470,7 @@ mod tests { } { - for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { + for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) { let &(ref target_key, _) = &ids[i]; let mut streamer = term_dictionary .range() @@ -487,7 +486,7 @@ mod tests { } { - for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { + for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) { for j in 0..3 { let &(ref fst_key, _) = &ids[i]; let &(ref last_key, _) = &ids[i + j];
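
To recap the new term_info_store format introduced by this patch: TermInfos are grouped into blocks of 256; the first entry of each block is stored verbatim as ref_term_info inside TermInfoBlockMeta, and the remaining entries are bitpacked as deltas against it using the per-field widths recorded in that meta. Below is a schematic of the lookup path, with the unsafe unaligned read of extract_bits replaced by a safe byte-by-byte equivalent; apart from BLOCK_LEN, the names are illustrative.

    const BLOCK_LEN: u64 = 256;

    // Same contract as extract_bits in term_info_store.rs, without the raw pointer cast.
    fn extract_bits_safe(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
        assert!(num_bits <= 56);
        let addr_byte = addr_bits / 8;
        let bit_shift = (addr_bits % 8) as u64;
        // Assemble up to 8 bytes little-endian; with num_bits <= 56 and
        // bit_shift <= 7, every requested bit lives inside this window.
        let mut word = 0u64;
        for (i, &byte) in data[addr_byte..].iter().take(8).enumerate() {
            word |= (byte as u64) << (8 * i as u64);
        }
        let mask = (1u64 << u64::from(num_bits)) - 1;
        (word >> bit_shift) & mask
    }

    // Which block, and which slot inside the block, a term ordinal falls into.
    fn locate(term_ord: u64) -> (u64, u64) {
        (term_ord / BLOCK_LEN, term_ord % BLOCK_LEN)
    }

    fn main() {
        assert_eq!(locate(0), (0, 0));    // slot 0: stored verbatim as the block's ref_term_info
        assert_eq!(locate(300), (1, 44)); // entry 44 of block 1, a bitpacked delta against its ref
        assert_eq!(extract_bits_safe(&[0b0000_0101], 0, 3), 5);
    }
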