diff --git a/src/common/counting_writer.rs b/src/common/counting_writer.rs index 368314789..e2bec7f2a 100644 --- a/src/common/counting_writer.rs +++ b/src/common/counting_writer.rs @@ -14,7 +14,7 @@ impl CountingWriter { written_bytes: 0, } } - + pub fn written_bytes(&self,) -> usize { self.written_bytes } diff --git a/src/termdict/fstdict/streamer.rs b/src/termdict/fstdict/streamer.rs index 13327172b..082a818b8 100644 --- a/src/termdict/fstdict/streamer.rs +++ b/src/termdict/fstdict/streamer.rs @@ -1,4 +1,4 @@ -use fst::{self, IntoStreamer, Streamer}; +use fst::{IntoStreamer, Streamer}; use fst::map::{StreamBuilder, Stream}; use common::BinarySerializable; use super::TermDictionary; @@ -6,14 +6,14 @@ use super::TermDictionary; /// `TermStreamerBuilder` is an helper object used to define /// a range of terms that should be streamed. pub struct TermStreamerBuilder<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { fst_map: &'a TermDictionary, stream_builder: StreamBuilder<'a>, } impl<'a, V> TermStreamerBuilder<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { /// Limit the range to terms greater or equal to the bound pub fn ge>(mut self, bound: T) -> Self { @@ -45,12 +45,12 @@ impl<'a, V> TermStreamerBuilder<'a, V> TermStreamer { fst_map: self.fst_map, stream: self.stream_builder.into_stream(), - buffer: Vec::with_capacity(100), offset: 0u64, + current_key: Vec::with_capacity(100), + current_value: V::default(), } } - - /// Crates a new `TermStreamBuilder` + pub(crate) fn new(fst_map: &'a TermDictionary, stream_builder: StreamBuilder<'a>) -> TermStreamerBuilder<'a, V> { @@ -67,41 +67,31 @@ impl<'a, V> TermStreamerBuilder<'a, V> /// `TermStreamer` acts as a cursor over a range of terms of a segment. /// Terms are guaranteed to be sorted. pub struct TermStreamer<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { fst_map: &'a TermDictionary, stream: Stream<'a>, offset: u64, - buffer: Vec, + current_key: Vec, + current_value: V, } -impl<'a, 'b, V> fst::Streamer<'b> for TermStreamer<'a, V> - where V: 'b + BinarySerializable -{ - type Item = (&'b [u8], V); - - fn next(&'b mut self) -> Option<(&'b [u8], V)> { - if self.advance() { - let v = self.value(); - Some((&self.buffer, v)) - } else { - None - } - } -} impl<'a, V> TermStreamer<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { /// Advance position the stream on the next item. /// Before the first call to `.advance()`, the stream /// is an unitialized state. pub fn advance(&mut self) -> bool { if let Some((term, offset)) = self.stream.next() { - self.buffer.clear(); - self.buffer.extend_from_slice(term); + self.current_key.clear(); + self.current_key.extend_from_slice(term); self.offset = offset; + self.current_value = self.fst_map + .read_value(self.offset) + .expect("Fst data is corrupted. Failed to deserialize a value."); true } else { false @@ -119,24 +109,19 @@ impl<'a, V> TermStreamer<'a, V> /// /// Before any call to `.next()`, `.key()` returns an empty array. pub fn key(&self) -> &[u8] { - &self.buffer + &self.current_key } /// Accesses the current value. - /// - /// Values are accessed in a lazy manner, their data is fetched - /// and deserialized only at the moment of the call to `.value()`. - /// + /// /// Calling `.value()` after the end of the stream will return the /// last `.value()` encounterred. - /// + /// /// # Panics /// - /// Calling `.value()` before the first call to `.advance()` or `.next()` - /// is undefined behavior. - pub fn value(&self) -> V { - self.fst_map - .read_value(self.offset) - .expect("Fst data is corrupted. Failed to deserialize a value.") + /// Calling `.value()` before the first call to `.advance()` returns + /// `V::default()`. + pub fn value(&self) -> &V { + &self.current_value } } diff --git a/src/termdict/fstdict/termdict.rs b/src/termdict/fstdict/termdict.rs index 183ba63b0..2978fdeb4 100644 --- a/src/termdict/fstdict/termdict.rs +++ b/src/termdict/fstdict/termdict.rs @@ -1,11 +1,10 @@ use std::io::{self, Write}; use fst; use fst::raw::Fst; -use super::{TermStreamerBuilder, TermStreamer}; +use super::TermStreamerBuilder; use directory::ReadOnlySource; use common::BinarySerializable; use std::marker::PhantomData; -use schema::{Field, Term}; use postings::TermInfo; @@ -17,15 +16,16 @@ fn convert_fst_error(e: fst::Error) -> io::Error { /// Builder for the new term dictionary. /// /// Just like for the fst crate, all terms must be inserted in order. -pub struct TermDictionaryBuilder - where V: BinarySerializable +pub struct TermDictionaryBuilder + where W: Write, V: BinarySerializable + Default { fst_builder: fst::MapBuilder, data: Vec, _phantom_: PhantomData, } -impl TermDictionaryBuilder { +impl TermDictionaryBuilder + where W: Write, V: BinarySerializable + Default { /// Creates a new `TermDictionaryBuilder` pub fn new(w: W) -> io::Result> { let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?; @@ -81,14 +81,6 @@ impl TermDictionaryBuilder { } } -/// Datastructure to access the `terms` of a segment. -pub struct TermDictionary - where V: BinarySerializable -{ - fst_index: fst::Map, - values_mmap: ReadOnlySource, - _phantom_: PhantomData, -} fn open_fst_index(source: ReadOnlySource) -> io::Result { let fst = match source { @@ -103,8 +95,17 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result { Ok(fst::Map::from(fst)) } +/// Datastructure to access the `terms` of a segment. +pub struct TermDictionary + where V: BinarySerializable + Default +{ + fst_index: fst::Map, + values_mmap: ReadOnlySource, + _phantom_: PhantomData, +} + impl TermDictionary - where V: BinarySerializable + where V: BinarySerializable + Default { /// Opens a `TermDictionary` given a data source. pub fn from_source(source: ReadOnlySource) -> io::Result> { @@ -140,21 +141,6 @@ impl TermDictionary }) } - /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field) - pub fn stream(&self) -> TermStreamer { - self.range().into_stream() - } - - /// A stream of all the sorted terms in the given field. - pub fn stream_field(&self, field: Field) -> TermStreamer { - let start_term = Term::from_field_text(field, ""); - let stop_term = Term::from_field_text(Field(field.0 + 1), ""); - self.range() - .ge(start_term.as_slice()) - .lt(stop_term.as_slice()) - .into_stream() - } - /// Returns a range builder, to stream all of the terms /// within an interval. pub fn range(&self) -> TermStreamerBuilder { diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs index 617233b25..f56fbbdfc 100644 --- a/src/termdict/merger.rs +++ b/src/termdict/merger.rs @@ -1,30 +1,30 @@ use std::collections::BinaryHeap; use core::SegmentReader; -use super::TermStreamer; +use termdict::TermStreamer; use common::BinarySerializable; use postings::TermInfo; use std::cmp::Ordering; use fst::Streamer; pub struct HeapItem<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { pub streamer: TermStreamer<'a, V>, pub segment_ord: usize, } impl<'a, V> PartialEq for HeapItem<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { fn eq(&self, other: &Self) -> bool { self.segment_ord == other.segment_ord } } -impl<'a, V> Eq for HeapItem<'a, V> where V: 'a + BinarySerializable {} +impl<'a, V> Eq for HeapItem<'a, V> where V: 'a + BinarySerializable + Default {} impl<'a, V> PartialOrd for HeapItem<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { fn partial_cmp(&self, other: &HeapItem<'a, V>) -> Option { Some(self.cmp(other)) @@ -32,7 +32,7 @@ impl<'a, V> PartialOrd for HeapItem<'a, V> } impl<'a, V> Ord for HeapItem<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { fn cmp(&self, other: &HeapItem<'a, V>) -> Ordering { (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord)) @@ -47,14 +47,14 @@ impl<'a, V> Ord for HeapItem<'a, V> /// - a slice with the ordinal of the segments containing /// the terms. pub struct TermMerger<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { heap: BinaryHeap>, current_streamers: Vec>, } impl<'a, V> TermMerger<'a, V> - where V: 'a + BinarySerializable + where V: 'a + BinarySerializable + Default { fn new(streams: Vec>) -> TermMerger<'a, V> { TermMerger { @@ -131,7 +131,6 @@ impl<'a, V> TermMerger<'a, V> impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo> - where TermInfo: BinarySerializable { fn from(segment_readers: &'a [SegmentReader]) -> TermMerger<'a, TermInfo> { TermMerger::new(segment_readers @@ -142,7 +141,7 @@ impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo> } impl<'a, V> Streamer<'a> for TermMerger<'a, V> - where V: BinarySerializable + where V: BinarySerializable + Default { type Item = &'a [u8]; diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index bae868d1c..610aa303d 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -13,6 +13,10 @@ deserializing the value at this address. Keys (`&[u8]`) in this datastructure are sorted. */ +use schema::{Field, Term}; +use common::BinarySerializable; +use fst; + #[cfg(not(feature="streamdict"))] mod fstdict; #[cfg(not(feature="streamdict"))] @@ -21,15 +25,48 @@ pub use self::fstdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, Ter #[cfg(feature="streamdict")] mod streamdict; #[cfg(feature="streamdict")] -pub use self::termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder}; +pub use self::streamdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder}; mod merger; pub use self::merger::TermMerger; +impl TermDictionary + where V: BinarySerializable + Default { + + /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field) + pub fn stream(&self) -> TermStreamer { + self.range().into_stream() + } + + /// A stream of all the sorted terms in the given field. + pub fn stream_field(&self, field: Field) -> TermStreamer { + let start_term = Term::from_field_text(field, ""); + let stop_term = Term::from_field_text(Field(field.0 + 1), ""); + self.range() + .ge(start_term.as_slice()) + .lt(stop_term.as_slice()) + .into_stream() + } + +} + +impl<'a, 'b, V: 'b> fst::Streamer<'b> for TermStreamer<'a, V> + where V: 'a + BinarySerializable + Default +{ + type Item = (&'b [u8], &'b V); + + fn next(&'b mut self) -> Option<(&'b [u8], &V)> { + if self.advance() { + Some((self.key(), self.value())) + } else { + None + } + } +} #[cfg(test)] mod tests { - use super::{TermDictionary, TermDictionaryBuilder}; + use super::{TermDictionary, TermDictionaryBuilder, TermStreamer}; use directory::{RAMDirectory, Directory, ReadOnlySource}; use std::path::PathBuf; use fst::Streamer; @@ -39,7 +76,6 @@ mod tests { const BLOCK_SIZE: usize = 1_500; - #[test] fn test_term_dictionary() { let mut directory = RAMDirectory::create(); @@ -60,12 +96,12 @@ mod tests { assert_eq!(term_dict.get("abc"), Some(34u32)); assert_eq!(term_dict.get("abcd"), Some(346u32)); let mut stream = term_dict.stream(); - assert_eq!(stream.next().unwrap(), ("abc".as_bytes(), 34u32)); + assert_eq!(stream.next().unwrap(), ("abc".as_bytes(), &34u32)); assert_eq!(stream.key(), "abc".as_bytes()); - assert_eq!(stream.value(), 34u32); - assert_eq!(stream.next().unwrap(), ("abcd".as_bytes(), 346u32)); + assert_eq!(*stream.value(), 34u32); + assert_eq!(stream.next().unwrap(), ("abcd".as_bytes(), &346u32)); assert_eq!(stream.key(), "abcd".as_bytes()); - assert_eq!(stream.value(), 346u32); + assert_eq!(*stream.value(), 346u32); assert!(!stream.advance()); } @@ -117,56 +153,54 @@ mod tests { #[test] - fn test_stream_dictionary() { + fn test_term_dictionary_stream() { let ids: Vec<_> = (0u32..10_000u32) .map(|i| (format!("doc{:0>6}", i), i)) .collect(); let buffer: Vec = { - let mut stream_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); for &(ref id, ref i) in &ids { - stream_dictionary_builder.insert(id.as_bytes(), i).unwrap(); + term_dictionary_builder.insert(id.as_bytes(), i).unwrap(); } - stream_dictionary_builder.finish().unwrap() + term_dictionary_builder.finish().unwrap() }; let source = ReadOnlySource::from(buffer); - let stream_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); + let term_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); { - let mut streamer = stream_dictionary.stream(); + let mut streamer = term_dictionary.stream(); let mut i = 0; while let Some((streamer_k, streamer_v)) = streamer.next() { let &(ref key, ref v) = &ids[i]; assert_eq!(streamer_k, key.as_bytes()); - assert_eq!(streamer_v, *v); + assert_eq!(streamer_v, v); i += 1; } } let &(ref key, ref _v) = &ids[2047]; - stream_dictionary.get(key.as_bytes()); + term_dictionary.get(key.as_bytes()); } #[test] fn test_stream_range() { - let ids: Vec<_> = (0u32..10_000u32) + let ids: Vec<_> = (0u32..50_000u32) .map(|i| (format!("doc{:0>6}", i), i)) .collect(); let buffer: Vec = { - let mut stream_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); for &(ref id, ref i) in &ids { - stream_dictionary_builder.insert(id.as_bytes(), i).unwrap(); + term_dictionary_builder.insert(id.as_bytes(), i).unwrap(); } - stream_dictionary_builder.finish().unwrap() + term_dictionary_builder.finish().unwrap() }; - println!("a"); let source = ReadOnlySource::from(buffer); - let stream_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); + let term_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); { - for i in (0..20).chain(2_000 - 10..2_000 + 10) { - println!("i {}", i); + for i in (0..20).chain(6000..8_000) { let &(ref target_key, _) = &ids[i]; - let mut streamer = stream_dictionary + let mut streamer = term_dictionary .range() .ge(target_key.as_bytes()) .into_stream(); @@ -174,7 +208,7 @@ mod tests { let (streamer_k, streamer_v) = streamer.next().unwrap(); let &(ref key, ref v) = &ids[i + j]; assert_eq!(str::from_utf8(streamer_k).unwrap(), key); - assert_eq!(streamer_v, *v); + assert_eq!(streamer_v, v); } } } @@ -182,7 +216,7 @@ mod tests { { for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { let &(ref target_key, _) = &ids[i]; - let mut streamer = stream_dictionary + let mut streamer = term_dictionary .range() .gt(target_key.as_bytes()) .into_stream(); @@ -190,31 +224,89 @@ mod tests { let (streamer_k, streamer_v) = streamer.next().unwrap(); let &(ref key, ref v) = &ids[i + j + 1]; assert_eq!(streamer_k, key.as_bytes()); - assert_eq!(streamer_v, *v); + assert_eq!(streamer_v, v); } } } { for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { - println!("i2:{}", i); for j in 0..3 { - println!("j2:{}", j); + println!("i {} j {}", i, j); let &(ref fst_key, _) = &ids[i]; - let &(ref last_key, _) = &ids[i + 3]; - let mut streamer = stream_dictionary + let &(ref last_key, _) = &ids[i + j]; + let mut streamer = term_dictionary .range() .ge(fst_key.as_bytes()) .lt(last_key.as_bytes()) .into_stream(); for _ in 0..j { + println!("ij"); assert!(streamer.next().is_some()); } - assert!(streamer.next().is_some()); + assert!(streamer.next().is_none()); } } } + } + + + #[test] + fn test_stream_range_boundaries() { + let buffer: Vec = { + let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); + for i in 0u8..10u8 { + let number_arr = [i; 1]; + term_dictionary_builder.insert(&number_arr, &i).unwrap(); + } + term_dictionary_builder.finish().unwrap() + }; + let source = ReadOnlySource::from(buffer); + let term_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); + let value_list = |mut streamer: TermStreamer| { + let mut res: Vec = vec!(); + while let Some((_, &v)) = streamer.next() { + res.push(v); + } + res + }; + { + let range = term_dictionary + .range() + .ge([2u8]) + .into_stream(); + assert_eq!(value_list(range), vec!(2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8)); + } + { + let range = term_dictionary + .range() + .gt([2u8]) + .into_stream(); + assert_eq!(value_list(range), vec!(3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8)); + } + { + let range = term_dictionary + .range() + .lt([6u8]) + .into_stream(); + assert_eq!(value_list(range), vec!(0u8, 1u8, 2u8, 3u8, 4u8, 5u8)); + } + { + let range = term_dictionary + .range() + .le([6u8]) + .into_stream(); + assert_eq!(value_list(range), vec!(0u8, 1u8, 2u8, 3u8, 4u8, 5u8, 6u8)); + } + { + let range = term_dictionary + .range() + .ge([0u8]) + .lt([5u8]) + .into_stream(); + assert_eq!(value_list(range), vec!(0u8, 1u8, 2u8, 3u8, 4u8)); + } } } diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs index 5548fc139..01c2c38c9 100644 --- a/src/termdict/streamdict/streamer.rs +++ b/src/termdict/streamdict/streamer.rs @@ -1,258 +1,40 @@ #![allow(should_implement_trait)] use std::cmp::max; -use std::io; -use std::io::Write; use std::io::Read; -use fst; -use fst::raw::Fst; use common::VInt; -use directory::ReadOnlySource; use common::BinarySerializable; -use std::marker::PhantomData; -use common::CountingWriter; -use std::cmp::Ordering; -use fst::{IntoStreamer, Streamer}; -use std::str; -use fst::raw::Node; -use fst::raw::CompiledAddr; +use super::TermDictionary; +use fst::Streamer; -const BLOCK_SIZE: usize = 1024; - -fn convert_fst_error(e: fst::Error) -> io::Error { - io::Error::new(io::ErrorKind::Other, e) -} - -pub struct StreamDictionaryBuilder { - write: CountingWriter, - block_index: fst::MapBuilder>, - last_key: Vec, - len: usize, - _phantom_: PhantomData, -} - -fn common_prefix_length(left: &[u8], right: &[u8]) -> usize { - left.iter().cloned() - .zip(right.iter().cloned()) - .take_while(|&(b1, b2)| b1 == b2) - .count() -} - -fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec) { - loop { - if let Some(transition) = node.transitions().last() { - buffer.push(transition.inp); - node = fst.node(transition.addr); - } - else { - break; - } - } -} - - -fn strictly_previous_key>(fst_map: &fst::Map, key_as_ref: B) -> (Vec, u64) { - let key = key_as_ref.as_ref(); - let fst = fst_map.as_fst(); - let mut node = fst.root(); - let mut node_stack: Vec = vec!(node.clone()); - - // first check the longest prefix. - for &b in &key[..key.len() - 1] { - node = match node.find_input(b) { - None => { - break; - }, - Some(i) => { - fst.node(node.transition_addr(i)) - }, - }; - node_stack.push(node); - } - - let len_node_stack = node_stack.len(); - for i in (1..len_node_stack).rev() { - let cur_node = &node_stack[i]; - let b: u8 = key[i]; - let last_transition_opt = cur_node - .transitions() - .take_while(|transition| transition.inp < b) - .last(); - - if let Some(last_transition) = last_transition_opt { - let mut result_buffer = Vec::from(&key[..i]); - result_buffer.push(last_transition.inp); - let mut result = Vec::from(&key[..i]); - result.push(last_transition.inp); - let fork_node = fst.node(last_transition.addr); - fill_last(fst, fork_node, &mut result); - let val = fst_map.get(&result).unwrap(); - return (result, val); - } - else if cur_node.is_final() { - // the previous key is a prefix - let result_buffer = Vec::from(&key[..i]); - let val = fst_map.get(&result_buffer).unwrap(); - return (result_buffer, val); - } - } - - return (vec!(), 0); -} - - -impl StreamDictionaryBuilder { - - pub fn new(write: W) -> io::Result> { - let buffer: Vec = vec!(); - Ok(StreamDictionaryBuilder { - write: CountingWriter::wrap(write), - block_index: fst::MapBuilder::new(buffer) - .expect("This cannot fail"), - last_key: Vec::with_capacity(128), - len: 0, - _phantom_: PhantomData, - }) - } - - fn add_index_entry(&mut self) { - self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap(); - } - - pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{ - self.insert_key(key)?; - self.insert_value(value) - } - - pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()>{ - if self.len % BLOCK_SIZE == 0 { - self.add_index_entry(); - } - self.len += 1; - let common_len = common_prefix_length(key, &self.last_key); - VInt(common_len as u64).serialize(&mut self.write)?; - self.last_key.truncate(common_len); - self.last_key.extend_from_slice(&key[common_len..]); - VInt((key.len() - common_len) as u64).serialize(&mut self.write)?; - self.write.write_all(&key[common_len..])?; - Ok(()) - } - - pub fn insert_value(&mut self, value: &V) -> io::Result<()>{ - value.serialize(&mut self.write)?; - Ok(()) - } - - pub fn finish(mut self) -> io::Result { - self.add_index_entry(); - let (mut w, split_len) = self.write.finish()?; - let fst_write = self.block_index - .into_inner() - .map_err(convert_fst_error)?; - w.write(&fst_write)?; - (split_len as u64).serialize(&mut w)?; - w.flush()?; - Ok(w) - } -} - - - -fn stream_before<'a, V: 'a + Clone + Default + BinarySerializable>(stream_dictionary: &'a StreamDictionary, target_key: &[u8]) -> StreamDictionaryStreamer<'a, V> { - let (prev_key, offset) = strictly_previous_key(&stream_dictionary.fst_index, target_key.as_ref()); +pub(crate) fn stream_before<'a, V>(term_dictionary: &'a TermDictionary, target_key: &[u8]) -> TermStreamer<'a, V> + where V: 'a + BinarySerializable + Default { + let (prev_key, offset) = term_dictionary.strictly_previous_key(target_key.as_ref()); let offset: usize = offset as usize; - StreamDictionaryStreamer { - cursor: &stream_dictionary.stream_data.as_slice()[offset..], + TermStreamer { + cursor: &term_dictionary.stream_data()[offset..], current_key: Vec::from(prev_key), current_value: V::default(), } } - -pub struct StreamDictionary where V:BinarySerializable + Default + Clone { - stream_data: ReadOnlySource, - fst_index: fst::Map, - _phantom_: PhantomData, -} - -fn open_fst_index(source: ReadOnlySource) -> io::Result { - Ok(fst::Map::from(match source { - ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)), - ReadOnlySource::Mmap(mmap_readonly) => try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)), - })) -} - - -impl StreamDictionary { - - pub fn from_source(source: ReadOnlySource) -> io::Result> { - let total_len = source.len(); - let length_offset = total_len - 8; - let split_len: usize = { - let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; - u64::deserialize(&mut split_len_buffer)? as usize - }; - let stream_data = source.slice(0, split_len); - let fst_data = source.slice(split_len, length_offset); - let fst_index = open_fst_index(fst_data)?; - - Ok(StreamDictionary { - stream_data: stream_data, - fst_index: fst_index, - _phantom_: PhantomData - }) - } - - pub fn get>(&self, target_key: K) -> Option { - let mut streamer = stream_before(self, target_key.as_ref()); - while let Some((iter_key, iter_val)) = streamer.next() { - match iter_key.cmp(target_key.as_ref()) { - Ordering::Less => {} - Ordering::Equal => { - let val: V = (*iter_val).clone(); - return Some(val); - } - Ordering::Greater => { - return None; - } - } - } - return None; - } - - pub fn range(&self) -> StreamDictionaryStreamerBuilder { - let data: &[u8] = &self.stream_data; - StreamDictionaryStreamerBuilder { - stream_dictionary: &self, - offset_from: 0, - offset_to: (data.as_ptr() as usize) + data.len(), - current_key: vec!(), - } - } - - pub fn stream(&self) -> StreamDictionaryStreamer { - StreamDictionaryStreamer { - cursor: &*self.stream_data, - current_key: Vec::with_capacity(128), - current_value: V::default(), - } - } -} - -pub struct StreamDictionaryStreamerBuilder<'a, V: 'a + BinarySerializable + Clone + Default> { - stream_dictionary: &'a StreamDictionary, +/// `TermStreamerBuilder` is an helper object used to define +/// a range of terms that should be streamed. +pub struct TermStreamerBuilder<'a, V> + where V: 'a + BinarySerializable + Default { + term_dictionary: &'a TermDictionary, + origin: usize, offset_from: usize, offset_to: usize, current_key: Vec, } - /// Returns offset information for the first /// key in the stream matching a given predicate. /// /// returns (start offset, the data required to load the value) -fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: StreamDictionaryStreamer) -> (usize, Vec) - where V: 'a + BinarySerializable + Clone + Default { +fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: TermStreamer) -> (usize, Vec) + where V: 'a + BinarySerializable + Default { let mut prev: &[u8] = streamer.cursor; let mut prev_data: Vec = streamer.current_key.clone(); @@ -268,51 +50,69 @@ fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: StreamDicti return (prev.as_ptr() as usize, prev_data); } -impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerBuilder<'a, V> { - pub fn ge>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { +impl<'a, V> TermStreamerBuilder<'a, V> + where V: 'a + BinarySerializable + Default { + + /// Limit the range to terms greater or equal to the bound + pub fn ge>(mut self, bound: T) -> TermStreamerBuilder<'a, V> { let target_key = bound.as_ref(); - let streamer = stream_before(&self.stream_dictionary, target_key.as_ref()); + let streamer = stream_before(&self.term_dictionary, target_key.as_ref()); let smaller_than = |k: &[u8]| { k.lt(target_key) }; let (offset_before, current_key) = get_offset(smaller_than, streamer); self.current_key = current_key; - self.offset_from = offset_before; + self.offset_from = offset_before - self.origin; self } - pub fn gt>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { + /// Limit the range to terms strictly greater than the bound + pub fn gt>(mut self, bound: T) -> TermStreamerBuilder<'a, V> { let target_key = bound.as_ref(); - let streamer = stream_before(self.stream_dictionary, target_key.as_ref()); + let streamer = stream_before(self.term_dictionary, target_key.as_ref()); let smaller_than = |k: &[u8]| { k.le(target_key) }; let (offset_before, current_key) = get_offset(smaller_than, streamer); self.current_key = current_key; - self.offset_from = offset_before; + self.offset_from = offset_before - self.origin; self } - pub fn lt>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { + /// Limit the range to terms lesser or equal to the bound + pub fn lt>(mut self, bound: T) -> TermStreamerBuilder<'a, V> { let target_key = bound.as_ref(); - let streamer = stream_before(self.stream_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.le(target_key) }; - let (offset_before, _) = get_offset(smaller_than, streamer); - self.offset_to = offset_before; - self - } - - pub fn le>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(self.stream_dictionary, target_key.as_ref()); + let streamer = stream_before(self.term_dictionary, target_key.as_ref()); let smaller_than = |k: &[u8]| { k.lt(target_key) }; let (offset_before, _) = get_offset(smaller_than, streamer); - self.offset_to = offset_before; + self.offset_to = offset_before - self.origin; self } - pub fn into_stream(self) -> StreamDictionaryStreamer<'a, V> { - let data: &[u8] = &self.stream_dictionary.stream_data.as_slice()[..]; + /// Limit the range to terms lesser or equal to the bound + pub fn le>(mut self, bound: T) -> TermStreamerBuilder<'a, V> { + let target_key = bound.as_ref(); + let streamer = stream_before(self.term_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.le(target_key) }; + let (offset_before, _) = get_offset(smaller_than, streamer); + self.offset_to = offset_before - self.origin; + self + } + + pub(crate) fn new(term_dictionary: &'a TermDictionary) -> TermStreamerBuilder<'a, V> { + let data = term_dictionary.stream_data(); let origin = data.as_ptr() as usize; - let start = self.offset_from - origin; - let stop = max(self.offset_to - origin, start); - StreamDictionaryStreamer { + TermStreamerBuilder { + term_dictionary: term_dictionary, + origin: origin, + offset_from: 0, + offset_to: data.len(), + current_key: vec!(), + } + } + + /// Build the streamer. + pub fn into_stream(self) -> TermStreamer<'a, V> { + let data: &[u8] = self.term_dictionary.stream_data(); + let start = self.offset_from; + let stop = max(self.offset_to, start); + TermStreamer { cursor: &data[start..stop], current_key: self.current_key, current_value: V::default(), @@ -320,17 +120,26 @@ impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerB } } -pub struct StreamDictionaryStreamer<'a, V: BinarySerializable> { +/// `TermStreamer` acts as a cursor over a range of terms of a segment. +/// Terms are guaranteed to be sorted. +pub struct TermStreamer<'a, V> + where V: 'a + BinarySerializable + Default { cursor: &'a [u8], current_key: Vec, current_value: V, } -impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> { + + +impl<'a, V: BinarySerializable> TermStreamer<'a, V> + where V: 'a + BinarySerializable + Default { - pub fn next(&mut self) -> Option<(&[u8], &V)> { + /// Advance position the stream on the next item. + /// Before the first call to `.advance()`, the stream + /// is an unitialized state. + pub fn advance(&mut self) -> bool { if self.cursor.len() == 0 { - return None; + return false; } let common_length: usize = VInt::deserialize(&mut self.cursor).unwrap().0 as usize; let new_length: usize = common_length + VInt::deserialize(&mut self.cursor).unwrap().0 as usize; @@ -340,124 +149,37 @@ impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> { } self.cursor.read_exact(&mut self.current_key[common_length..new_length]).unwrap(); self.current_value = V::deserialize(&mut self.cursor).unwrap(); - Some((&self.current_key, &self.current_value)) + return true; } - + /// Accesses the current key. + /// + /// `.key()` should return the key that was returned + /// by the `.next()` method. + /// + /// If the end of the stream as been reached, and `.next()` + /// has been called and returned `None`, `.key()` remains + /// the value of the last key encounterred. + /// + /// Before any call to `.next()`, `.key()` returns an empty array. pub fn key(&self) -> &[u8] { &self.current_key } + /// Accesses the current value. + /// + /// Calling `.value()` after the end of the stream will return the + /// last `.value()` encounterred. + /// + /// # Panics + /// + /// Calling `.value()` before the first call to `.advance()` returns + /// `V::default()`. pub fn value(&self) -> &V { &self.current_value } -} -#[cfg(test)] -mod test { - - use std::str; - use directory::ReadOnlySource; - use super::CountingWriter; - use std::io::Write; - use super::{BLOCK_SIZE, StreamDictionary, StreamDictionaryBuilder}; - - #[test] - fn test_stream_dictionary() { - let ids: Vec<_> = (0u32..10_000u32) - .map(|i| (format!("doc{:0>6}", i), i)) - .collect(); - let buffer: Vec = { - let mut stream_dictionary_builder = StreamDictionaryBuilder::new(vec!()).unwrap(); - for &(ref id, ref i) in &ids { - stream_dictionary_builder.insert(id.as_bytes(), i).unwrap(); - } - stream_dictionary_builder.finish().unwrap() - }; - let source = ReadOnlySource::from(buffer); - let stream_dictionary: StreamDictionary = StreamDictionary::from_source(source).unwrap(); - { - let mut streamer = stream_dictionary.stream(); - let mut i = 0; - while let Some((streamer_k, streamer_v)) = streamer.next() { - let &(ref key, ref v) = &ids[i]; - assert_eq!(streamer_k, key.as_bytes()); - assert_eq!(streamer_v, v); - i += 1; - } - } - - let &(ref key, ref _v) = &ids[2047]; - stream_dictionary.get(key.as_bytes()); + pub(crate) fn extract_value(self) -> V { + self.current_value } - - - - #[test] - fn test_stream_range() { - let ids: Vec<_> = (0u32..10_000u32) - .map(|i| (format!("doc{:0>6}", i), i)) - .collect(); - let buffer: Vec = { - let mut stream_dictionary_builder = StreamDictionaryBuilder::new(vec!()).unwrap(); - for &(ref id, ref i) in &ids { - stream_dictionary_builder.insert(id.as_bytes(), i).unwrap(); - } - stream_dictionary_builder.finish().unwrap() - }; - let source = ReadOnlySource::from(buffer); - - let stream_dictionary: StreamDictionary = StreamDictionary::from_source(source).unwrap(); - { - for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { - let &(ref target_key, _) = &ids[i]; - let mut streamer = stream_dictionary - .range() - .ge(target_key.as_bytes()) - .into_stream(); - for j in 0..3 { - let (streamer_k, streamer_v) = streamer.next().unwrap(); - let &(ref key, ref v) = &ids[i + j]; - assert_eq!(str::from_utf8(streamer_k).unwrap(), key); - assert_eq!(streamer_v, v); - } - } - } - - { - for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { - let &(ref target_key, _) = &ids[i]; - let mut streamer = stream_dictionary - .range() - .gt(target_key.as_bytes()) - .into_stream(); - for j in 0..3 { - let (streamer_k, streamer_v) = streamer.next().unwrap(); - let &(ref key, ref v) = &ids[i + j + 1]; - assert_eq!(streamer_k, key.as_bytes()); - assert_eq!(streamer_v, v); - } - } - } - - { - for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { - for j in 0..3 { - let &(ref fst_key, _) = &ids[i]; - let &(ref last_key, _) = &ids[i + 3]; - let mut streamer = stream_dictionary - .range() - .ge(fst_key.as_bytes()) - .lt(last_key.as_bytes()) - .into_stream(); - for _ in 0..(j + 1) { - assert!(streamer.next().is_some()); - } - assert!(streamer.next().is_some()); - } - } - } - - } - } diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs index 7c5099a3f..9f3614f73 100644 --- a/src/termdict/streamdict/termdict.rs +++ b/src/termdict/streamdict/termdict.rs @@ -1,9 +1,6 @@ #![allow(should_implement_trait)] -use std::cmp::max; -use std::io; -use std::io::Write; -use std::io::Read; +use std::io::{self, Write}; use fst; use fst::raw::Fst; use common::VInt; @@ -12,10 +9,10 @@ use common::BinarySerializable; use std::marker::PhantomData; use common::CountingWriter; use std::cmp::Ordering; -use fst::{IntoStreamer, Streamer}; -use std::str; +use postings::TermInfo; use fst::raw::Node; -use fst::raw::CompiledAddr; +use super::TermStreamerBuilder; +use super::streamer::stream_before; const BLOCK_SIZE: usize = 1024; @@ -23,7 +20,11 @@ fn convert_fst_error(e: fst::Error) -> io::Error { io::Error::new(io::ErrorKind::Other, e) } -pub struct StreamDictionaryBuilder { +/// Builder for the new term dictionary. +/// +/// All terms must be inserted in order. +pub struct TermDictionaryBuilder + where W: Write, V: BinarySerializable + Default { write: CountingWriter, block_index: fst::MapBuilder>, last_key: Vec, @@ -51,61 +52,12 @@ fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec) { } -fn strictly_previous_key>(fst_map: &fst::Map, key_as_ref: B) -> (Vec, u64) { - let key = key_as_ref.as_ref(); - let fst = fst_map.as_fst(); - let mut node = fst.root(); - let mut node_stack: Vec = vec!(node.clone()); +impl TermDictionaryBuilder { - // first check the longest prefix. - for &b in &key[..key.len() - 1] { - node = match node.find_input(b) { - None => { - break; - }, - Some(i) => { - fst.node(node.transition_addr(i)) - }, - }; - node_stack.push(node); - } - - let len_node_stack = node_stack.len(); - for i in (1..len_node_stack).rev() { - let cur_node = &node_stack[i]; - let b: u8 = key[i]; - let last_transition_opt = cur_node - .transitions() - .take_while(|transition| transition.inp < b) - .last(); - - if let Some(last_transition) = last_transition_opt { - let mut result_buffer = Vec::from(&key[..i]); - result_buffer.push(last_transition.inp); - let mut result = Vec::from(&key[..i]); - result.push(last_transition.inp); - let fork_node = fst.node(last_transition.addr); - fill_last(fst, fork_node, &mut result); - let val = fst_map.get(&result).unwrap(); - return (result, val); - } - else if cur_node.is_final() { - // the previous key is a prefix - let result_buffer = Vec::from(&key[..i]); - let val = fst_map.get(&result_buffer).unwrap(); - return (result_buffer, val); - } - } - - return (vec!(), 0); -} - - -impl StreamDictionaryBuilder { - - pub fn new(write: W) -> io::Result> { + /// Creates a new `TermDictionaryBuilder` + pub fn new(write: W) -> io::Result> { let buffer: Vec = vec!(); - Ok(StreamDictionaryBuilder { + Ok(TermDictionaryBuilder { write: CountingWriter::wrap(write), block_index: fst::MapBuilder::new(buffer) .expect("This cannot fail"), @@ -119,12 +71,22 @@ impl StreamDictionaryBuilder< self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap(); } + /// Inserts a `(key, value)` pair in the term dictionary. + /// + /// *Keys have to be inserted in order.* pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{ self.insert_key(key)?; self.insert_value(value) } - pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()>{ + /// # Warning + /// Horribly dangerous internal API + /// + /// If used, it must be used by systematically alternating calls + /// to insert_key and insert_value. + /// + /// Prefer using `.insert(key, value)` + pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()>{ if self.len % BLOCK_SIZE == 0 { self.add_index_entry(); } @@ -138,11 +100,13 @@ impl StreamDictionaryBuilder< Ok(()) } - pub fn insert_value(&mut self, value: &V) -> io::Result<()>{ + pub(crate) fn insert_value(&mut self, value: &V) -> io::Result<()>{ value.serialize(&mut self.write)?; Ok(()) } + /// Finalize writing the builder, and returns the underlying + /// `Write` object. pub fn finish(mut self) -> io::Result { self.add_index_entry(); let (mut w, split_len) = self.write.finish()?; @@ -158,22 +122,6 @@ impl StreamDictionaryBuilder< -fn stream_before<'a, V: 'a + Clone + Default + BinarySerializable>(stream_dictionary: &'a StreamDictionary, target_key: &[u8]) -> StreamDictionaryStreamer<'a, V> { - let (prev_key, offset) = strictly_previous_key(&stream_dictionary.fst_index, target_key.as_ref()); - let offset: usize = offset as usize; - StreamDictionaryStreamer { - cursor: &stream_dictionary.stream_data.as_slice()[offset..], - current_key: Vec::from(prev_key), - current_value: V::default(), - } -} - - -pub struct StreamDictionary where V:BinarySerializable + Default + Clone { - stream_data: ReadOnlySource, - fst_index: fst::Map, - _phantom_: PhantomData, -} fn open_fst_index(source: ReadOnlySource) -> io::Result { Ok(fst::Map::from(match source { @@ -182,10 +130,18 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result { })) } +/// Datastructure to access the `terms` of a segment. +pub struct TermDictionary where V: BinarySerializable + Default { + stream_data: ReadOnlySource, + fst_index: fst::Map, + _phantom_: PhantomData, +} -impl StreamDictionary { +impl TermDictionary + where V: BinarySerializable + Default { - pub fn from_source(source: ReadOnlySource) -> io::Result> { + /// Opens a `TermDictionary` given a data source. + pub fn from_source(source: ReadOnlySource) -> io::Result> { let total_len = source.len(); let length_offset = total_len - 8; let split_len: usize = { @@ -196,21 +152,75 @@ impl StreamDictionary { let fst_data = source.slice(split_len, length_offset); let fst_index = open_fst_index(fst_data)?; - Ok(StreamDictionary { + Ok(TermDictionary { stream_data: stream_data, fst_index: fst_index, _phantom_: PhantomData }) } + pub(crate) fn stream_data(&self) -> &[u8] { + self.stream_data.as_slice() + } + + pub(crate) fn strictly_previous_key(&self, key: &[u8]) -> (Vec, u64) { + let fst_map = &self.fst_index; + let fst = fst_map.as_fst(); + let mut node = fst.root(); + let mut node_stack: Vec = vec!(node.clone()); + + // first check the longest prefix. + for &b in &key[..key.len() - 1] { + node = match node.find_input(b) { + None => { + break; + }, + Some(i) => { + fst.node(node.transition_addr(i)) + }, + }; + node_stack.push(node); + } + + let len_node_stack = node_stack.len(); + for i in (1..len_node_stack).rev() { + let cur_node = &node_stack[i]; + let b: u8 = key[i]; + let last_transition_opt = cur_node + .transitions() + .take_while(|transition| transition.inp < b) + .last(); + + if let Some(last_transition) = last_transition_opt { + let mut result_buffer = Vec::from(&key[..i]); + result_buffer.push(last_transition.inp); + let mut result = Vec::from(&key[..i]); + result.push(last_transition.inp); + let fork_node = fst.node(last_transition.addr); + fill_last(fst, fork_node, &mut result); + let val = fst_map.get(&result).unwrap(); + return (result, val); + } + else if cur_node.is_final() { + // the previous key is a prefix + let result_buffer = Vec::from(&key[..i]); + let val = fst_map.get(&result_buffer).unwrap(); + return (result_buffer, val); + } + } + + return (vec!(), 0); + } + + /// Lookups the value corresponding to the key. pub fn get>(&self, target_key: K) -> Option { let mut streamer = stream_before(self, target_key.as_ref()); - while let Some((iter_key, iter_val)) = streamer.next() { - match iter_key.cmp(target_key.as_ref()) { + while streamer.advance() { + let position = streamer.key().cmp(target_key.as_ref()); + match position { Ordering::Less => {} Ordering::Equal => { - let val: V = (*iter_val).clone(); - return Some(val); + return Some(streamer.extract_value()) } Ordering::Greater => { return None; @@ -219,136 +229,11 @@ impl StreamDictionary { } return None; } - - pub fn range(&self) -> StreamDictionaryStreamerBuilder { - let data: &[u8] = &self.stream_data; - StreamDictionaryStreamerBuilder { - stream_dictionary: &self, - offset_from: 0, - offset_to: (data.as_ptr() as usize) + data.len(), - current_key: vec!(), - } - } - pub fn stream(&self) -> StreamDictionaryStreamer { - StreamDictionaryStreamer { - cursor: &*self.stream_data, - current_key: Vec::with_capacity(128), - current_value: V::default(), - } - } -} - -pub struct StreamDictionaryStreamerBuilder<'a, V: 'a + BinarySerializable + Clone + Default> { - stream_dictionary: &'a StreamDictionary, - offset_from: usize, - offset_to: usize, - current_key: Vec, -} - - -/// Returns offset information for the first -/// key in the stream matching a given predicate. -/// -/// returns (start offset, the data required to load the value) -fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: StreamDictionaryStreamer) -> (usize, Vec) - where V: 'a + BinarySerializable + Clone + Default { - let mut prev: &[u8] = streamer.cursor; - - let mut prev_data: Vec = streamer.current_key.clone(); - - while let Some((iter_key, _)) = streamer.next() { - if !predicate(iter_key) { - return (prev.as_ptr() as usize, prev_data); - } - prev = streamer.cursor; - prev_data.clear(); - prev_data.extend_from_slice(iter_key); - } - return (prev.as_ptr() as usize, prev_data); -} - -impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerBuilder<'a, V> { - pub fn ge>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(&self.stream_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.lt(target_key) }; - let (offset_before, current_key) = get_offset(smaller_than, streamer); - self.current_key = current_key; - self.offset_from = offset_before; - self - } - - pub fn gt>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(self.stream_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.le(target_key) }; - let (offset_before, current_key) = get_offset(smaller_than, streamer); - self.current_key = current_key; - self.offset_from = offset_before; - self - } - - pub fn lt>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(self.stream_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.le(target_key) }; - let (offset_before, _) = get_offset(smaller_than, streamer); - self.offset_to = offset_before; - self - } - - pub fn le>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { - let target_key = bound.as_ref(); - let streamer = stream_before(self.stream_dictionary, target_key.as_ref()); - let smaller_than = |k: &[u8]| { k.lt(target_key) }; - let (offset_before, _) = get_offset(smaller_than, streamer); - self.offset_to = offset_before; - self - } - - pub fn into_stream(self) -> StreamDictionaryStreamer<'a, V> { - let data: &[u8] = &self.stream_dictionary.stream_data.as_slice()[..]; - let origin = data.as_ptr() as usize; - let start = self.offset_from - origin; - let stop = max(self.offset_to - origin, start); - StreamDictionaryStreamer { - cursor: &data[start..stop], - current_key: self.current_key, - current_value: V::default(), - } - } -} - -pub struct StreamDictionaryStreamer<'a, V: BinarySerializable> { - cursor: &'a [u8], - current_key: Vec, - current_value: V, -} - -impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> { - - pub fn next(&mut self) -> Option<(&[u8], &V)> { - if self.cursor.len() == 0 { - return None; - } - let common_length: usize = VInt::deserialize(&mut self.cursor).unwrap().0 as usize; - let new_length: usize = common_length + VInt::deserialize(&mut self.cursor).unwrap().0 as usize; - self.current_key.reserve(new_length); - unsafe { - self.current_key.set_len(new_length); - } - self.cursor.read_exact(&mut self.current_key[common_length..new_length]).unwrap(); - self.current_value = V::deserialize(&mut self.cursor).unwrap(); - Some((&self.current_key, &self.current_value)) - } - - - pub fn key(&self) -> &[u8] { - &self.current_key - } - - pub fn value(&self) -> &V { - &self.current_value + + /// Returns a range builder, to stream all of the terms + /// within an interval. + pub fn range(&self) -> TermStreamerBuilder { + TermStreamerBuilder::new(self) } }