use std::io;
use std::io::Write;

/// A `Write` adapter that counts how many bytes have been
/// written to the wrapped writer.
///
/// Used by serializers that need to record absolute byte offsets
/// while streaming data out.
pub struct CountingWriter<W> {
    underlying: W,
    written_bytes: usize,
}

impl<W: Write> CountingWriter<W> {
    /// Wraps the given writer.
    pub fn wrap(underlying: W) -> CountingWriter<W> {
        CountingWriter {
            underlying: underlying,
            written_bytes: 0,
        }
    }

    /// Returns the number of bytes successfully written so far.
    pub fn written_bytes(&self) -> usize {
        self.written_bytes
    }

    /// Flushes the writer and returns the underlying writer
    /// together with the total number of bytes written.
    pub fn finish(mut self) -> io::Result<(W, usize)> {
        self.flush()?;
        Ok((self.underlying, self.written_bytes))
    }
}

impl<W: Write> Write for CountingWriter<W> {
    /// Forwards to the underlying writer and accumulates the number
    /// of bytes it reports as written (which may be fewer than
    /// `buf.len()` — callers wanting all bytes should use `write_all`).
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written_size = self.underlying.write(buf)?;
        self.written_bytes += written_size;
        Ok(written_size)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.underlying.flush()
    }
}

#[cfg(test)]
mod test {

    use super::CountingWriter;
    use std::io::Write;

