diff --git a/src/directory/read_only_source.rs b/src/directory/read_only_source.rs index 3db74bb01..8ddec278b 100644 --- a/src/directory/read_only_source.rs +++ b/src/directory/read_only_source.rs @@ -43,6 +43,12 @@ impl ReadOnlySource { } } + pub fn split(self, addr: usize) -> (ReadOnlySource, ReadOnlySource) { + let left = self.slice(0, addr); + let right = self.slice_from(addr); + (left, right) + } + /// Creates a ReadOnlySource that is just a /// view over a slice of the data. /// diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 1ce1d6c54..5930eeaaf 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -340,7 +340,7 @@ mod tests { #[test] fn test_stream_range() { - let ids: Vec<_> = (0u32..50_000u32) + let ids: Vec<_> = (0u32..10_000u32) .map(|i| (format!("doc{:0>6}", i), i)) .collect(); let field_type = FieldType::Str(TEXT); diff --git a/src/termdict/streamdict/delta_encoder.rs b/src/termdict/streamdict/delta_encoder.rs index 34152961b..1666e0161 100644 --- a/src/termdict/streamdict/delta_encoder.rs +++ b/src/termdict/streamdict/delta_encoder.rs @@ -1,4 +1,5 @@ use postings::TermInfo; +use super::CheckPoint; use std::mem; /// Returns the len of the longest @@ -80,6 +81,10 @@ impl TermInfoDeltaEncoder { } } + pub fn term_info(&self) -> &TermInfo { + &self.term_info + } + pub fn encode(&mut self, term_info: TermInfo) -> DeltaTermInfo { let mut delta_term_info = DeltaTermInfo { doc_freq: term_info.doc_freq, @@ -102,14 +107,28 @@ pub struct TermInfoDeltaDecoder { has_positions: bool, } + impl TermInfoDeltaDecoder { - pub fn new(has_positions: bool) -> TermInfoDeltaDecoder { + + pub fn from_term_info(term_info: TermInfo, has_positions: bool) -> TermInfoDeltaDecoder { TermInfoDeltaDecoder { - term_info: TermInfo::default(), + term_info: term_info, has_positions: has_positions, } } + pub fn from_checkpoint(checkpoint: &CheckPoint, has_positions: bool) -> TermInfoDeltaDecoder { + TermInfoDeltaDecoder { + 
term_info: TermInfo { + doc_freq: 0u32, + postings_offset: checkpoint.postings_offset, + positions_offset: checkpoint.positions_offset, + positions_inner_offset: 0u8, + }, + has_positions: has_positions + } + } + pub fn decode(&mut self, code: u8, cursor: &mut &[u8]) { let num_bytes_docfreq: usize = ((code >> 1) & 3) as usize; let num_bytes_postings_offset: usize = ((code >> 3) & 3) as usize; diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs index faf9c13fd..f9c01529e 100644 --- a/src/termdict/streamdict/mod.rs +++ b/src/termdict/streamdict/mod.rs @@ -1,7 +1,11 @@ +use std::io::{self, Write, Read}; +use common::BinarySerializable; + mod termdict; mod streamer; mod delta_encoder; + pub use self::delta_encoder::{TermDeltaEncoder, TermDeltaDecoder}; pub use self::delta_encoder::{TermInfoDeltaEncoder, TermInfoDeltaDecoder, DeltaTermInfo}; @@ -10,3 +14,30 @@ pub use self::termdict::TermDictionaryBuilderImpl; pub use self::streamer::TermStreamerImpl; pub use self::streamer::TermStreamerBuilderImpl; +#[derive(Debug)] +pub struct CheckPoint { + pub stream_offset: u32, + pub postings_offset: u32, + pub positions_offset: u32, +} + +impl BinarySerializable for CheckPoint { + + fn serialize(&self, writer: &mut W) -> io::Result<()> { + self.stream_offset.serialize(writer)?; + self.postings_offset.serialize(writer)?; + self.positions_offset.serialize(writer)?; + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let stream_offset = u32::deserialize(reader)?; + let postings_offset = u32::deserialize(reader)?; + let positions_offset = u32::deserialize(reader)?; + Ok(CheckPoint { + stream_offset: stream_offset, + postings_offset: postings_offset, + positions_offset: positions_offset, + }) + } +} \ No newline at end of file diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs index 4779a65c5..eacffe5dc 100644 --- a/src/termdict/streamdict/streamer.rs +++ b/src/termdict/streamdict/streamer.rs @@ -7,17 +7,19 @@ 
use postings::TermInfo; use common::BinarySerializable; use super::delta_encoder::{TermInfoDeltaDecoder, TermDeltaDecoder}; + fn stream_before<'a>(term_dictionary: &'a TermDictionaryImpl, target_key: &[u8], has_positions: bool) -> TermStreamerImpl<'a> { - let (prev_key, offset) = term_dictionary.strictly_previous_key(target_key.as_ref()); - let offset: usize = offset as usize; + + let (prev_key, checkpoint) = term_dictionary.strictly_previous_key(target_key.as_ref()); + let stream_data: &'a [u8] = &term_dictionary.stream_data()[checkpoint.stream_offset as usize..]; TermStreamerImpl { - cursor: &term_dictionary.stream_data()[offset..], + cursor: stream_data, term_delta_decoder: TermDeltaDecoder::with_previous_term(prev_key), - term_info_decoder: TermInfoDeltaDecoder::new(has_positions), // TODO checkpoint + term_info_decoder: TermInfoDeltaDecoder::from_checkpoint(&checkpoint, has_positions), } } @@ -30,6 +32,7 @@ pub struct TermStreamerBuilderImpl<'a> offset_from: usize, offset_to: usize, current_key: Vec, + term_info: TermInfo, has_positions: bool, } @@ -42,8 +45,9 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> let target_key = bound.as_ref(); let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions); let smaller_than = |k: &[u8]| k.lt(target_key); - let (offset_before, current_key) = get_offset(smaller_than, streamer); + let (offset_before, current_key, term_info) = get_offset(smaller_than, streamer); self.current_key = current_key; + self.term_info = term_info; self.offset_from = offset_before - self.origin; self } @@ -53,8 +57,9 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> let target_key = bound.as_ref(); let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions); let smaller_than = |k: &[u8]| k.le(target_key); - let (offset_before, current_key) = get_offset(smaller_than, streamer); + let (offset_before, current_key, term_info) = get_offset(smaller_than, 
streamer); self.current_key = current_key; + self.term_info = term_info; self.offset_from = offset_before - self.origin; self } @@ -64,7 +69,7 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> let target_key = bound.as_ref(); let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions); let smaller_than = |k: &[u8]| k.lt(target_key); - let (offset_before, _) = get_offset(smaller_than, streamer); + let (offset_before, _, _) = get_offset(smaller_than, streamer); self.offset_to = offset_before - self.origin; self } @@ -74,7 +79,7 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> let target_key = bound.as_ref(); let streamer = stream_before(self.term_dictionary, target_key.as_ref(), self.has_positions); let smaller_than = |k: &[u8]| k.le(target_key); - let (offset_before, _) = get_offset(smaller_than, streamer); + let (offset_before, _, _) = get_offset(smaller_than, streamer); self.offset_to = offset_before - self.origin; self } @@ -87,7 +92,7 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> TermStreamerImpl { cursor: &data[start..stop], term_delta_decoder: TermDeltaDecoder::with_previous_term(self.current_key), - term_info_decoder: TermInfoDeltaDecoder::new(self.has_positions), // TODO checkpoint + term_info_decoder: TermInfoDeltaDecoder::from_term_info(self.term_info, self.has_positions), // TODO checkpoint } } } @@ -101,21 +106,23 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> /// - the term_buffer state to initialize the block) fn get_offset<'a, P: Fn(&[u8]) -> bool>(predicate: P, mut streamer: TermStreamerImpl<'a>) - -> (usize, Vec) + -> (usize, Vec, TermInfo) { let mut prev: &[u8] = streamer.cursor; + let mut term_info = streamer.value().clone(); let mut prev_data: Vec = Vec::from(streamer.term_delta_decoder.term()); - while let Some((iter_key, _)) = streamer.next() { + while let Some((iter_key, iter_term_info)) = streamer.next() { if !predicate(iter_key.as_ref()) { - 
return (prev.as_ptr() as usize, prev_data); + return (prev.as_ptr() as usize, prev_data, term_info); } prev = streamer.cursor; prev_data.clear(); prev_data.extend_from_slice(iter_key.as_ref()); + term_info = iter_term_info.clone(); } - (prev.as_ptr() as usize, prev_data) + (prev.as_ptr() as usize, prev_data, term_info) } impl<'a> TermStreamerBuilderImpl<'a> @@ -127,6 +134,7 @@ impl<'a> TermStreamerBuilderImpl<'a> let origin = data.as_ptr() as usize; TermStreamerBuilderImpl { term_dictionary: term_dictionary, + term_info: TermInfo::default(), origin: origin, offset_from: 0, offset_to: data.len(), diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs index e5487b5f3..c2bdbbe68 100644 --- a/src/termdict/streamdict/termdict.rs +++ b/src/termdict/streamdict/termdict.rs @@ -1,6 +1,7 @@ #![allow(should_implement_trait)] use std::io::{self, Write}; +use super::CheckPoint; use fst; use fst::raw::Fst; @@ -30,9 +31,9 @@ fn has_positions(field_type: &FieldType) -> bool { if indexing_options.is_position_enabled() { true } - else { - false - } + else { + false + } } _ => { false @@ -47,6 +48,7 @@ pub struct TermDictionaryBuilderImpl term_delta_encoder: TermDeltaEncoder, term_info_encoder: TermInfoDeltaEncoder, block_index: fst::MapBuilder>, + checkpoints: Vec, len: usize, } @@ -62,9 +64,20 @@ impl TermDictionaryBuilderImpl where W: Write { fn add_index_entry(&mut self) { + let stream_offset = self.write.written_bytes() as u32; + let term_info = self.term_info_encoder.term_info(); + let postings_offset = term_info.postings_offset as u32; + let positions_offset = term_info.positions_offset as u32; + let checkpoint = CheckPoint { + stream_offset: stream_offset, + postings_offset: postings_offset, + positions_offset: positions_offset, + }; self.block_index - .insert(&self.term_delta_encoder.term(), self.write.written_bytes() as u64) - .unwrap(); + .insert(&self.term_delta_encoder.term(), self.checkpoints.len() as u64) + .expect("Serializing fst on a 
Vec should never fail. Were your terms not in order, maybe?"); + checkpoint.serialize(&mut self.checkpoints) + .expect("Serializing checkpoint on a Vec should never fail."); } /// # Warning @@ -156,6 +169,7 @@ impl TermDictionaryBuilder for TermDictionaryBuilderImpl term_delta_encoder: TermDeltaEncoder::default(), term_info_encoder: TermInfoDeltaEncoder::new(has_positions), block_index: fst::MapBuilder::new(vec![]).expect("This cannot fail"), + checkpoints: vec!(), len: 0, }) } @@ -173,12 +187,16 @@ impl TermDictionaryBuilder for TermDictionaryBuilderImpl /// Finalize writing the builder, and returns the underlying /// `Write` object. fn finish(mut self) -> io::Result { + self.add_index_entry(); self.write.write_all(&[0u8; PADDING_SIZE])?; - // self.add_index_entry(); - let (mut w, split_len) = self.write.finish()?; + let fst_addr = self.write.written_bytes(); let fst_write = self.block_index.into_inner().map_err(convert_fst_error)?; - w.write_all(&fst_write)?; - (split_len as u64).serialize(&mut w)?; + self.write.write_all(&fst_write)?; + let check_points_addr = self.write.written_bytes(); + let (mut w, _) = self.write.finish()?; + w.write_all(&self.checkpoints)?; + (fst_addr as u64).serialize(&mut w)?; + (check_points_addr as u64).serialize(&mut w)?; w.flush()?; Ok(w) } @@ -204,6 +222,7 @@ pub struct TermDictionaryImpl { stream_data: ReadOnlySource, fst_index: fst::Map, + checkpoints_data: ReadOnlySource, has_positions: bool, } @@ -213,7 +232,15 @@ impl TermDictionaryImpl self.stream_data.as_slice() } - pub(crate) fn strictly_previous_key(&self, key: &[u8]) -> (Vec, u64) { + pub(crate) fn strictly_previous_key(&self, key: &[u8]) -> (Vec, CheckPoint) { + let (term, checkpoint_offset) = self.strictly_previous_key_checkpoint_offset(key); + let mut checkpoint_data = &self.checkpoints_data.as_slice()[checkpoint_offset..]; + let checkpoint = CheckPoint::deserialize(&mut checkpoint_data) + .expect("Checkpoint data is corrupted"); + (term, checkpoint) + } + + fn 
strictly_previous_key_checkpoint_offset(&self, key: &[u8]) -> (Vec, usize) { let fst_map = &self.fst_index; let fst = fst_map.as_fst(); let mut node = fst.root(); @@ -246,12 +273,12 @@ impl TermDictionaryImpl result.push(last_transition.inp); let fork_node = fst.node(last_transition.addr); fill_last(fst, fork_node, &mut result); - let val = fst_map.get(&result).unwrap(); + let val = fst_map.get(&result).expect("Fst data corrupted") as usize; return (result, val); } else if cur_node.is_final() { // the previous key is a prefix let result_buffer = Vec::from(&key[..i]); - let val = fst_map.get(&result_buffer).unwrap(); + let val = fst_map.get(&result_buffer).expect("Fst data corrupted") as usize; return (result_buffer, val); } } @@ -273,19 +300,22 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl source = source.slice_from(1); let total_len = source.len(); - let length_offset = total_len - 8; - let split_len: usize = { - let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; - u64::deserialize(&mut split_len_buffer)? as usize - }; - let stream_data = source.slice(0, split_len); - let fst_data = source.slice(split_len, length_offset); + let (body, footer) = source.split(total_len - 16); + + let mut footer_buffer: &[u8] = footer.as_slice(); + let fst_addr: usize = u64::deserialize(&mut footer_buffer)? as usize; + let checkpoints_addr: usize = u64::deserialize(&mut footer_buffer)? as usize; + + let stream_data = body.slice(0, fst_addr - PADDING_SIZE); + let fst_data = body.slice(fst_addr, checkpoints_addr); + let checkpoints_data = body.slice_from(checkpoints_addr); + let fst_index = open_fst_index(fst_data)?; - let len_without_padding = stream_data.len() - PADDING_SIZE; Ok(TermDictionaryImpl { has_positions: has_positions, - stream_data: stream_data.slice(0, len_without_padding), + stream_data: stream_data, + checkpoints_data: checkpoints_data, fst_index: fst_index, }) }