diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs index bea35f9fa..b092f0bce 100644 --- a/src/common/composite_file.rs +++ b/src/common/composite_file.rs @@ -52,6 +52,7 @@ impl CompositeWrite { } +#[derive(Clone)] pub struct CompositeFile { data: ReadOnlySource, offsets_index: HashMap, @@ -94,6 +95,14 @@ impl CompositeFile { }) } + pub fn empty() -> CompositeFile { + CompositeFile { + offsets_index: HashMap::new(), + data: ReadOnlySource::empty(), + } + } + + pub fn open_read(&self, field: Field) -> Option { self.offsets_index .get(&field) diff --git a/src/compression/mod.rs b/src/compression/mod.rs index d8540892b..43096622c 100644 --- a/src/compression/mod.rs +++ b/src/compression/mod.rs @@ -46,11 +46,11 @@ pub trait VIntDecoder { compressed_data: &'a [u8], offset: u32, num_els: usize) - -> &'a [u8]; + -> usize; fn uncompress_vint_unsorted<'a>(&mut self, compressed_data: &'a [u8], num_els: usize) - -> &'a [u8]; + -> usize; } impl VIntEncoder for BlockEncoder { @@ -68,7 +68,7 @@ impl VIntDecoder for BlockDecoder { compressed_data: &'a [u8], offset: u32, num_els: usize) - -> &'a [u8] { + -> usize { self.output_len = num_els; vint::uncompress_sorted(compressed_data, &mut self.output[..num_els], offset) } @@ -76,7 +76,7 @@ impl VIntDecoder for BlockDecoder { fn uncompress_vint_unsorted<'a>(&mut self, compressed_data: &'a [u8], num_els: usize) - -> &'a [u8] { + -> usize { self.output_len = num_els; vint::uncompress_unsorted(compressed_data, &mut self.output[..num_els]) } @@ -100,8 +100,8 @@ pub mod tests { let compressed_data = encoder.compress_block_sorted(&vals, 0); let mut decoder = BlockDecoder::new(); { - let remaining_data = decoder.uncompress_block_sorted(compressed_data, 0); - assert_eq!(remaining_data.len(), 0); + let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 0); + assert_eq!(consumed_num_bytes, compressed_data.len()); } for i in 0..128 { assert_eq!(vals[i], decoder.output(i)); @@ -115,8 +115,8 @@ pub mod tests { let compressed_data = encoder.compress_block_sorted(&vals, 10); let mut decoder = BlockDecoder::new(); { - let remaining_data = decoder.uncompress_block_sorted(compressed_data, 10); - assert_eq!(remaining_data.len(), 0); + let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 10); + assert_eq!(consumed_num_bytes, compressed_data.len()); } for i in 0..128 { assert_eq!(vals[i], decoder.output(i)); @@ -134,9 +134,9 @@ pub mod tests { compressed.push(173u8); let mut decoder = BlockDecoder::new(); { - let remaining_data = decoder.uncompress_block_sorted(&compressed, 10); - assert_eq!(remaining_data.len(), 1); - assert_eq!(remaining_data[0], 173u8); + let consumed_num_bytes = decoder.uncompress_block_sorted(&compressed, 10); + assert_eq!(consumed_num_bytes, compressed.len() - 1); + assert_eq!(compressed[consumed_num_bytes], 173u8); } for i in 0..n { assert_eq!(vals[i], decoder.output(i)); @@ -154,9 +154,9 @@ pub mod tests { compressed.push(173u8); let mut decoder = BlockDecoder::new(); { - let remaining_data = decoder.uncompress_block_unsorted(&compressed); - assert_eq!(remaining_data.len(), 1); - assert_eq!(remaining_data[0], 173u8); + let consumed_num_bytes = decoder.uncompress_block_unsorted(&compressed); + assert_eq!(consumed_num_bytes + 1, compressed.len()); + assert_eq!(compressed[consumed_num_bytes], 173u8); } for i in 0..n { assert_eq!(vals[i], decoder.output(i)); @@ -174,9 +174,9 @@ pub mod tests { let encoded_data = encoder.compress_vint_sorted(&input, *offset); assert!(encoded_data.len() <= 
expected_length); let mut decoder = BlockDecoder::new(); - let remaining_data = + let consumed_num_bytes = decoder.uncompress_vint_sorted(&encoded_data, *offset, input.len()); - assert_eq!(0, remaining_data.len()); + assert_eq!(consumed_num_bytes, encoded_data.len()); assert_eq!(input, decoder.output_array()); } } diff --git a/src/compression/pack/compression_pack_simd.rs b/src/compression/pack/compression_pack_simd.rs index 6842e0cc2..c430a728f 100644 --- a/src/compression/pack/compression_pack_simd.rs +++ b/src/compression/pack/compression_pack_simd.rs @@ -78,19 +78,19 @@ impl BlockDecoder { } } - pub fn uncompress_block_sorted<'a>(&mut self, - compressed_data: &'a [u8], - offset: u32) - -> &'a [u8] { + pub fn uncompress_block_sorted(&mut self, + compressed_data: &[u8], + offset: u32) + -> usize { let consumed_size = uncompress_sorted(compressed_data, &mut self.output, offset); self.output_len = NUM_DOCS_PER_BLOCK; - &compressed_data[consumed_size..] + consumed_size } - pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> &'a [u8] { + pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> usize { let consumed_size = uncompress_unsorted(compressed_data, &mut self.output); self.output_len = NUM_DOCS_PER_BLOCK; - &compressed_data[consumed_size..] + consumed_size } #[inline] diff --git a/src/compression/stream.rs b/src/compression/stream.rs index 735eb7bef..0af50ca5b 100644 --- a/src/compression/stream.rs +++ b/src/compression/stream.rs @@ -1,15 +1,16 @@ use compression::BlockDecoder; use compression::NUM_DOCS_PER_BLOCK; use compression::compressed_block_size; +use directory::SourceRead; -pub struct CompressedIntStream<'a> { - buffer: &'a [u8], +pub struct CompressedIntStream { + buffer: SourceRead, block_decoder: BlockDecoder, inner_offset: usize, } -impl<'a> CompressedIntStream<'a> { - pub fn wrap(buffer: &'a [u8]) -> CompressedIntStream<'a> { +impl CompressedIntStream { + pub fn wrap(buffer: SourceRead) -> CompressedIntStream { CompressedIntStream { buffer: buffer, block_decoder: BlockDecoder::new(), @@ -29,7 +30,8 @@ impl<'a> CompressedIntStream<'a> { } num_els -= available; start += available; - self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer); + let num_consumed_bytes = self.block_decoder.uncompress_block_unsorted(self.buffer.as_ref()); + self.buffer.advance(num_consumed_bytes); self.inner_offset = 0; } else { @@ -51,11 +53,12 @@ impl<'a> CompressedIntStream<'a> { // entirely skip decompressing some blocks. 
while skip_len >= NUM_DOCS_PER_BLOCK { skip_len -= NUM_DOCS_PER_BLOCK; - let num_bits: u8 = self.buffer[0]; + let num_bits: u8 = self.buffer.as_ref()[0]; let block_len = compressed_block_size(num_bits); - self.buffer = &self.buffer[block_len..]; + self.buffer.advance(block_len); } - self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer); + let num_consumed_bytes = self.block_decoder.uncompress_block_unsorted(self.buffer.as_ref()); + self.buffer.advance(num_consumed_bytes); self.inner_offset = skip_len; } } @@ -69,8 +72,9 @@ pub mod tests { use compression::compressed_block_size; use compression::NUM_DOCS_PER_BLOCK; use compression::BlockEncoder; + use directory::{SourceRead, ReadOnlySource}; - fn create_stream_buffer() -> Vec { + fn create_stream_buffer() -> ReadOnlySource { let mut buffer: Vec = vec!(); let mut encoder = BlockEncoder::new(); let vals: Vec = (0u32..1_025u32).collect(); @@ -80,13 +84,14 @@ pub mod tests { assert_eq!(compressed_block_size(num_bits), compressed_block.len()); buffer.extend_from_slice(compressed_block); } - buffer + ReadOnlySource::from(buffer) } #[test] fn test_compressed_int_stream() { let buffer = create_stream_buffer(); - let mut stream = CompressedIntStream::wrap(&buffer[..]); + let buffer_reader = SourceRead::from(buffer); + let mut stream = CompressedIntStream::wrap(buffer_reader); let mut block: [u32; NUM_DOCS_PER_BLOCK] = [0u32; NUM_DOCS_PER_BLOCK]; stream.read(&mut block[0..2]); diff --git a/src/compression/vint/compression_vint_simd.rs b/src/compression/vint/compression_vint_simd.rs index dbeca660c..f8e09536f 100644 --- a/src/compression/vint/compression_vint_simd.rs +++ b/src/compression/vint/compression_vint_simd.rs @@ -49,20 +49,18 @@ pub fn compress_unsorted<'a>(input: &[u32], output: &'a mut [u8]) -> &'a [u8] { pub fn uncompress_sorted<'a>(compressed_data: &'a [u8], output: &mut [u32], offset: u32) - -> &'a [u8] { - let consumed_bytes = unsafe { + -> usize { + unsafe { streamvbyte::streamvbyte_delta_decode(compressed_data.as_ptr(), output.as_mut_ptr(), output.len() as u32, offset) - }; - &compressed_data[consumed_bytes..] + } } #[inline(always)] -pub fn uncompress_unsorted<'a>(compressed_data: &'a [u8], output: &mut [u32]) -> &'a [u8] { - let consumed_bytes = unsafe { +pub fn uncompress_unsorted<'a>(compressed_data: &'a [u8], output: &mut [u32]) -> usize { + unsafe { streamvbyte::streamvbyte_decode(compressed_data.as_ptr(), output.as_mut_ptr(), output.len()) - }; - &compressed_data[consumed_bytes..] 
+ } } diff --git a/src/core/field_reader.rs b/src/core/field_reader.rs new file mode 100644 index 000000000..bead5bb80 --- /dev/null +++ b/src/core/field_reader.rs @@ -0,0 +1,149 @@ +use directory::{SourceRead, ReadOnlySource}; +use termdict::{TermDictionary, TermDictionaryImpl}; +use std::io; +use postings::{SegmentPostings, BlockSegmentPostings}; +use postings::TermInfo; +use postings::SegmentPostingsOption; +use schema::Term; +use std::cmp; +use fastfield::DeleteBitSet; +use schema::Schema; +use compression::CompressedIntStream; + +pub struct FieldReader { + termdict: TermDictionaryImpl, + postings_source: ReadOnlySource, + positions_source: ReadOnlySource, + delete_bitset: DeleteBitSet, + schema: Schema, +} + +impl FieldReader { + + pub(crate) fn new( + termdict_source: ReadOnlySource, + postings_source: ReadOnlySource, + positions_source: ReadOnlySource, + delete_bitset: DeleteBitSet, + schema: Schema, + + ) -> io::Result { + + Ok(FieldReader { + termdict: TermDictionaryImpl::from_source(termdict_source)?, + postings_source: postings_source, + positions_source: positions_source, + delete_bitset: delete_bitset, + schema: schema, + }) + } + + /// Returns the term info associated with the term. + pub fn get_term_info(&self, term: &Term) -> Option { + self.termdict.get(term.as_slice()) + } + + + /// Return the term dictionary datastructure. + pub fn terms(&self) -> &TermDictionaryImpl { + &self.termdict + } + + /// Resets the block segment to another position of the postings + /// file. + /// + /// This is useful for enumerating through a list of terms, + /// and consuming the associated posting lists while avoiding + /// reallocating a `BlockSegmentPostings`. + /// + /// # Warning + /// + /// This does not reset the positions list. + pub fn reset_block_postings_from_terminfo(&self, + term_info: &TermInfo, + block_postings: &mut BlockSegmentPostings) { + let offset = term_info.postings_offset as usize; + let end_source = self.postings_source.len(); + let postings_slice = self.postings_source.slice(offset, end_source); + let postings_reader = SourceRead::from(postings_slice); + block_postings.reset(term_info.doc_freq as usize, postings_reader); + } + + + + /// Returns a block postings given a `term_info`. + /// This method is for an advanced usage only. + /// + /// Most user should prefer using `read_postings` instead. + pub fn read_block_postings_from_terminfo(&self, + term_info: &TermInfo, + option: SegmentPostingsOption) + -> BlockSegmentPostings { + let offset = term_info.postings_offset as usize; + let postings_data = self.postings_source.slice_from(offset); + let has_freq = option.has_freq(); + BlockSegmentPostings::from_data( + term_info.doc_freq as usize, + SourceRead::from(postings_data), + has_freq) + } + + /// Returns a posting object given a `term_info`. + /// This method is for an advanced usage only. + /// + /// Most user should prefer using `read_postings` instead. 
+ pub fn read_postings_from_terminfo(&self, + term_info: &TermInfo, + option: SegmentPostingsOption) + -> SegmentPostings { + let block_postings = self.read_block_postings_from_terminfo(term_info, option); + let delete_bitset = self.delete_bitset.clone(); + let position_stream = { + if option.has_positions() { + let position_offset = term_info.positions_offset; + let positions_reader = SourceRead::from(self.positions_source.slice_from(position_offset as usize)); + let mut stream = CompressedIntStream::wrap(positions_reader); + stream.skip(term_info.positions_inner_offset as usize); + Some(stream) + } + else { + None + } + }; + SegmentPostings::from_block_postings( + block_postings, + delete_bitset, + position_stream + ) + } + + /// Returns the segment postings associated with the term, and with the given option, + /// or `None` if the term has never been encountered and indexed. + /// + /// If the field was not indexed with the indexing options that cover + /// the requested options, the returned `SegmentPostings` the method does not fail + /// and returns a `SegmentPostings` with as much information as possible. + /// + /// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a + /// `TextIndexingOptions` that does not index position will return a `SegmentPostings` + /// with `DocId`s and frequencies. + pub fn read_postings(&self, + term: &Term, + option: SegmentPostingsOption) + -> Option { + let field = term.field(); + let field_entry = self.schema.get_field_entry(field); + let term_info = get!(self.get_term_info(term)); + let maximum_option = get!(field_entry.field_type().get_segment_postings_option()); + let best_effort_option = cmp::min(maximum_option, option); + Some(self.read_postings_from_terminfo(&term_info, best_effort_option)) + } + + /// Returns the number of documents containing the term. + pub fn doc_freq(&self, term: &Term) -> u32 { + match self.get_term_info(term) { + Some(term_info) => term_info.doc_freq, + None => 0, + } + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs index dca8b5ccd..bba1447ef 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -7,7 +7,9 @@ mod segment; mod index_meta; mod pool; mod segment_meta; +mod field_reader; +pub use self::field_reader::FieldReader; pub use self::searcher::Searcher; pub use self::segment_component::SegmentComponent; pub use self::segment_id::SegmentId; @@ -18,7 +20,6 @@ pub use self::index::Index; pub use self::segment_meta::SegmentMeta; pub use self::index_meta::IndexMeta; - use std::path::PathBuf; lazy_static! { diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 6579698e2..d84ad22a3 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -6,9 +6,11 @@ use common::TimerTree; use query::Query; use DocId; use DocAddress; -use schema::Term; -use termdict::TermMerger; +use schema::{Term, Field}; +use termdict::{TermMerger, TermDictionary}; +use std::sync::Arc; use std::fmt; +use core::FieldReader; use postings::TermInfo; @@ -46,7 +48,12 @@ impl Searcher { pub fn doc_freq(&self, term: &Term) -> u32 { self.segment_readers .iter() - .map(|segment_reader| segment_reader.doc_freq(term)) + .map(|segment_reader| { + segment_reader + .field_reader(term.field()) + .unwrap() // TODO error handling + .doc_freq(term) + }) .fold(0u32, |acc, val| acc + val) } @@ -65,20 +72,46 @@ impl Searcher { query.search(self, collector) } - /// Returns a Stream over all of the sorted unique terms of - /// the searcher. - /// - /// This includes all of the fields from all of the segment_readers. 
- /// See [`TermIterator`](struct.TermIterator.html). - /// - /// # Warning - /// This API is very likely to change in the future. - pub fn terms(&self) -> TermMerger { - TermMerger::from(self.segment_readers()) + pub fn field(&self, field: Field) -> Result { + let field_readers = self.segment_readers + .iter() + .map(|segment_reader| { + segment_reader.field_reader(field) + }) + .collect::>>()?; + Ok(FieldSearcher::new(field_readers)) } } + +pub struct FieldSearcher { + field_readers: Vec>, +} + + +impl FieldSearcher { + + fn new(field_readers: Vec>) -> FieldSearcher { + FieldSearcher { + field_readers: field_readers, + } + } + + + /// Returns a Stream over all of the sorted unique terms of + /// for the given field. + pub fn terms(&self) -> TermMerger { + let term_streamers: Vec<_> = self.field_readers + .iter() + .map(|field_reader| { + field_reader.terms().stream() + }) + .collect(); + TermMerger::new(term_streamers) + } +} + impl From> for Searcher { fn from(segment_readers: Vec) -> Searcher { Searcher { segment_readers: segment_readers } diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 619888228..336496018 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -2,26 +2,21 @@ use Result; use core::Segment; use core::SegmentId; use core::SegmentComponent; -use schema::Term; +use std::sync::RwLock; use common::HasLen; -use compression::CompressedIntStream; use core::SegmentMeta; use fastfield::{self, FastFieldNotAvailableError}; use fastfield::DeleteBitSet; use store::StoreReader; use schema::Document; -use directory::ReadOnlySource; use DocId; use std::str; -use termdict::TermDictionary; -use std::cmp; -use postings::TermInfo; -use termdict::TermDictionaryImpl; use std::sync::Arc; +use std::collections::HashMap; +use common::CompositeFile; use std::fmt; +use core::FieldReader; use schema::Field; -use postings::SegmentPostingsOption; -use postings::{SegmentPostings, BlockSegmentPostings}; use fastfield::{FastFieldsReader, FastFieldReader, U64FastFieldReader}; use schema::Schema; @@ -40,15 +35,19 @@ use schema::Schema; /// #[derive(Clone)] pub struct SegmentReader { + field_reader_cache: Arc>>>, + segment_id: SegmentId, segment_meta: SegmentMeta, - terms: Arc, - postings_data: ReadOnlySource, + + termdict_composite: CompositeFile, + postings_composite: CompositeFile, + positions_composite: CompositeFile, + store_reader: StoreReader, fast_fields_reader: Arc, fieldnorms_reader: Arc, delete_bitset: DeleteBitSet, - positions_data: ReadOnlySource, schema: Schema, } @@ -117,14 +116,6 @@ impl SegmentReader { self.fieldnorms_reader.open_reader(field) } - /// Returns the number of documents containing the term. - pub fn doc_freq(&self, term: &Term) -> u32 { - match self.get_term_info(term) { - Some(term_info) => term_info.doc_freq, - None => 0, - } - } - /// Accessor to the segment's `StoreReader`. pub fn get_store_reader(&self) -> &StoreReader { &self.store_reader @@ -133,13 +124,24 @@ impl SegmentReader { /// Open a new segment for reading. 
pub fn open(segment: Segment) -> Result { - let source = segment.open_read(SegmentComponent::TERMS)?; - let terms = TermDictionaryImpl::from_source(source)?; + let termdict_source = segment.open_read(SegmentComponent::TERMS)?; + let termdict_composite = CompositeFile::open(termdict_source)?; let store_source = segment.open_read(SegmentComponent::STORE)?; let store_reader = StoreReader::from_source(store_source); - let postings_shared_mmap = segment.open_read(SegmentComponent::POSTINGS)?; + let postings_source = segment.open_read(SegmentComponent::POSTINGS)?; + let postings_composite = CompositeFile::open(postings_source)?; + + let positions_composite = { + if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) { + CompositeFile::open(source)? + } + else { + CompositeFile::empty() + } + }; + let fast_field_data = segment.open_read(SegmentComponent::FASTFIELDS)?; let fast_fields_reader = FastFieldsReader::from_source(fast_field_data)?; @@ -147,9 +149,6 @@ impl SegmentReader { let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?; let fieldnorms_reader = FastFieldsReader::from_source(fieldnorms_data)?; - let positions_data = segment - .open_read(SegmentComponent::POSITIONS) - .unwrap_or_else(|_| ReadOnlySource::empty()); let delete_bitset = if segment.meta().has_deletes() { let delete_data = segment.open_read(SegmentComponent::DELETE)?; @@ -160,22 +159,53 @@ impl SegmentReader { let schema = segment.schema(); Ok(SegmentReader { - segment_meta: segment.meta().clone(), - postings_data: postings_shared_mmap, - terms: Arc::new(terms), - segment_id: segment.id(), - store_reader: store_reader, - fast_fields_reader: Arc::new(fast_fields_reader), - fieldnorms_reader: Arc::new(fieldnorms_reader), - delete_bitset: delete_bitset, - positions_data: positions_data, - schema: schema, - }) + field_reader_cache: Arc::new(RwLock::new(HashMap::new())), + segment_meta: segment.meta().clone(), + postings_composite: postings_composite, + termdict_composite: termdict_composite, + segment_id: segment.id(), + store_reader: store_reader, + fast_fields_reader: Arc::new(fast_fields_reader), + fieldnorms_reader: Arc::new(fieldnorms_reader), + delete_bitset: delete_bitset, + positions_composite: positions_composite, + schema: schema, + }) } - /// Return the term dictionary datastructure. - pub fn terms(&self) -> &TermDictionaryImpl { - &self.terms + pub fn field_reader(&self, field: Field) -> Result> { + if let Some(field_reader) = self.field_reader_cache.read() + .unwrap() // TODO + .get(&field) { + return Ok(field_reader.clone()); + } + + // TODO better error + let termdict_source = self.termdict_composite + .open_read(field) + .ok_or("Field not found")?; + + let postings_source = self.postings_composite + .open_read(field) + .ok_or("field not found")?; + + let positions_source = self.positions_composite + .open_read(field) + .ok_or("field not found")?; + + let field_reader = Arc::new(FieldReader::new( + termdict_source, + postings_source, + positions_source, + self.delete_bitset.clone(), + self.schema.clone(), + )?); + + self.field_reader_cache + .write() + .unwrap() // TODO + .insert(field, field_reader.clone()); + Ok(field_reader) } /// Returns the document (or to be accurate, its stored field) @@ -187,100 +217,6 @@ impl SegmentReader { } - /// Returns the segment postings associated with the term, and with the given option, - /// or `None` if the term has never been encountered and indexed. 
- /// - /// If the field was not indexed with the indexing options that cover - /// the requested options, the returned `SegmentPostings` the method does not fail - /// and returns a `SegmentPostings` with as much information as possible. - /// - /// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a - /// `TextIndexingOptions` that does not index position will return a `SegmentPostings` - /// with `DocId`s and frequencies. - pub fn read_postings(&self, - term: &Term, - option: SegmentPostingsOption) - -> Option { - let field = term.field(); - let field_entry = self.schema.get_field_entry(field); - let term_info = get!(self.get_term_info(term)); - let maximum_option = get!(field_entry.field_type().get_segment_postings_option()); - let best_effort_option = cmp::min(maximum_option, option); - Some(self.read_postings_from_terminfo(&term_info, best_effort_option)) - } - - - /// Returns a posting object given a `term_info`. - /// This method is for an advanced usage only. - /// - /// Most user should prefer using `read_postings` instead. - pub fn read_postings_from_terminfo(&self, - term_info: &TermInfo, - option: SegmentPostingsOption) - -> SegmentPostings { - let block_postings = self.read_block_postings_from_terminfo(term_info, option); - let delete_bitset = self.delete_bitset.clone(); - let position_stream = { - if option.has_positions() { - let position_offset = term_info.positions_offset; - let positions_data = &self.positions_data[position_offset as usize..]; - let mut stream = CompressedIntStream::wrap(positions_data); - stream.skip(term_info.positions_inner_offset as usize); - Some(stream) - } - else { - None - } - }; - SegmentPostings::from_block_postings( - block_postings, - delete_bitset, - position_stream - ) - } - - - /// Returns a block postings given a `term_info`. - /// This method is for an advanced usage only. - /// - /// Most user should prefer using `read_postings` instead. - pub fn read_block_postings_from_terminfo(&self, - term_info: &TermInfo, - option: SegmentPostingsOption) - -> BlockSegmentPostings { - let offset = term_info.postings_offset as usize; - let postings_data = &self.postings_data[offset..]; - let has_freq = option.has_freq(); - BlockSegmentPostings::from_data( - term_info.doc_freq as usize, - postings_data, - has_freq) - } - - - /// Resets the block segment to another position of the postings - /// file. - /// - /// This is useful for enumerating through a list of terms, - /// and consuming the associated posting lists while avoiding - /// reallocating a `BlockSegmentPostings`. - /// - /// # Warning - /// - /// This does not reset the positions list. - pub fn reset_block_postings_from_terminfo<'a>(&'a self, - term_info: &TermInfo, - block_postings: &mut BlockSegmentPostings<'a>) { - let offset = term_info.postings_offset as usize; - let postings_data: &'a [u8] = &self.postings_data[offset..]; - block_postings.reset(term_info.doc_freq as usize, postings_data); - } - - /// Returns the term info associated with the term. 
- pub fn get_term_info(&self, term: &Term) -> Option { - self.terms.get(term.as_slice()) - } - /// Returns the segment id pub fn segment_id(&self) -> SegmentId { self.segment_id diff --git a/src/directory/mod.rs b/src/directory/mod.rs index b107d78c5..cfdaee719 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -16,7 +16,7 @@ pub mod error; use std::io::{Write, Seek}; use std::io::BufWriter; -pub use self::read_only_source::ReadOnlySource; +pub use self::read_only_source::{SourceRead, ReadOnlySource}; pub use self::directory::Directory; pub use self::ram_directory::RAMDirectory; pub use self::mmap_directory::MmapDirectory; diff --git a/src/directory/read_only_source.rs b/src/directory/read_only_source.rs index d327f5a51..1fd0afc0f 100644 --- a/src/directory/read_only_source.rs +++ b/src/directory/read_only_source.rs @@ -2,6 +2,8 @@ use fst::raw::MmapReadOnly; use std::ops::Deref; use super::shared_vec_slice::SharedVecSlice; use common::HasLen; +use std::slice; +use std::io::{self, Read}; use stable_deref_trait::StableDeref; /// Read object that represents files in tantivy. @@ -62,6 +64,11 @@ impl ReadOnlySource { } } } + + pub fn slice_from(&self, from_offset: usize) -> ReadOnlySource { + let len = self.len(); + self.slice(from_offset, len) + } } impl HasLen for ReadOnlySource { @@ -82,3 +89,38 @@ impl From> for ReadOnlySource { ReadOnlySource::Anonymous(shared_data) } } + +pub struct SourceRead { + _data_owner: ReadOnlySource, + cursor: &'static [u8] +} + +impl SourceRead { + pub fn advance(&mut self, len: usize) { + self.cursor = &self.cursor[len..]; + } +} + +impl AsRef<[u8]> for SourceRead { + fn as_ref(&self) -> &[u8] { + self.cursor + } +} + +impl From for SourceRead { + fn from(source: ReadOnlySource) -> SourceRead { + let len = source.len(); + let slice_ptr = source.as_slice().as_ptr(); + let static_slice = unsafe { slice::from_raw_parts(slice_ptr, len) }; + SourceRead { + _data_owner: source, + cursor: static_slice, + } + } +} + +impl Read for SourceRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.cursor.read(buf) + } +} diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 565c3089e..1477fb50b 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -177,8 +177,9 @@ pub fn compute_deleted_bitset(delete_bitset: &mut BitSet, // Limit doc helps identify the first document // that may be affected by the delete operation. let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp); + let field_reader = segment_reader.field_reader(delete_op.term.field())?; if let Some(mut docset) = - segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) { + field_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) { while docset.advance() { let deleted_doc = docset.doc(); if deleted_doc < limit_doc { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 74cdd625a..48aa695aa 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -17,6 +17,7 @@ use fastfield::FastFieldSerializer; use fastfield::FastFieldReader; use store::StoreWriter; use std::cmp::{min, max}; +use termdict::TermDictionary; use schema::Term; use termdict::TermStreamer; @@ -195,48 +196,62 @@ impl IndexMerger { fn write_postings(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> { let mut delta_computer = DeltaComputer::new(); - let mut merged_terms = TermMerger::from(&self.readers[..]); - let mut max_doc = 0; - - // map from segment doc ids to the resulting merged segment doc id. 
- let mut merged_doc_id_map: Vec>> = Vec::with_capacity(self.readers.len()); - - for reader in &self.readers { - let mut segment_local_map = Vec::with_capacity(reader.max_doc() as usize); - for doc_id in 0..reader.max_doc() { - if reader.is_deleted(doc_id) { - segment_local_map.push(None); - } else { - segment_local_map.push(Some(max_doc)); - max_doc += 1u32; - } + let mut indexed_fields = vec!(); + for (field_ord, field_entry) in self.schema.fields().iter().enumerate() { + // if field_entry + if field_entry.is_indexed() { + indexed_fields.push(Field(field_ord as u32)); } - merged_doc_id_map.push(segment_local_map); } - // Create the total list of doc ids - // by stacking the doc ids from the different segment. - // - // In the new segments, the doc id from the different - // segment are stacked so that : - // - Segment 0's doc ids become doc id [0, seg.max_doc] - // - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc] - // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, - // seg0.max_doc + seg1.max_doc + seg2.max_doc] - // ... - if !merged_terms.advance() { - return Ok(()); - } + for indexed_field in indexed_fields { - let mut current_field = Term::wrap(merged_terms.key()).field(); + let field_readers = self.readers + .iter() + .map(|reader| + reader.field_reader(indexed_field)) + .collect::>>()?; - loop { - // this loop processes all fields. - let mut field_serializer = serializer.new_field(current_field)?; + let field_term_streams = field_readers + .iter() + .map(|field_reader| field_reader.terms().stream() ) + .collect(); + + let mut merged_terms = TermMerger::new(field_term_streams); + let mut max_doc = 0; + + // map from segment doc ids to the resulting merged segment doc id. + let mut merged_doc_id_map: Vec>> = Vec::with_capacity(self.readers.len()); + + for reader in &self.readers { + let mut segment_local_map = Vec::with_capacity(reader.max_doc() as usize); + for doc_id in 0..reader.max_doc() { + if reader.is_deleted(doc_id) { + segment_local_map.push(None); + } else { + segment_local_map.push(Some(max_doc)); + max_doc += 1u32; + } + } + merged_doc_id_map.push(segment_local_map); + } + + // Create the total list of doc ids + // by stacking the doc ids from the different segment. + // + // In the new segments, the doc id from the different + // segment are stacked so that : + // - Segment 0's doc ids become doc id [0, seg.max_doc] + // - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc] + // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, + // seg0.max_doc + seg1.max_doc + seg2.max_doc] + // ... + + let mut field_serializer = serializer.new_field(indexed_field)?; + + let field_entry = self.schema.get_field_entry(indexed_field); - // we reached a new field. - let field_entry = self.schema.get_field_entry(current_field); // ... set segment postings option the new field. let segment_postings_option = field_entry .field_type() @@ -244,88 +259,78 @@ impl IndexMerger { .expect("Encountered a field that is not supposed to be indexed. Have you modified the schema?"); - loop { - // this loops processes a field. 
- { - let term = Term::wrap(merged_terms.key()); + while merged_terms.advance() { - // Let's compute the list of non-empty posting lists - let segment_postings: Vec<_> = merged_terms - .current_kvs() - .iter() - .flat_map(|heap_item| { - let segment_ord = heap_item.segment_ord; - let term_info = heap_item.streamer.value(); - let segment_reader = &self.readers[heap_item.segment_ord]; - let mut segment_postings = - segment_reader - .read_postings_from_terminfo(term_info, segment_postings_option); - if segment_postings.advance() { - Some((segment_ord, segment_postings)) - } else { - None + let term = Term::wrap(merged_terms.key()); + + // Let's compute the list of non-empty posting lists + let segment_postings: Vec<_> = merged_terms + .current_kvs() + .iter() + .flat_map(|heap_item| { + let segment_ord = heap_item.segment_ord; + let term_info = heap_item.streamer.value(); + let segment_reader = &self.readers[heap_item.segment_ord]; + let field_reader = segment_reader.field_reader(term.field()).unwrap(); // TODO fix unwrap + let mut segment_postings = field_reader + .read_postings_from_terminfo(term_info, segment_postings_option); + if segment_postings.advance() { + Some((segment_ord, segment_postings)) + } else { + None + } + }) + .collect(); + + // At this point, `segment_postings` contains the posting list + // of all of the segments containing the given term. + // + // These segments are non-empty and advance has already been called. + + if !segment_postings.is_empty() { + // If not, the `term` will be entirely removed. + + // We know that there is at least one document containing + // the term, so we add it. + field_serializer.new_term(term.as_ref())?; + + // We can now serialize this postings, by pushing each document to the + // postings serializer. + + for (segment_ord, mut segment_postings) in segment_postings { + let old_to_new_doc_id = &merged_doc_id_map[segment_ord]; + loop { + // `.advance()` has been called once before the loop. + // Hence we cannot use a `while segment_postings.advance()` loop. + if let Some(remapped_doc_id) = + old_to_new_doc_id[segment_postings.doc() as usize] { + // we make sure to only write the term iff + // there is at least one document. + let positions: &[u32] = segment_postings.positions(); + let term_freq = segment_postings.term_freq(); + let delta_positions = delta_computer.compute_delta(positions); + field_serializer + .write_doc(remapped_doc_id, term_freq, delta_positions)?; } - }) - .collect(); - - // At this point, `segment_postings` contains the posting list - // of all of the segments containing the given term. - // - // These segments are non-empty and advance has already been called. - - if !segment_postings.is_empty() { - // If not, the `term` will be entirely removed. - - // We know that there is at least one document containing - // the term, so we add it. - field_serializer.new_term(term.as_ref())?; - - // We can now serialize this postings, by pushing each document to the - // postings serializer. - - for (segment_ord, mut segment_postings) in segment_postings { - let old_to_new_doc_id = &merged_doc_id_map[segment_ord]; - loop { - // `.advance()` has been called once before the loop. - // Hence we cannot use a `while segment_postings.advance()` loop. - if let Some(remapped_doc_id) = - old_to_new_doc_id[segment_postings.doc() as usize] { - // we make sure to only write the term iff - // there is at least one document. 
- let positions: &[u32] = segment_postings.positions(); - let term_freq = segment_postings.term_freq(); - let delta_positions = delta_computer.compute_delta(positions); - field_serializer - .write_doc(remapped_doc_id, term_freq, delta_positions)?; - } - if !segment_postings.advance() { - break; - } + if !segment_postings.advance() { + break; } } - - // closing the term. - field_serializer.close_term()?; } + // closing the term. + field_serializer.close_term()?; } - - if !merged_terms.advance() { - field_serializer.close()?; - return Ok(()) - } - - { - let next_term_field = Term::wrap(merged_terms.key()).field(); - if next_term_field != current_field { - current_field = next_term_field; - field_serializer.close()?; - break; - } - } } + + field_serializer.close()?; + } + /* + + */ + Ok(()) } fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> Result<()> { diff --git a/src/lib.rs b/src/lib.rs index d719badfb..c926b67b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -390,15 +390,16 @@ mod tests { index.load_searchers().unwrap(); let searcher = index.searcher(); let reader = searcher.segment_reader(0); - assert!(reader.read_postings(&term_abcd, FreqAndPositions).is_none()); + let field_reader = reader.field_reader(text_field).unwrap(); + assert!(field_reader.read_postings(&term_abcd, FreqAndPositions).is_none()); { - let mut postings = reader.read_postings(&term_a, FreqAndPositions).unwrap(); + let mut postings = field_reader.read_postings(&term_a, FreqAndPositions).unwrap(); assert!(postings.advance()); assert_eq!(postings.doc(), 5); assert!(!postings.advance()); } { - let mut postings = reader.read_postings(&term_b, FreqAndPositions).unwrap(); + let mut postings = field_reader.read_postings(&term_b, FreqAndPositions).unwrap(); assert!(postings.advance()); assert_eq!(postings.doc(), 3); assert!(postings.advance()); @@ -424,16 +425,17 @@ mod tests { index.load_searchers().unwrap(); let searcher = index.searcher(); let reader = searcher.segment_reader(0); + let field_reader = reader.field_reader(term_abcd.field()).unwrap(); - assert!(reader.read_postings(&term_abcd, FreqAndPositions).is_none()); + assert!(field_reader.read_postings(&term_abcd, FreqAndPositions).is_none()); { - let mut postings = reader.read_postings(&term_a, FreqAndPositions).unwrap(); + let mut postings = field_reader.read_postings(&term_a, FreqAndPositions).unwrap(); assert!(postings.advance()); assert_eq!(postings.doc(), 5); assert!(!postings.advance()); } { - let mut postings = reader.read_postings(&term_b, FreqAndPositions).unwrap(); + let mut postings = field_reader.read_postings(&term_b, FreqAndPositions).unwrap(); assert!(postings.advance()); assert_eq!(postings.doc(), 3); assert!(postings.advance()); @@ -459,13 +461,14 @@ mod tests { index.load_searchers().unwrap(); let searcher = index.searcher(); let reader = searcher.segment_reader(0); - assert!(reader.read_postings(&term_abcd, FreqAndPositions).is_none()); + let field_reader = reader.field_reader(term_abcd.field()).unwrap(); + assert!(field_reader.read_postings(&term_abcd, FreqAndPositions).is_none()); { - let mut postings = reader.read_postings(&term_a, FreqAndPositions).unwrap(); + let mut postings = field_reader.read_postings(&term_a, FreqAndPositions).unwrap(); assert!(!postings.advance()); } { - let mut postings = reader.read_postings(&term_b, FreqAndPositions).unwrap(); + let mut postings = field_reader.read_postings(&term_b, FreqAndPositions).unwrap(); assert!(postings.advance()); assert_eq!(postings.doc(), 3); assert!(postings.advance()); @@ 
-473,7 +476,7 @@ mod tests { assert!(!postings.advance()); } { - let mut postings = reader.read_postings(&term_c, FreqAndPositions).unwrap(); + let mut postings = field_reader.read_postings(&term_c, FreqAndPositions).unwrap(); assert!(postings.advance()); assert_eq!(postings.doc(), 4); assert!(!postings.advance()); @@ -497,6 +500,7 @@ mod tests { let term = Term::from_field_u64(field, 1u64); let mut postings = searcher .segment_reader(0) + .field_reader(term.field()).unwrap() .read_postings(&term, SegmentPostingsOption::NoFreq) .unwrap(); assert!(postings.advance()); @@ -520,6 +524,7 @@ mod tests { let term = Term::from_field_i64(value_field, negative_val); let mut postings = searcher .segment_reader(0) + .field_reader(term.field()).unwrap() .read_postings(&term, SegmentPostingsOption::NoFreq) .unwrap(); assert!(postings.advance()); @@ -582,10 +587,11 @@ mod tests { index.load_searchers().unwrap(); let searcher = index.searcher(); let reader = searcher.segment_reader(0); + let field_reader = reader.field_reader(text_field).unwrap(); let term_abcd = Term::from_field_text(text_field, "abcd"); - assert!(reader.read_postings(&term_abcd, FreqAndPositions).is_none()); + assert!(field_reader.read_postings(&term_abcd, FreqAndPositions).is_none()); let term_af = Term::from_field_text(text_field, "af"); - let mut postings = reader.read_postings(&term_af, FreqAndPositions).unwrap(); + let mut postings = field_reader.read_postings(&term_af, FreqAndPositions).unwrap(); assert!(postings.advance()); assert_eq!(postings.doc(), 0); assert_eq!(postings.term_freq(), 3); diff --git a/src/postings/mod.rs b/src/postings/mod.rs index d1a05bbb0..05991a1d1 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -132,12 +132,14 @@ mod tests { { let term_a = Term::from_field_text(text_field, "abcdef"); assert!(segment_reader - .read_postings(&term_a, FreqAndPositions) - .is_none()); + .field_reader(term_a.field()).unwrap() + .read_postings(&term_a, FreqAndPositions) + .is_none()); } { let term_a = Term::from_field_text(text_field, "a"); let mut postings_a = segment_reader + .field_reader(term_a.field()).unwrap() .read_postings(&term_a, FreqAndPositions) .unwrap(); assert_eq!(postings_a.len(), 1000); @@ -160,6 +162,7 @@ mod tests { { let term_e = Term::from_field_text(text_field, "e"); let mut postings_e = segment_reader + .field_reader(term_e.field()).unwrap() .read_postings(&term_e, FreqAndPositions) .unwrap(); assert_eq!(postings_e.len(), 1000 - 2); @@ -247,6 +250,7 @@ mod tests { for i in 0..num_docs - 1 { for j in i + 1..num_docs { let mut segment_postings = segment_reader + .field_reader(term_2.field()).unwrap() .read_postings(&term_2, SegmentPostingsOption::NoFreq) .unwrap(); @@ -260,6 +264,7 @@ mod tests { { let mut segment_postings = segment_reader + .field_reader(term_2.field()).unwrap() .read_postings(&term_2, SegmentPostingsOption::NoFreq) .unwrap(); @@ -280,6 +285,7 @@ mod tests { // check that filtering works { let mut segment_postings = segment_reader + .field_reader(term_0.field()).unwrap() .read_postings(&term_0, SegmentPostingsOption::NoFreq) .unwrap(); @@ -289,6 +295,7 @@ mod tests { } let mut segment_postings = segment_reader + .field_reader(term_0.field()).unwrap() .read_postings(&term_0, SegmentPostingsOption::NoFreq) .unwrap(); @@ -313,6 +320,7 @@ mod tests { // make sure seeking still works for i in 0..num_docs { let mut segment_postings = segment_reader + .field_reader(term_2.field()).unwrap() .read_postings(&term_2, SegmentPostingsOption::NoFreq) .unwrap(); @@ -328,6 +336,7 @@ mod 
tests { // now try with a longer sequence { let mut segment_postings = segment_reader + .field_reader(term_2.field()).unwrap() .read_postings(&term_2, SegmentPostingsOption::NoFreq) .unwrap(); @@ -363,12 +372,14 @@ mod tests { // finally, check that it's empty { let mut segment_postings = segment_reader + .field_reader(term_2.field()).unwrap() .read_postings(&term_2, SegmentPostingsOption::NoFreq) .unwrap(); assert_eq!(segment_postings.skip_next(0), SkipResult::End); let mut segment_postings = segment_reader + .field_reader(term_2.field()).unwrap() .read_postings(&term_2, SegmentPostingsOption::NoFreq) .unwrap(); @@ -436,6 +447,7 @@ mod tests { b.iter(|| { let mut segment_postings = segment_reader + .field_reader(TERM_A.field()).unwrap() .read_postings(&*TERM_A, SegmentPostingsOption::NoFreq) .unwrap(); while segment_postings.advance() {} @@ -448,15 +460,19 @@ mod tests { let segment_reader = searcher.segment_reader(0); b.iter(|| { let segment_postings_a = segment_reader + .field_reader(TERM_A.field()).unwrap() .read_postings(&*TERM_A, SegmentPostingsOption::NoFreq) .unwrap(); let segment_postings_b = segment_reader + .field_reader(TERM_B.field()).unwrap() .read_postings(&*TERM_B, SegmentPostingsOption::NoFreq) .unwrap(); let segment_postings_c = segment_reader + .field_reader(TERM_C.field()).unwrap() .read_postings(&*TERM_C, SegmentPostingsOption::NoFreq) .unwrap(); let segment_postings_d = segment_reader + .field_reader(TERM_D.field()).unwrap() .read_postings(&*TERM_D, SegmentPostingsOption::NoFreq) .unwrap(); let mut intersection = IntersectionDocSet::from(vec![segment_postings_a, @@ -473,6 +489,7 @@ mod tests { let docs = tests::sample(segment_reader.num_docs(), p); let mut segment_postings = segment_reader + .field_reader(TERM_A.field()).unwrap() .read_postings(&*TERM_A, SegmentPostingsOption::NoFreq) .unwrap(); @@ -489,6 +506,7 @@ mod tests { b.iter(|| { let mut segment_postings = segment_reader + .field_reader(TERM_A.field()).unwrap() .read_postings(&*TERM_A, SegmentPostingsOption::NoFreq) .unwrap(); for doc in &existing_docs { @@ -526,6 +544,7 @@ mod tests { b.iter(|| { let n: u32 = test::black_box(17); let mut segment_postings = segment_reader + .field_reader(TERM_A.field()).unwrap() .read_postings(&*TERM_A, SegmentPostingsOption::NoFreq) .unwrap(); let mut s = 0u32; diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index a6306b141..4e1f770c7 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -101,8 +101,9 @@ impl<'a> MultiFieldPostingsWriter<'a> { let (field, start) = offsets[i]; let (_, stop) = offsets[i + 1]; let postings_writer = &self.per_field_postings_writers[field.0 as usize]; - let field_serializer = serializer.new_field(field)?; - postings_writer.serialize(&term_offsets[start..stop], field_serializer, self.heap)?; + let mut field_serializer = serializer.new_field(field)?; + postings_writer.serialize(&term_offsets[start..stop], &mut field_serializer, self.heap)?; + field_serializer.close()?; } Ok(()) } @@ -137,7 +138,7 @@ pub trait PostingsWriter { /// The actual serialization format is handled by the `PostingsSerializer`. 
fn serialize(&self, term_addrs: &[(&[u8], u32)], - serializer: FieldSerializer, + serializer: &mut FieldSerializer, heap: &Heap) -> io::Result<()>; @@ -214,13 +215,13 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<' fn serialize(&self, term_addrs: &[(&[u8], u32)], - mut serializer: FieldSerializer, + serializer: &mut FieldSerializer, heap: &Heap) -> io::Result<()> { for &(term_bytes, addr) in term_addrs { let recorder: &mut Rec = self.heap.get_mut_ref(addr); serializer.new_term(term_bytes)?; - recorder.serialize(addr, &mut serializer, heap)?; + recorder.serialize(addr, serializer, heap)?; serializer.close_term()?; } Ok(()) diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index 26810edf4..0866a5fe5 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -5,11 +5,15 @@ use std::cmp; use fst::Streamer; use fastfield::DeleteBitSet; use std::cell::UnsafeCell; +use directory::{SourceRead, ReadOnlySource}; + -const EMPTY_DATA: [u8; 0] = [0u8; 0]; const EMPTY_POSITIONS: [u32; 0] = [0u32; 0]; -struct PositionComputer<'a> { + + + +struct PositionComputer { // store the amount of position int // before reading positions. // @@ -17,12 +21,12 @@ struct PositionComputer<'a> { // the positions vec. position_to_skip: Option, positions: Vec, - positions_stream: CompressedIntStream<'a>, + positions_stream: CompressedIntStream, } -impl<'a> PositionComputer<'a> { +impl PositionComputer { - pub fn new(positions_stream: CompressedIntStream<'a>) -> PositionComputer<'a> { + pub fn new(positions_stream: CompressedIntStream) -> PositionComputer { PositionComputer { position_to_skip: None, positions: vec!(), @@ -64,25 +68,25 @@ impl<'a> PositionComputer<'a> { /// /// As we iterate through the `SegmentPostings`, the frequencies are optionally decoded. /// Positions on the other hand, are optionally entirely decoded upfront. -pub struct SegmentPostings<'a> { - block_cursor: BlockSegmentPostings<'a>, +pub struct SegmentPostings { + block_cursor: BlockSegmentPostings, cur: usize, delete_bitset: DeleteBitSet, - position_computer: Option>>, + position_computer: Option>, } -impl<'a> SegmentPostings<'a> { +impl SegmentPostings { /// Reads a Segment postings from an &[u8] /// /// * `len` - number of document in the posting lists. /// * `data` - data array. The complete data is not necessarily used. /// * `freq_handler` - the freq handler is in charge of decoding /// frequencies and/or positions - pub fn from_block_postings(segment_block_postings: BlockSegmentPostings<'a>, + pub fn from_block_postings(segment_block_postings: BlockSegmentPostings, delete_bitset: DeleteBitSet, - positions_stream_opt: Option>) - -> SegmentPostings<'a> { + positions_stream_opt: Option) + -> SegmentPostings { let position_computer = positions_stream_opt.map(|stream| { UnsafeCell::new(PositionComputer::new(stream)) }); @@ -95,7 +99,7 @@ impl<'a> SegmentPostings<'a> { } /// Returns an empty segment postings object - pub fn empty() -> SegmentPostings<'a> { + pub fn empty() -> SegmentPostings { let empty_block_cursor = BlockSegmentPostings::empty(); SegmentPostings { block_cursor: empty_block_cursor, @@ -117,7 +121,7 @@ impl<'a> SegmentPostings<'a> { } -impl<'a> DocSet for SegmentPostings<'a> { +impl DocSet for SegmentPostings { // goes to the next element. // next needs to be called a first time to point to the correct element. 
#[inline] @@ -259,13 +263,13 @@ impl<'a> DocSet for SegmentPostings<'a> { } } -impl<'a> HasLen for SegmentPostings<'a> { +impl HasLen for SegmentPostings { fn len(&self) -> usize { self.block_cursor.doc_freq() } } -impl<'a> Postings for SegmentPostings<'a> { +impl Postings for SegmentPostings { fn term_freq(&self) -> u32 { self.block_cursor.freq(self.cur) } @@ -286,6 +290,7 @@ impl<'a> Postings for SegmentPostings<'a> { } + /// `BlockSegmentPostings` is a cursor iterating over blocks /// of documents. /// @@ -293,7 +298,7 @@ impl<'a> Postings for SegmentPostings<'a> { /// /// While it is useful for some very specific high-performance /// use cases, you should prefer using `SegmentPostings` for most usage. -pub struct BlockSegmentPostings<'a> { +pub struct BlockSegmentPostings { doc_decoder: BlockDecoder, freq_decoder: BlockDecoder, has_freq: bool, @@ -302,14 +307,14 @@ pub struct BlockSegmentPostings<'a> { doc_offset: DocId, num_binpacked_blocks: usize, num_vint_docs: usize, - remaining_data: &'a [u8], + remaining_data: SourceRead, } -impl<'a> BlockSegmentPostings<'a> { +impl BlockSegmentPostings { pub(crate) fn from_data(doc_freq: usize, - data: &'a [u8], + data: SourceRead, has_freq: bool) - -> BlockSegmentPostings<'a> { + -> BlockSegmentPostings { let num_binpacked_blocks: usize = (doc_freq as usize) / NUM_DOCS_PER_BLOCK; let num_vint_docs = (doc_freq as usize) - NUM_DOCS_PER_BLOCK * num_binpacked_blocks; BlockSegmentPostings { @@ -337,7 +342,7 @@ impl<'a> BlockSegmentPostings<'a> { // # Warning // // This does not reset the positions list. - pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: &'a [u8]) { + pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: SourceRead) { let num_binpacked_blocks: usize = doc_freq / NUM_DOCS_PER_BLOCK; let num_vint_docs = doc_freq & (NUM_DOCS_PER_BLOCK - 1); self.num_binpacked_blocks = num_binpacked_blocks; @@ -398,25 +403,30 @@ impl<'a> BlockSegmentPostings<'a> { pub fn advance(&mut self) -> bool { if self.num_binpacked_blocks > 0 { // TODO could self.doc_offset be just a local variable? - self.remaining_data = - self.doc_decoder - .uncompress_block_sorted(self.remaining_data, self.doc_offset); + + let num_consumed_bytes = self + .doc_decoder + .uncompress_block_sorted(self.remaining_data.as_ref(), self.doc_offset); + self.remaining_data.advance(num_consumed_bytes); + if self.has_freq { - self.remaining_data = self.freq_decoder.uncompress_block_unsorted(self.remaining_data); + let num_consumed_bytes = self.freq_decoder.uncompress_block_unsorted(self.remaining_data.as_ref()); + self.remaining_data.advance(num_consumed_bytes); } // it will be used as the next offset. 
self.doc_offset = self.doc_decoder.output(NUM_DOCS_PER_BLOCK - 1); self.num_binpacked_blocks -= 1; true } else if self.num_vint_docs > 0 { - self.remaining_data = + let num_compressed_bytes = self.doc_decoder - .uncompress_vint_sorted(self.remaining_data, + .uncompress_vint_sorted(self.remaining_data.as_ref(), self.doc_offset, self.num_vint_docs); + self.remaining_data.advance(num_compressed_bytes); if self.has_freq { self.freq_decoder - .uncompress_vint_unsorted(self.remaining_data, self.num_vint_docs); + .uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs); } self.num_vint_docs = 0; true @@ -426,7 +436,7 @@ impl<'a> BlockSegmentPostings<'a> { } /// Returns an empty segment postings object - pub fn empty() -> BlockSegmentPostings<'static> { + pub fn empty() -> BlockSegmentPostings { BlockSegmentPostings { num_binpacked_blocks: 0, num_vint_docs: 0, @@ -435,14 +445,14 @@ impl<'a> BlockSegmentPostings<'a> { freq_decoder: BlockDecoder::with_val(1), has_freq: false, - remaining_data: &EMPTY_DATA, + remaining_data: From::from(ReadOnlySource::empty()), doc_offset: 0, doc_freq: 0, } } } -impl<'a, 'b> Streamer<'b> for BlockSegmentPostings<'a> { +impl<'b> Streamer<'b> for BlockSegmentPostings { type Item = &'b [DocId]; fn next(&'b mut self) -> Option<&'b [DocId]> { @@ -498,10 +508,11 @@ mod tests { index.load_searchers().unwrap(); let searcher = index.searcher(); let segment_reader = searcher.segment_reader(0); + let field_reader = segment_reader.field_reader(int_field).unwrap(); let term = Term::from_field_u64(int_field, 0u64); - let term_info = segment_reader.get_term_info(&term).unwrap(); + let term_info = field_reader.get_term_info(&term).unwrap(); let mut block_segments = - segment_reader + field_reader .read_block_postings_from_terminfo(&term_info, SegmentPostingsOption::NoFreq); let mut offset: u32 = 0u32; // checking that the block before calling advance is empty @@ -538,17 +549,19 @@ mod tests { let mut block_segments; { let term = Term::from_field_u64(int_field, 0u64); - let term_info = segment_reader.get_term_info(&term).unwrap(); + let field_reader = segment_reader.field_reader(int_field).unwrap(); + let term_info = field_reader.get_term_info(&term).unwrap(); block_segments = - segment_reader + field_reader .read_block_postings_from_terminfo(&term_info, SegmentPostingsOption::NoFreq); } assert!(block_segments.advance()); assert!(block_segments.docs() == &[0, 2, 4]); { let term = Term::from_field_u64(int_field, 1u64); - let term_info = segment_reader.get_term_info(&term).unwrap(); - segment_reader.reset_block_postings_from_terminfo(&term_info, &mut block_segments); + let field_reader = segment_reader.field_reader(int_field).unwrap(); + let term_info = field_reader.get_term_info(&term).unwrap(); + field_reader.reset_block_postings_from_terminfo(&term_info, &mut block_segments); } assert!(block_segments.advance()); assert!(block_segments.docs() == &[1, 3, 5]); diff --git a/src/query/phrase_query/phrase_scorer.rs b/src/query/phrase_query/phrase_scorer.rs index 1726340d1..d9c887afb 100644 --- a/src/query/phrase_query/phrase_scorer.rs +++ b/src/query/phrase_query/phrase_scorer.rs @@ -5,12 +5,12 @@ use postings::Postings; use postings::IntersectionDocSet; use DocId; -pub struct PhraseScorer<'a> { - pub intersection_docset: IntersectionDocSet>, +pub struct PhraseScorer { + pub intersection_docset: IntersectionDocSet, } -impl<'a> PhraseScorer<'a> { +impl PhraseScorer { fn phrase_match(&self) -> bool { let mut positions_arr: Vec<&[u32]> = self.intersection_docset .docsets() 
@@ -54,7 +54,7 @@ impl<'a> PhraseScorer<'a> { } } -impl<'a> DocSet for PhraseScorer<'a> { +impl DocSet for PhraseScorer { fn advance(&mut self) -> bool { while self.intersection_docset.advance() { if self.phrase_match() { @@ -74,7 +74,7 @@ impl<'a> DocSet for PhraseScorer<'a> { } -impl<'a> Scorer for PhraseScorer<'a> { +impl Scorer for PhraseScorer { fn score(&self) -> f32 { 1f32 } diff --git a/src/query/phrase_query/phrase_weight.rs b/src/query/phrase_query/phrase_weight.rs index a171b4160..2e9efd463 100644 --- a/src/query/phrase_query/phrase_weight.rs +++ b/src/query/phrase_query/phrase_weight.rs @@ -22,8 +22,9 @@ impl Weight for PhraseWeight { fn scorer<'a>(&'a self, reader: &'a SegmentReader) -> Result> { let mut term_postings_list = Vec::new(); for term in &self.phrase_terms { + let field_reader = reader.field_reader(term.field())?; let term_postings_option = - reader.read_postings(term, SegmentPostingsOption::FreqAndPositions); + field_reader.read_postings(term, SegmentPostingsOption::FreqAndPositions); if let Some(term_postings) = term_postings_option { term_postings_list.push(term_postings); } else { diff --git a/src/query/term_query/term_weight.rs b/src/query/term_query/term_weight.rs index 99bfa7d47..65f56b054 100644 --- a/src/query/term_query/term_weight.rs +++ b/src/query/term_query/term_weight.rs @@ -27,12 +27,14 @@ impl TermWeight { 1.0 + (self.num_docs as f32 / (self.doc_freq as f32 + 1.0)).ln() } - pub fn specialized_scorer<'a>(&self, - reader: &'a SegmentReader) - -> Result>> { + pub fn specialized_scorer(&self, + reader: &SegmentReader) + -> Result> { let field = self.term.field(); + let field_reader = reader.field_reader(field)?; + // TODO move field reader too let fieldnorm_reader_opt = reader.get_fieldnorms_reader(field); - let postings: Option> = reader.read_postings(&self.term, self.segment_postings_options); + let postings: Option = field_reader.read_postings(&self.term, self.segment_postings_options); Ok(postings .map(|segment_postings| { TermScorer { diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs index 4689e0673..e7b4b392c 100644 --- a/src/termdict/merger.rs +++ b/src/termdict/merger.rs @@ -1,11 +1,8 @@ use std::collections::BinaryHeap; -use core::SegmentReader; use termdict::TermStreamerImpl; use common::BinarySerializable; -use postings::TermInfo; use std::cmp::Ordering; use termdict::TermStreamer; -use termdict::TermDictionary; use schema::Term; pub struct HeapItem<'a, V> @@ -58,7 +55,7 @@ pub struct TermMerger<'a, V> impl<'a, V> TermMerger<'a, V> where V: 'a + BinarySerializable + Default { - fn new(streams: Vec>) -> TermMerger<'a, V> { + pub fn new(streams: Vec>) -> TermMerger<'a, V> { TermMerger { heap: BinaryHeap::new(), current_streamers: streams @@ -141,12 +138,3 @@ impl<'a, V> TermMerger<'a, V> } - -impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo> { - fn from(segment_readers: &'a [SegmentReader]) -> TermMerger<'a, TermInfo> { - TermMerger::new(segment_readers - .iter() - .map(|reader| reader.terms().stream()) - .collect()) - } -} diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index c4786f539..f045eb1f7 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -282,9 +282,6 @@ mod tests { assert!(!stream.advance()); } - - - #[test] fn test_term_iterator() { let mut schema_builder = SchemaBuilder::default(); @@ -319,13 +316,16 @@ mod tests { } index.load_searchers().unwrap(); let searcher = index.searcher(); - let mut term_it = searcher.terms(); + + let field_searcher = searcher.field(text_field).unwrap(); + let mut 
term_it = field_searcher.terms(); let mut term_string = String::new(); while term_it.advance() { let term = Term::from_bytes(term_it.key()); term_string.push_str(term.text()); } assert_eq!(&*term_string, "abcdef"); + }
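
A few notes on the patterns this patch introduces, with simplified sketches rather than the actual tantivy types.

The central API change in `compression` is that `uncompress_block_sorted`, `uncompress_block_unsorted` and the vint variants now return the number of consumed bytes (`usize`) instead of the remaining `&'a [u8]` slice. The caller advances its own cursor, so nothing downstream has to borrow the compressed buffer, which is what lets `SegmentPostings`, `BlockSegmentPostings` and `PhraseScorer` drop their lifetime parameters. A minimal sketch of the calling pattern (the `decode_block` format here is a toy, not the real bit-packed layout):

```rust
use std::convert::TryInto;

/// Hypothetical stand-in for a block decoder. It reports how many bytes it
/// consumed instead of returning a slice borrowed from `compressed`, so the
/// caller owns the read position and no lifetime ties it to the buffer.
/// Toy "format": each value is stored as 4 little-endian bytes.
fn decode_block(compressed: &[u8], output: &mut [u32]) -> usize {
    for (i, out) in output.iter_mut().enumerate() {
        let start = i * 4;
        let bytes: [u8; 4] = compressed[start..start + 4].try_into().unwrap();
        *out = u32::from_le_bytes(bytes);
    }
    output.len() * 4
}

fn main() {
    // Two "blocks" of four values each, concatenated into one buffer.
    let mut buffer = Vec::new();
    for v in 1u32..=8 {
        buffer.extend_from_slice(&v.to_le_bytes());
    }

    let mut cursor = 0usize; // the caller owns the cursor
    let mut block = [0u32; 4];
    while cursor < buffer.len() {
        let consumed = decode_block(&buffer[cursor..], &mut block);
        cursor += consumed; // advance by the reported byte count
        println!("{:?}", block);
    }
}
```

This is also why the tests changed shape: they now assert on `consumed_num_bytes` against `compressed.len()` (or the length minus the trailing padding byte) instead of inspecting a leftover slice.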
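
`CompressedIntStream::skip` exploits the same counting: to skip whole blocks it only reads the leading `num_bits` byte, computes `compressed_block_size(num_bits)` and advances the cursor, decoding nothing until it reaches the block that contains the target position. A rough sketch of the arithmetic, assuming a layout of one bit-width byte followed by `128 * num_bits / 8` packed bytes (the real format may differ in details):

```rust
const NUM_DOCS_PER_BLOCK: usize = 128;

/// Assumed layout: one leading byte holding `num_bits`, then the packed block.
fn compressed_block_size(num_bits: u8) -> usize {
    1 + num_bits as usize * NUM_DOCS_PER_BLOCK / 8
}

/// Skip `skip_len` values by hopping over whole blocks. Returns the new byte
/// offset and how many values are still to be skipped inside the next block.
fn skip_blocks(buffer: &[u8], mut offset: usize, mut skip_len: usize) -> (usize, usize) {
    while skip_len >= NUM_DOCS_PER_BLOCK {
        skip_len -= NUM_DOCS_PER_BLOCK;
        let num_bits = buffer[offset];
        offset += compressed_block_size(num_bits); // jump, no decompression
    }
    (offset, skip_len)
}

fn main() {
    // Two fake blocks: 3 bits per value, then 5 bits per value.
    let mut buffer = vec![3u8];
    buffer.resize(compressed_block_size(3), 0u8);
    buffer.push(5u8);
    buffer.resize(compressed_block_size(3) + compressed_block_size(5), 0u8);

    let (offset, remaining) = skip_blocks(&buffer, 0, 130);
    assert_eq!(offset, compressed_block_size(3)); // skipped exactly one block
    assert_eq!(remaining, 2); // 130 - 128 left for the block we will decode
    println!("offset = {}, remaining = {}", offset, remaining);
}
```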
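
`CompressedIntStream` and `BlockSegmentPostings` now hold a `SourceRead` instead of a borrowed `&'a [u8]`. In the patch, `SourceRead` keeps the `ReadOnlySource` alive as `_data_owner` and stores a raw `'static` slice as its cursor; the sketch below models the same owning-cursor idea safely, with an `Arc<Vec<u8>>` stand-in and a plain byte offset:

```rust
use std::sync::Arc;

/// Simplified stand-in for `SourceRead`: it owns a handle to the data and
/// tracks a byte offset. The real type keeps the `ReadOnlySource` alive in
/// `_data_owner` and stores a raw `'static` slice; owner-plus-offset gives
/// the same "no borrowed lifetime" property without `unsafe`.
struct OwnedCursor {
    data: Arc<Vec<u8>>,
    offset: usize,
}

impl OwnedCursor {
    fn new(data: Arc<Vec<u8>>) -> OwnedCursor {
        OwnedCursor { data, offset: 0 }
    }

    /// Remaining bytes, the counterpart of `SourceRead::as_ref()`.
    fn as_ref(&self) -> &[u8] {
        &self.data[self.offset..]
    }

    /// Move the cursor forward, the counterpart of `SourceRead::advance(len)`.
    fn advance(&mut self, len: usize) {
        self.offset += len;
    }
}

fn main() {
    let data = Arc::new((0u8..16).collect::<Vec<u8>>());
    let mut cursor = OwnedCursor::new(data);
    cursor.advance(4);
    assert_eq!(cursor.as_ref()[0], 4);
    println!("remaining: {} bytes", cursor.as_ref().len());
}
```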
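
The term dictionary, postings and positions components are now `CompositeFile`s: one physical file carrying one region per field, plus `CompositeFile::empty()` for segments that have no positions file at all. A hypothetical in-memory sketch of the per-field index (the `(start, stop)` tuple and `u32` field ids are assumptions; the diff does not show the on-disk footer layout):

```rust
use std::collections::HashMap;

/// Hypothetical in-memory sketch of a composite file: one buffer shared by
/// several fields, plus an index mapping each field id to its byte range.
struct Composite {
    data: Vec<u8>,
    offsets_index: HashMap<u32, (usize, usize)>,
}

impl Composite {
    /// Counterpart of `CompositeFile::empty()`: no data, no fields.
    fn empty() -> Composite {
        Composite {
            data: Vec::new(),
            offsets_index: HashMap::new(),
        }
    }

    /// Counterpart of `open_read(field)`: the field's region, if present.
    fn open_read(&self, field: u32) -> Option<&[u8]> {
        self.offsets_index
            .get(&field)
            .map(|&(start, stop)| &self.data[start..stop])
    }
}

fn main() {
    let mut composite = Composite::empty();
    composite.data = vec![1, 2, 3, 4, 5, 6];
    composite.offsets_index.insert(0, (0, 4)); // field 0 -> bytes [0, 4)
    composite.offsets_index.insert(1, (4, 6)); // field 1 -> bytes [4, 6)
    assert_eq!(composite.open_read(1), Some(&[5u8, 6u8][..]));
    assert!(composite.open_read(7).is_none()); // unknown field
}
```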
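
`SegmentReader::field_reader` builds a `FieldReader` per field lazily and memoizes it in an `Arc<RwLock<HashMap<..>>>`: a shared read lock on the fast path, a write lock only on a miss. The sketch below shows that read-then-write pattern with a placeholder value type; as in the patch, two racing misses may both build a reader, and the later insert simply wins:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Placeholder for the per-field reader; the real `FieldReader` wraps the
/// field's term dictionary, postings and positions sources.
struct FieldReaderStub {
    field: u32,
}

struct Reader {
    cache: Arc<RwLock<HashMap<u32, Arc<FieldReaderStub>>>>,
}

impl Reader {
    fn field_reader(&self, field: u32) -> Arc<FieldReaderStub> {
        // Fast path: shared read lock, clone the cached Arc.
        if let Some(reader) = self.cache.read().unwrap().get(&field) {
            return reader.clone();
        }
        // Slow path: build the reader, then insert under the write lock.
        let reader = Arc::new(FieldReaderStub { field });
        self.cache
            .write()
            .unwrap()
            .insert(field, reader.clone());
        reader
    }
}

fn main() {
    let reader = Reader { cache: Arc::new(RwLock::new(HashMap::new())) };
    let a = reader.field_reader(3);
    let b = reader.field_reader(3);
    println!("field {} reader cached: {}", a.field, Arc::ptr_eq(&a, &b));
    assert!(Arc::ptr_eq(&a, &b));
}
```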
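
`FieldReader::read_postings` keeps the old best-effort behaviour: the requested `SegmentPostingsOption` is clamped with `cmp::min` to the most detailed option the field was actually indexed with, so asking for positions on a freq-only field degrades instead of failing. Sketched with a stand-in enum whose derived ordering mirrors that intent:

```rust
use std::cmp;

/// Stand-in for `SegmentPostingsOption`; the derived ordering makes
/// "more detail" compare greater, which is what the clamp relies on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum PostingsOption {
    NoFreq,
    Freq,
    FreqAndPositions,
}

/// `read_postings` clamps the request to what the field was indexed with.
fn best_effort(indexed_with: PostingsOption, requested: PostingsOption) -> PostingsOption {
    cmp::min(indexed_with, requested)
}

fn main() {
    // Positions were requested but the field only stores frequencies.
    assert_eq!(
        best_effort(PostingsOption::Freq, PostingsOption::FreqAndPositions),
        PostingsOption::Freq
    );
    // The caller can still ask for less than what is stored.
    assert_eq!(
        best_effort(PostingsOption::FreqAndPositions, PostingsOption::NoFreq),
        PostingsOption::NoFreq
    );
}
```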
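
Finally, merging and `Searcher::field(..).terms()` are now per field: each segment contributes a term stream for one field, and `TermMerger::new` (made public, replacing the removed `From<&[SegmentReader]>` impl) performs a k-way merge over those streams. A toy version of that merge over plain sorted string lists, not the real term streamers:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy version of the per-field merge: each segment contributes one sorted
/// term list for a single field, and a min-heap merges them, which is the
/// shape of `TermMerger::new(term_streamers)` over real term streams.
fn merge_sorted_terms(per_segment_terms: &[Vec<&str>]) -> Vec<String> {
    let mut heap = BinaryHeap::new();
    for (segment_ord, terms) in per_segment_terms.iter().enumerate() {
        if let Some(first) = terms.first() {
            // (term, segment ordinal, index within that segment)
            heap.push(Reverse((first.to_string(), segment_ord, 0usize)));
        }
    }
    let mut merged: Vec<String> = Vec::new();
    while let Some(Reverse((term, segment_ord, idx))) = heap.pop() {
        // Keep each unique term once, as the merger serializes one term entry
        // and then consumes the postings of every segment containing it.
        if merged.last() != Some(&term) {
            merged.push(term);
        }
        if let Some(next) = per_segment_terms[segment_ord].get(idx + 1) {
            heap.push(Reverse((next.to_string(), segment_ord, idx + 1)));
        }
    }
    merged
}

fn main() {
    // Two segments, one field each, terms already sorted within a segment.
    let merged = merge_sorted_terms(&[vec!["abcd", "af"], vec!["abcd", "b"]]);
    assert_eq!(merged, vec!["abcd", "af", "b"]);
}
```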