    #[test]
    fn test_counting_writer() {
        let buffer: Vec<u8> = vec!();
        let mut counting_writer = CountingWriter::wrap(buffer);
        let bytes = (0u8..10u8).collect::<Vec<u8>>();
        counting_writer.write_all(&bytes).unwrap();
        let (w, len): (Vec<u8>, usize) = counting_writer.finish().unwrap();
        assert_eq!(len, 10);
        assert_eq!(w.len(), 10);
    }
}
@@ -55,7 +58,6 @@ pub fn u64_to_i64(val: u64) -> i64 { } - #[cfg(test)] mod test { diff --git a/src/directory/read_only_source.rs b/src/directory/read_only_source.rs index 9e5559f62..478936c75 100644 --- a/src/directory/read_only_source.rs +++ b/src/directory/read_only_source.rs @@ -73,3 +73,10 @@ impl Clone for ReadOnlySource { self.slice(0, self.len()) } } + +impl From> for ReadOnlySource { + fn from(data: Vec) -> ReadOnlySource { + let shared_data = SharedVecSlice::from(data); + ReadOnlySource::Anonymous(shared_data) + } +} diff --git a/src/directory/shared_vec_slice.rs b/src/directory/shared_vec_slice.rs index b534e6029..ab5e04644 100644 --- a/src/directory/shared_vec_slice.rs +++ b/src/directory/shared_vec_slice.rs @@ -34,3 +34,10 @@ impl SharedVecSlice { } } } + + +impl From> for SharedVecSlice { + fn from(data: Vec) -> SharedVecSlice { + SharedVecSlice::new(Arc::new(data)) + } +} \ No newline at end of file diff --git a/src/termdict/fstdict/mod.rs b/src/termdict/fstdict/mod.rs index 7566eaa1b..a0b6589d5 100644 --- a/src/termdict/fstdict/mod.rs +++ b/src/termdict/fstdict/mod.rs @@ -16,10 +16,11 @@ Keys (`&[u8]`) in this datastructure are sorted. 
mod termdict; mod streamer; -mod merger; pub use self::termdict::TermDictionary; pub use self::termdict::TermDictionaryBuilder; pub use self::streamer::TermStreamer; pub use self::streamer::TermStreamerBuilder; -pub use self::merger::TermMerger; + + + diff --git a/src/termdict/fstdict/termdict.rs b/src/termdict/fstdict/termdict.rs index e713a24d7..183ba63b0 100644 --- a/src/termdict/fstdict/termdict.rs +++ b/src/termdict/fstdict/termdict.rs @@ -161,41 +161,3 @@ impl TermDictionary TermStreamerBuilder::new(self, self.fst_index.range()) } } - -#[cfg(test)] -mod tests { - use super::*; - use directory::{RAMDirectory, Directory}; - use std::path::PathBuf; - use fst::Streamer; - - #[test] - fn test_term_dictionary() { - let mut directory = RAMDirectory::create(); - let path = PathBuf::from("TermDictionary"); - { - let write = directory.open_write(&path).unwrap(); - let mut term_dictionary_builder = TermDictionaryBuilder::new(write).unwrap(); - term_dictionary_builder - .insert("abc".as_bytes(), &34u32) - .unwrap(); - term_dictionary_builder - .insert("abcd".as_bytes(), &346u32) - .unwrap(); - term_dictionary_builder.finish().unwrap(); - } - let source = directory.open_read(&path).unwrap(); - let term_dict: TermDictionary = TermDictionary::from_source(source).unwrap(); - assert_eq!(term_dict.get("abc"), Some(34u32)); - assert_eq!(term_dict.get("abcd"), Some(346u32)); - let mut stream = term_dict.stream(); - assert_eq!(stream.next().unwrap(), ("abc".as_bytes(), 34u32)); - assert_eq!(stream.key(), "abc".as_bytes()); - assert_eq!(stream.value(), 34u32); - assert_eq!(stream.next().unwrap(), ("abcd".as_bytes(), 346u32)); - assert_eq!(stream.key(), "abcd".as_bytes()); - assert_eq!(stream.value(), 346u32); - assert!(!stream.advance()); - } - -} diff --git a/src/termdict/fstdict/merger.rs b/src/termdict/merger.rs similarity index 72% rename from src/termdict/fstdict/merger.rs rename to src/termdict/merger.rs index dbbc5f6e2..617233b25 100644 --- a/src/termdict/fstdict/merger.rs 
+++ b/src/termdict/merger.rs @@ -155,54 +155,3 @@ impl<'a, V> Streamer<'a> for TermMerger<'a, V> } } - -#[cfg(test)] -mod tests { - - use schema::{Term, SchemaBuilder, Document, TEXT}; - use core::Index; - - #[test] - fn test_term_iterator() { - let mut schema_builder = SchemaBuilder::default(); - let text_field = schema_builder.add_text_field("text", TEXT); - let index = Index::create_in_ram(schema_builder.build()); - { - let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); - { - { - let mut doc = Document::default(); - doc.add_text(text_field, "a b d f"); - index_writer.add_document(doc); - } - index_writer.commit().unwrap(); - } - { - { - let mut doc = Document::default(); - doc.add_text(text_field, "a b c d f"); - index_writer.add_document(doc); - } - index_writer.commit().unwrap(); - } - { - { - let mut doc = Document::default(); - doc.add_text(text_field, "e f"); - index_writer.add_document(doc); - } - index_writer.commit().unwrap(); - } - } - index.load_searchers().unwrap(); - let searcher = index.searcher(); - let mut term_it = searcher.terms(); - let mut term_string = String::new(); - while term_it.advance() { - let term = Term::from_bytes(term_it.key()); - term_string.push_str(term.text()); - } - assert_eq!(&*term_string, "abcdef"); - } - -} diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 8117031c7..bae868d1c 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -16,10 +16,205 @@ Keys (`&[u8]`) in this datastructure are sorted. 
#[cfg(not(feature="streamdict"))] mod fstdict; #[cfg(not(feature="streamdict"))] -pub use self::fstdict::*; - +pub use self::fstdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder}; #[cfg(feature="streamdict")] mod streamdict; #[cfg(feature="streamdict")] -pub use self::termdict::*; +pub use self::termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder}; + +mod merger; +pub use self::merger::TermMerger; + + +#[cfg(test)] +mod tests { + use super::{TermDictionary, TermDictionaryBuilder}; + use directory::{RAMDirectory, Directory, ReadOnlySource}; + use std::path::PathBuf; + use fst::Streamer; + use schema::{Term, SchemaBuilder, Document, TEXT}; + use core::Index; + use std::str; + const BLOCK_SIZE: usize = 1_500; + + + + #[test] + fn test_term_dictionary() { + let mut directory = RAMDirectory::create(); + let path = PathBuf::from("TermDictionary"); + { + let write = directory.open_write(&path).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilder::new(write).unwrap(); + term_dictionary_builder + .insert("abc".as_bytes(), &34u32) + .unwrap(); + term_dictionary_builder + .insert("abcd".as_bytes(), &346u32) + .unwrap(); + term_dictionary_builder.finish().unwrap(); + } + let source = directory.open_read(&path).unwrap(); + let term_dict: TermDictionary = TermDictionary::from_source(source).unwrap(); + assert_eq!(term_dict.get("abc"), Some(34u32)); + assert_eq!(term_dict.get("abcd"), Some(346u32)); + let mut stream = term_dict.stream(); + assert_eq!(stream.next().unwrap(), ("abc".as_bytes(), 34u32)); + assert_eq!(stream.key(), "abc".as_bytes()); + assert_eq!(stream.value(), 34u32); + assert_eq!(stream.next().unwrap(), ("abcd".as_bytes(), 346u32)); + assert_eq!(stream.key(), "abcd".as_bytes()); + assert_eq!(stream.value(), 346u32); + assert!(!stream.advance()); + } + + + + + #[test] + fn test_term_iterator() { + let mut schema_builder = SchemaBuilder::default(); + let text_field = 
schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + { + { + let mut doc = Document::default(); + doc.add_text(text_field, "a b d f"); + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + } + { + { + let mut doc = Document::default(); + doc.add_text(text_field, "a b c d f"); + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + } + { + { + let mut doc = Document::default(); + doc.add_text(text_field, "e f"); + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + } + } + index.load_searchers().unwrap(); + let searcher = index.searcher(); + let mut term_it = searcher.terms(); + let mut term_string = String::new(); + while term_it.advance() { + let term = Term::from_bytes(term_it.key()); + term_string.push_str(term.text()); + } + assert_eq!(&*term_string, "abcdef"); + } + + + #[test] + fn test_stream_dictionary() { + let ids: Vec<_> = (0u32..10_000u32) + .map(|i| (format!("doc{:0>6}", i), i)) + .collect(); + let buffer: Vec = { + let mut stream_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); + for &(ref id, ref i) in &ids { + stream_dictionary_builder.insert(id.as_bytes(), i).unwrap(); + } + stream_dictionary_builder.finish().unwrap() + }; + let source = ReadOnlySource::from(buffer); + let stream_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); + { + let mut streamer = stream_dictionary.stream(); + let mut i = 0; + while let Some((streamer_k, streamer_v)) = streamer.next() { + let &(ref key, ref v) = &ids[i]; + assert_eq!(streamer_k, key.as_bytes()); + assert_eq!(streamer_v, *v); + i += 1; + } + } + + let &(ref key, ref _v) = &ids[2047]; + stream_dictionary.get(key.as_bytes()); + } + + #[test] + fn test_stream_range() { + let ids: Vec<_> = (0u32..10_000u32) + .map(|i| (format!("doc{:0>6}", i), i)) + .collect(); + let buffer: Vec = 
{ + let mut stream_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap(); + for &(ref id, ref i) in &ids { + stream_dictionary_builder.insert(id.as_bytes(), i).unwrap(); + } + stream_dictionary_builder.finish().unwrap() + }; + + println!("a"); + let source = ReadOnlySource::from(buffer); + + let stream_dictionary: TermDictionary = TermDictionary::from_source(source).unwrap(); + { + for i in (0..20).chain(2_000 - 10..2_000 + 10) { + println!("i {}", i); + let &(ref target_key, _) = &ids[i]; + let mut streamer = stream_dictionary + .range() + .ge(target_key.as_bytes()) + .into_stream(); + for j in 0..3 { + let (streamer_k, streamer_v) = streamer.next().unwrap(); + let &(ref key, ref v) = &ids[i + j]; + assert_eq!(str::from_utf8(streamer_k).unwrap(), key); + assert_eq!(streamer_v, *v); + } + } + } + + { + for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { + let &(ref target_key, _) = &ids[i]; + let mut streamer = stream_dictionary + .range() + .gt(target_key.as_bytes()) + .into_stream(); + for j in 0..3 { + let (streamer_k, streamer_v) = streamer.next().unwrap(); + let &(ref key, ref v) = &ids[i + j + 1]; + assert_eq!(streamer_k, key.as_bytes()); + assert_eq!(streamer_v, *v); + } + } + } + + { + for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) { + println!("i2:{}", i); + for j in 0..3 { + println!("j2:{}", j); + let &(ref fst_key, _) = &ids[i]; + let &(ref last_key, _) = &ids[i + 3]; + let mut streamer = stream_dictionary + .range() + .ge(fst_key.as_bytes()) + .lt(last_key.as_bytes()) + .into_stream(); + for _ in 0..j { + assert!(streamer.next().is_some()); + } + assert!(streamer.next().is_some()); + } + } + } + + } + +} diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs index e69de29bb..a58773605 100644 --- a/src/termdict/streamdict/mod.rs +++ b/src/termdict/streamdict/mod.rs @@ -0,0 +1,9 @@ + +mod termdict; +mod streamer; + +pub use self::termdict::TermDictionary; +pub use 
self::termdict::TermDictionaryBuilder; +pub use self::streamer::TermStreamer; +pub use self::streamer::TermStreamerBuilder; + diff --git a/src/termdict/streamdict/stream_dictionary.rs b/src/termdict/streamdict/streamer.rs similarity index 99% rename from src/termdict/streamdict/stream_dictionary.rs rename to src/termdict/streamdict/streamer.rs index 3f76cf217..5548fc139 100644 --- a/src/termdict/streamdict/stream_dictionary.rs +++ b/src/termdict/streamdict/streamer.rs @@ -38,8 +38,6 @@ fn common_prefix_length(left: &[u8], right: &[u8]) -> usize { .count() } - - fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec) { loop { if let Some(transition) = node.transitions().last() { diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs new file mode 100644 index 000000000..7c5099a3f --- /dev/null +++ b/src/termdict/streamdict/termdict.rs @@ -0,0 +1,354 @@ +#![allow(should_implement_trait)] + +use std::cmp::max; +use std::io; +use std::io::Write; +use std::io::Read; +use fst; +use fst::raw::Fst; +use common::VInt; +use directory::ReadOnlySource; +use common::BinarySerializable; +use std::marker::PhantomData; +use common::CountingWriter; +use std::cmp::Ordering; +use fst::{IntoStreamer, Streamer}; +use std::str; +use fst::raw::Node; +use fst::raw::CompiledAddr; + +const BLOCK_SIZE: usize = 1024; + +fn convert_fst_error(e: fst::Error) -> io::Error { + io::Error::new(io::ErrorKind::Other, e) +} + +pub struct StreamDictionaryBuilder { + write: CountingWriter, + block_index: fst::MapBuilder>, + last_key: Vec, + len: usize, + _phantom_: PhantomData, +} + +fn common_prefix_length(left: &[u8], right: &[u8]) -> usize { + left.iter().cloned() + .zip(right.iter().cloned()) + .take_while(|&(b1, b2)| b1 == b2) + .count() +} + +fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec) { + loop { + if let Some(transition) = node.transitions().last() { + buffer.push(transition.inp); + node = fst.node(transition.addr); + } + else { 
+ break; + } + } +} + + +fn strictly_previous_key>(fst_map: &fst::Map, key_as_ref: B) -> (Vec, u64) { + let key = key_as_ref.as_ref(); + let fst = fst_map.as_fst(); + let mut node = fst.root(); + let mut node_stack: Vec = vec!(node.clone()); + + // first check the longest prefix. + for &b in &key[..key.len() - 1] { + node = match node.find_input(b) { + None => { + break; + }, + Some(i) => { + fst.node(node.transition_addr(i)) + }, + }; + node_stack.push(node); + } + + let len_node_stack = node_stack.len(); + for i in (1..len_node_stack).rev() { + let cur_node = &node_stack[i]; + let b: u8 = key[i]; + let last_transition_opt = cur_node + .transitions() + .take_while(|transition| transition.inp < b) + .last(); + + if let Some(last_transition) = last_transition_opt { + let mut result_buffer = Vec::from(&key[..i]); + result_buffer.push(last_transition.inp); + let mut result = Vec::from(&key[..i]); + result.push(last_transition.inp); + let fork_node = fst.node(last_transition.addr); + fill_last(fst, fork_node, &mut result); + let val = fst_map.get(&result).unwrap(); + return (result, val); + } + else if cur_node.is_final() { + // the previous key is a prefix + let result_buffer = Vec::from(&key[..i]); + let val = fst_map.get(&result_buffer).unwrap(); + return (result_buffer, val); + } + } + + return (vec!(), 0); +} + + +impl StreamDictionaryBuilder { + + pub fn new(write: W) -> io::Result> { + let buffer: Vec = vec!(); + Ok(StreamDictionaryBuilder { + write: CountingWriter::wrap(write), + block_index: fst::MapBuilder::new(buffer) + .expect("This cannot fail"), + last_key: Vec::with_capacity(128), + len: 0, + _phantom_: PhantomData, + }) + } + + fn add_index_entry(&mut self) { + self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap(); + } + + pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{ + self.insert_key(key)?; + self.insert_value(value) + } + + pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()>{ + if self.len % 
BLOCK_SIZE == 0 { + self.add_index_entry(); + } + self.len += 1; + let common_len = common_prefix_length(key, &self.last_key); + VInt(common_len as u64).serialize(&mut self.write)?; + self.last_key.truncate(common_len); + self.last_key.extend_from_slice(&key[common_len..]); + VInt((key.len() - common_len) as u64).serialize(&mut self.write)?; + self.write.write_all(&key[common_len..])?; + Ok(()) + } + + pub fn insert_value(&mut self, value: &V) -> io::Result<()>{ + value.serialize(&mut self.write)?; + Ok(()) + } + + pub fn finish(mut self) -> io::Result { + self.add_index_entry(); + let (mut w, split_len) = self.write.finish()?; + let fst_write = self.block_index + .into_inner() + .map_err(convert_fst_error)?; + w.write(&fst_write)?; + (split_len as u64).serialize(&mut w)?; + w.flush()?; + Ok(w) + } +} + + + +fn stream_before<'a, V: 'a + Clone + Default + BinarySerializable>(stream_dictionary: &'a StreamDictionary, target_key: &[u8]) -> StreamDictionaryStreamer<'a, V> { + let (prev_key, offset) = strictly_previous_key(&stream_dictionary.fst_index, target_key.as_ref()); + let offset: usize = offset as usize; + StreamDictionaryStreamer { + cursor: &stream_dictionary.stream_data.as_slice()[offset..], + current_key: Vec::from(prev_key), + current_value: V::default(), + } +} + + +pub struct StreamDictionary where V:BinarySerializable + Default + Clone { + stream_data: ReadOnlySource, + fst_index: fst::Map, + _phantom_: PhantomData, +} + +fn open_fst_index(source: ReadOnlySource) -> io::Result { + Ok(fst::Map::from(match source { + ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)), + ReadOnlySource::Mmap(mmap_readonly) => try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)), + })) +} + + +impl StreamDictionary { + + pub fn from_source(source: ReadOnlySource) -> io::Result> { + let total_len = source.len(); + let length_offset = total_len - 8; + let split_len: usize = { + let mut 
split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; + u64::deserialize(&mut split_len_buffer)? as usize + }; + let stream_data = source.slice(0, split_len); + let fst_data = source.slice(split_len, length_offset); + let fst_index = open_fst_index(fst_data)?; + + Ok(StreamDictionary { + stream_data: stream_data, + fst_index: fst_index, + _phantom_: PhantomData + }) + } + + pub fn get>(&self, target_key: K) -> Option { + let mut streamer = stream_before(self, target_key.as_ref()); + while let Some((iter_key, iter_val)) = streamer.next() { + match iter_key.cmp(target_key.as_ref()) { + Ordering::Less => {} + Ordering::Equal => { + let val: V = (*iter_val).clone(); + return Some(val); + } + Ordering::Greater => { + return None; + } + } + } + return None; + } + + pub fn range(&self) -> StreamDictionaryStreamerBuilder { + let data: &[u8] = &self.stream_data; + StreamDictionaryStreamerBuilder { + stream_dictionary: &self, + offset_from: 0, + offset_to: (data.as_ptr() as usize) + data.len(), + current_key: vec!(), + } + } + + pub fn stream(&self) -> StreamDictionaryStreamer { + StreamDictionaryStreamer { + cursor: &*self.stream_data, + current_key: Vec::with_capacity(128), + current_value: V::default(), + } + } +} + +pub struct StreamDictionaryStreamerBuilder<'a, V: 'a + BinarySerializable + Clone + Default> { + stream_dictionary: &'a StreamDictionary, + offset_from: usize, + offset_to: usize, + current_key: Vec, +} + + +/// Returns offset information for the first +/// key in the stream matching a given predicate. 
+/// +/// returns (start offset, the data required to load the value) +fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: StreamDictionaryStreamer) -> (usize, Vec) + where V: 'a + BinarySerializable + Clone + Default { + let mut prev: &[u8] = streamer.cursor; + + let mut prev_data: Vec = streamer.current_key.clone(); + + while let Some((iter_key, _)) = streamer.next() { + if !predicate(iter_key) { + return (prev.as_ptr() as usize, prev_data); + } + prev = streamer.cursor; + prev_data.clear(); + prev_data.extend_from_slice(iter_key); + } + return (prev.as_ptr() as usize, prev_data); +} + +impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerBuilder<'a, V> { + pub fn ge>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { + let target_key = bound.as_ref(); + let streamer = stream_before(&self.stream_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.lt(target_key) }; + let (offset_before, current_key) = get_offset(smaller_than, streamer); + self.current_key = current_key; + self.offset_from = offset_before; + self + } + + pub fn gt>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { + let target_key = bound.as_ref(); + let streamer = stream_before(self.stream_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.le(target_key) }; + let (offset_before, current_key) = get_offset(smaller_than, streamer); + self.current_key = current_key; + self.offset_from = offset_before; + self + } + + pub fn lt>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { + let target_key = bound.as_ref(); + let streamer = stream_before(self.stream_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.le(target_key) }; + let (offset_before, _) = get_offset(smaller_than, streamer); + self.offset_to = offset_before; + self + } + + pub fn le>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> { + let target_key = bound.as_ref(); + let streamer = 
stream_before(self.stream_dictionary, target_key.as_ref()); + let smaller_than = |k: &[u8]| { k.lt(target_key) }; + let (offset_before, _) = get_offset(smaller_than, streamer); + self.offset_to = offset_before; + self + } + + pub fn into_stream(self) -> StreamDictionaryStreamer<'a, V> { + let data: &[u8] = &self.stream_dictionary.stream_data.as_slice()[..]; + let origin = data.as_ptr() as usize; + let start = self.offset_from - origin; + let stop = max(self.offset_to - origin, start); + StreamDictionaryStreamer { + cursor: &data[start..stop], + current_key: self.current_key, + current_value: V::default(), + } + } +} + +pub struct StreamDictionaryStreamer<'a, V: BinarySerializable> { + cursor: &'a [u8], + current_key: Vec, + current_value: V, +} + +impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> { + + pub fn next(&mut self) -> Option<(&[u8], &V)> { + if self.cursor.len() == 0 { + return None; + } + let common_length: usize = VInt::deserialize(&mut self.cursor).unwrap().0 as usize; + let new_length: usize = common_length + VInt::deserialize(&mut self.cursor).unwrap().0 as usize; + self.current_key.reserve(new_length); + unsafe { + self.current_key.set_len(new_length); + } + self.cursor.read_exact(&mut self.current_key[common_length..new_length]).unwrap(); + self.current_value = V::deserialize(&mut self.cursor).unwrap(); + Some((&self.current_key, &self.current_value)) + } + + + pub fn key(&self) -> &[u8] { + &self.current_key + } + + pub fn value(&self) -> &V { + &self.current_value + } +}