NOBUG test passing

Paul Masurel
2017-08-13 17:57:11 +09:00
parent 0eb3c872fd
commit 413d0e1719
22 changed files with 592 additions and 383 deletions

View File

@@ -52,6 +52,7 @@ impl<W: Write> CompositeWrite<W> {
}
#[derive(Clone)]
pub struct CompositeFile {
data: ReadOnlySource,
offsets_index: HashMap<Field, (usize, usize)>,
@@ -94,6 +95,14 @@ impl CompositeFile {
})
}
pub fn empty() -> CompositeFile {
CompositeFile {
offsets_index: HashMap::new(),
data: ReadOnlySource::empty(),
}
}
pub fn open_read(&self, field: Field) -> Option<ReadOnlySource> {
self.offsets_index
.get(&field)

View File

@@ -46,11 +46,11 @@ pub trait VIntDecoder {
compressed_data: &'a [u8],
offset: u32,
num_els: usize)
-> &'a [u8];
-> usize;
fn uncompress_vint_unsorted<'a>(&mut self,
compressed_data: &'a [u8],
num_els: usize)
-> &'a [u8];
-> usize;
}
impl VIntEncoder for BlockEncoder {
@@ -68,7 +68,7 @@ impl VIntDecoder for BlockDecoder {
compressed_data: &'a [u8],
offset: u32,
num_els: usize)
-> &'a [u8] {
-> usize {
self.output_len = num_els;
vint::uncompress_sorted(compressed_data, &mut self.output[..num_els], offset)
}
@@ -76,7 +76,7 @@ impl VIntDecoder for BlockDecoder {
fn uncompress_vint_unsorted<'a>(&mut self,
compressed_data: &'a [u8],
num_els: usize)
-> &'a [u8] {
-> usize {
self.output_len = num_els;
vint::uncompress_unsorted(compressed_data, &mut self.output[..num_els])
}
@@ -100,8 +100,8 @@ pub mod tests {
let compressed_data = encoder.compress_block_sorted(&vals, 0);
let mut decoder = BlockDecoder::new();
{
let remaining_data = decoder.uncompress_block_sorted(compressed_data, 0);
assert_eq!(remaining_data.len(), 0);
let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 0);
assert_eq!(consumed_num_bytes, compressed_data.len());
}
for i in 0..128 {
assert_eq!(vals[i], decoder.output(i));
@@ -115,8 +115,8 @@ pub mod tests {
let compressed_data = encoder.compress_block_sorted(&vals, 10);
let mut decoder = BlockDecoder::new();
{
let remaining_data = decoder.uncompress_block_sorted(compressed_data, 10);
assert_eq!(remaining_data.len(), 0);
let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 10);
assert_eq!(consumed_num_bytes, compressed_data.len());
}
for i in 0..128 {
assert_eq!(vals[i], decoder.output(i));
@@ -134,9 +134,9 @@ pub mod tests {
compressed.push(173u8);
let mut decoder = BlockDecoder::new();
{
let remaining_data = decoder.uncompress_block_sorted(&compressed, 10);
assert_eq!(remaining_data.len(), 1);
assert_eq!(remaining_data[0], 173u8);
let consumed_num_bytes = decoder.uncompress_block_sorted(&compressed, 10);
assert_eq!(consumed_num_bytes, compressed.len() - 1);
assert_eq!(compressed[consumed_num_bytes], 173u8);
}
for i in 0..n {
assert_eq!(vals[i], decoder.output(i));
@@ -154,9 +154,9 @@ pub mod tests {
compressed.push(173u8);
let mut decoder = BlockDecoder::new();
{
let remaining_data = decoder.uncompress_block_unsorted(&compressed);
assert_eq!(remaining_data.len(), 1);
assert_eq!(remaining_data[0], 173u8);
let consumed_num_bytes = decoder.uncompress_block_unsorted(&compressed);
assert_eq!(consumed_num_bytes + 1, compressed.len());
assert_eq!(compressed[consumed_num_bytes], 173u8);
}
for i in 0..n {
assert_eq!(vals[i], decoder.output(i));
@@ -174,9 +174,9 @@ pub mod tests {
let encoded_data = encoder.compress_vint_sorted(&input, *offset);
assert!(encoded_data.len() <= expected_length);
let mut decoder = BlockDecoder::new();
let remaining_data =
let consumed_num_bytes =
decoder.uncompress_vint_sorted(&encoded_data, *offset, input.len());
assert_eq!(0, remaining_data.len());
assert_eq!(consumed_num_bytes, encoded_data.len());
assert_eq!(input, decoder.output_array());
}
}

View File

@@ -78,19 +78,19 @@ impl BlockDecoder {
}
}
pub fn uncompress_block_sorted<'a>(&mut self,
compressed_data: &'a [u8],
offset: u32)
-> &'a [u8] {
pub fn uncompress_block_sorted(&mut self,
compressed_data: &[u8],
offset: u32)
-> usize {
let consumed_size = uncompress_sorted(compressed_data, &mut self.output, offset);
self.output_len = NUM_DOCS_PER_BLOCK;
&compressed_data[consumed_size..]
consumed_size
}
pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> &'a [u8] {
pub fn uncompress_block_unsorted(&mut self, compressed_data: &[u8]) -> usize {
let consumed_size = uncompress_unsorted(compressed_data, &mut self.output);
self.output_len = NUM_DOCS_PER_BLOCK;
&compressed_data[consumed_size..]
consumed_size
}
#[inline]
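The block decoders now report the number of bytes they consumed instead of returning the remaining slice, so the caller advances its own cursor. A minimal sketch of the new calling convention (the `data` buffer and variable names are illustrative, not part of this commit):

// assuming `data: &[u8]` starts with one compressed, sorted block
let mut decoder = BlockDecoder::new();
let consumed = decoder.uncompress_block_sorted(data, 0);
// the decoder used to return `&data[consumed..]`; the caller now slices itself
let remaining = &data[consumed..];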

View File

@@ -1,15 +1,16 @@
use compression::BlockDecoder;
use compression::NUM_DOCS_PER_BLOCK;
use compression::compressed_block_size;
use directory::SourceRead;
pub struct CompressedIntStream<'a> {
buffer: &'a [u8],
pub struct CompressedIntStream {
buffer: SourceRead,
block_decoder: BlockDecoder,
inner_offset: usize,
}
impl<'a> CompressedIntStream<'a> {
pub fn wrap(buffer: &'a [u8]) -> CompressedIntStream<'a> {
impl CompressedIntStream {
pub fn wrap(buffer: SourceRead) -> CompressedIntStream {
CompressedIntStream {
buffer: buffer,
block_decoder: BlockDecoder::new(),
@@ -29,7 +30,8 @@ impl<'a> CompressedIntStream<'a> {
}
num_els -= available;
start += available;
self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer);
let num_consumed_bytes = self.block_decoder.uncompress_block_unsorted(self.buffer.as_ref());
self.buffer.advance(num_consumed_bytes);
self.inner_offset = 0;
}
else {
@@ -51,11 +53,12 @@ impl<'a> CompressedIntStream<'a> {
// entirely skip decompressing some blocks.
while skip_len >= NUM_DOCS_PER_BLOCK {
skip_len -= NUM_DOCS_PER_BLOCK;
let num_bits: u8 = self.buffer[0];
let num_bits: u8 = self.buffer.as_ref()[0];
let block_len = compressed_block_size(num_bits);
self.buffer = &self.buffer[block_len..];
self.buffer.advance(block_len);
}
self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer);
let num_consumed_bytes = self.block_decoder.uncompress_block_unsorted(self.buffer.as_ref());
self.buffer.advance(num_consumed_bytes);
self.inner_offset = skip_len;
}
}
@@ -69,8 +72,9 @@ pub mod tests {
use compression::compressed_block_size;
use compression::NUM_DOCS_PER_BLOCK;
use compression::BlockEncoder;
use directory::{SourceRead, ReadOnlySource};
fn create_stream_buffer() -> Vec<u8> {
fn create_stream_buffer() -> ReadOnlySource {
let mut buffer: Vec<u8> = vec!();
let mut encoder = BlockEncoder::new();
let vals: Vec<u32> = (0u32..1_025u32).collect();
@@ -80,13 +84,14 @@ pub mod tests {
assert_eq!(compressed_block_size(num_bits), compressed_block.len());
buffer.extend_from_slice(compressed_block);
}
buffer
ReadOnlySource::from(buffer)
}
#[test]
fn test_compressed_int_stream() {
let buffer = create_stream_buffer();
let mut stream = CompressedIntStream::wrap(&buffer[..]);
let buffer_reader = SourceRead::from(buffer);
let mut stream = CompressedIntStream::wrap(buffer_reader);
let mut block: [u32; NUM_DOCS_PER_BLOCK] = [0u32; NUM_DOCS_PER_BLOCK];
stream.read(&mut block[0..2]);
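For reference, a hedged sketch of how the stream is consumed now that it owns a `SourceRead` (assumes `source` is a `ReadOnlySource` holding the positions data):

let mut stream = CompressedIntStream::wrap(SourceRead::from(source));
stream.skip(1_000);            // whole blocks may be skipped without decompressing them
let mut out = [0u32; 25];
stream.read(&mut out);         // decompresses only the blocks it actually needs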

View File

@@ -49,20 +49,18 @@ pub fn compress_unsorted<'a>(input: &[u32], output: &'a mut [u8]) -> &'a [u8] {
pub fn uncompress_sorted<'a>(compressed_data: &'a [u8],
output: &mut [u32],
offset: u32)
-> &'a [u8] {
let consumed_bytes = unsafe {
-> usize {
unsafe {
streamvbyte::streamvbyte_delta_decode(compressed_data.as_ptr(),
output.as_mut_ptr(),
output.len() as u32,
offset)
};
&compressed_data[consumed_bytes..]
}
}
#[inline(always)]
pub fn uncompress_unsorted<'a>(compressed_data: &'a [u8], output: &mut [u32]) -> &'a [u8] {
let consumed_bytes = unsafe {
pub fn uncompress_unsorted<'a>(compressed_data: &'a [u8], output: &mut [u32]) -> usize {
unsafe {
streamvbyte::streamvbyte_decode(compressed_data.as_ptr(), output.as_mut_ptr(), output.len())
};
&compressed_data[consumed_bytes..]
}
}

src/core/field_reader.rs (new file, 149 additions)
View File

@@ -0,0 +1,149 @@
use directory::{SourceRead, ReadOnlySource};
use termdict::{TermDictionary, TermDictionaryImpl};
use std::io;
use postings::{SegmentPostings, BlockSegmentPostings};
use postings::TermInfo;
use postings::SegmentPostingsOption;
use schema::Term;
use std::cmp;
use fastfield::DeleteBitSet;
use schema::Schema;
use compression::CompressedIntStream;
pub struct FieldReader {
termdict: TermDictionaryImpl,
postings_source: ReadOnlySource,
positions_source: ReadOnlySource,
delete_bitset: DeleteBitSet,
schema: Schema,
}
impl FieldReader {
pub(crate) fn new(
termdict_source: ReadOnlySource,
postings_source: ReadOnlySource,
positions_source: ReadOnlySource,
delete_bitset: DeleteBitSet,
schema: Schema,
) -> io::Result<FieldReader> {
Ok(FieldReader {
termdict: TermDictionaryImpl::from_source(termdict_source)?,
postings_source: postings_source,
positions_source: positions_source,
delete_bitset: delete_bitset,
schema: schema,
})
}
/// Returns the term info associated with the term.
pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
self.termdict.get(term.as_slice())
}
/// Return the term dictionary datastructure.
pub fn terms(&self) -> &TermDictionaryImpl {
&self.termdict
}
/// Resets the block segment to another position of the postings
/// file.
///
/// This is useful for enumerating through a list of terms,
/// and consuming the associated posting lists while avoiding
/// reallocating a `BlockSegmentPostings`.
///
/// # Warning
///
/// This does not reset the positions list.
pub fn reset_block_postings_from_terminfo(&self,
term_info: &TermInfo,
block_postings: &mut BlockSegmentPostings) {
let offset = term_info.postings_offset as usize;
let end_source = self.postings_source.len();
let postings_slice = self.postings_source.slice(offset, end_source);
let postings_reader = SourceRead::from(postings_slice);
block_postings.reset(term_info.doc_freq as usize, postings_reader);
}
/// Returns a block postings given a `term_info`.
/// This method is for an advanced usage only.
///
/// Most users should prefer using `read_postings` instead.
pub fn read_block_postings_from_terminfo(&self,
term_info: &TermInfo,
option: SegmentPostingsOption)
-> BlockSegmentPostings {
let offset = term_info.postings_offset as usize;
let postings_data = self.postings_source.slice_from(offset);
let has_freq = option.has_freq();
BlockSegmentPostings::from_data(
term_info.doc_freq as usize,
SourceRead::from(postings_data),
has_freq)
}
/// Returns a posting object given a `term_info`.
/// This method is for an advanced usage only.
///
/// Most users should prefer using `read_postings` instead.
pub fn read_postings_from_terminfo(&self,
term_info: &TermInfo,
option: SegmentPostingsOption)
-> SegmentPostings {
let block_postings = self.read_block_postings_from_terminfo(term_info, option);
let delete_bitset = self.delete_bitset.clone();
let position_stream = {
if option.has_positions() {
let position_offset = term_info.positions_offset;
let positions_reader = SourceRead::from(self.positions_source.slice_from(position_offset as usize));
let mut stream = CompressedIntStream::wrap(positions_reader);
stream.skip(term_info.positions_inner_offset as usize);
Some(stream)
}
else {
None
}
};
SegmentPostings::from_block_postings(
block_postings,
delete_bitset,
position_stream
)
}
/// Returns the segment postings associated with the term, and with the given option,
/// or `None` if the term has never been encountered and indexed.
///
/// If the field was not indexed with the indexing options that cover
/// the requested options, the method does not fail
/// and returns a `SegmentPostings` with as much information as possible.
///
/// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a
/// `TextIndexingOptions` that does not index position will return a `SegmentPostings`
/// with `DocId`s and frequencies.
pub fn read_postings(&self,
term: &Term,
option: SegmentPostingsOption)
-> Option<SegmentPostings> {
let field = term.field();
let field_entry = self.schema.get_field_entry(field);
let term_info = get!(self.get_term_info(term));
let maximum_option = get!(field_entry.field_type().get_segment_postings_option());
let best_effort_option = cmp::min(maximum_option, option);
Some(self.read_postings_from_terminfo(&term_info, best_effort_option))
}
/// Returns the number of documents containing the term.
pub fn doc_freq(&self, term: &Term) -> u32 {
match self.get_term_info(term) {
Some(term_info) => term_info.doc_freq,
None => 0,
}
}
}
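A short usage sketch of the new `FieldReader` (hypothetical variables; assumes a `segment_reader` and an indexed text `field`):

let field_reader = segment_reader.field_reader(field).unwrap();
let term = Term::from_field_text(field, "hello");
if let Some(mut postings) = field_reader.read_postings(&term, SegmentPostingsOption::FreqAndPositions) {
    while postings.advance() {
        println!("doc {} has term frequency {}", postings.doc(), postings.term_freq());
    }
}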

View File

@@ -7,7 +7,9 @@ mod segment;
mod index_meta;
mod pool;
mod segment_meta;
mod field_reader;
pub use self::field_reader::FieldReader;
pub use self::searcher::Searcher;
pub use self::segment_component::SegmentComponent;
pub use self::segment_id::SegmentId;
@@ -18,7 +20,6 @@ pub use self::index::Index;
pub use self::segment_meta::SegmentMeta;
pub use self::index_meta::IndexMeta;
use std::path::PathBuf;
lazy_static! {

View File

@@ -6,9 +6,11 @@ use common::TimerTree;
use query::Query;
use DocId;
use DocAddress;
use schema::Term;
use termdict::TermMerger;
use schema::{Term, Field};
use termdict::{TermMerger, TermDictionary};
use std::sync::Arc;
use std::fmt;
use core::FieldReader;
use postings::TermInfo;
@@ -46,7 +48,12 @@ impl Searcher {
pub fn doc_freq(&self, term: &Term) -> u32 {
self.segment_readers
.iter()
.map(|segment_reader| segment_reader.doc_freq(term))
.map(|segment_reader| {
segment_reader
.field_reader(term.field())
.unwrap() // TODO error handling
.doc_freq(term)
})
.fold(0u32, |acc, val| acc + val)
}
@@ -65,20 +72,46 @@ impl Searcher {
query.search(self, collector)
}
/// Returns a Stream over all of the sorted unique terms of
/// the searcher.
///
/// This includes all of the fields from all of the segment_readers.
/// See [`TermIterator`](struct.TermIterator.html).
///
/// # Warning
/// This API is very likely to change in the future.
pub fn terms(&self) -> TermMerger<TermInfo> {
TermMerger::from(self.segment_readers())
pub fn field(&self, field: Field) -> Result<FieldSearcher> {
let field_readers = self.segment_readers
.iter()
.map(|segment_reader| {
segment_reader.field_reader(field)
})
.collect::<Result<Vec<_>>>()?;
Ok(FieldSearcher::new(field_readers))
}
}
pub struct FieldSearcher {
field_readers: Vec<Arc<FieldReader>>,
}
impl FieldSearcher {
fn new(field_readers: Vec<Arc<FieldReader>>) -> FieldSearcher {
FieldSearcher {
field_readers: field_readers,
}
}
/// Returns a Stream over all of the sorted unique terms
/// for the given field.
pub fn terms(&self) -> TermMerger<TermInfo> {
let term_streamers: Vec<_> = self.field_readers
.iter()
.map(|field_reader| {
field_reader.terms().stream()
})
.collect();
TermMerger::new(term_streamers)
}
}
impl From<Vec<SegmentReader>> for Searcher {
fn from(segment_readers: Vec<SegmentReader>) -> Searcher {
Searcher { segment_readers: segment_readers }

View File

@@ -2,26 +2,21 @@ use Result;
use core::Segment;
use core::SegmentId;
use core::SegmentComponent;
use schema::Term;
use std::sync::RwLock;
use common::HasLen;
use compression::CompressedIntStream;
use core::SegmentMeta;
use fastfield::{self, FastFieldNotAvailableError};
use fastfield::DeleteBitSet;
use store::StoreReader;
use schema::Document;
use directory::ReadOnlySource;
use DocId;
use std::str;
use termdict::TermDictionary;
use std::cmp;
use postings::TermInfo;
use termdict::TermDictionaryImpl;
use std::sync::Arc;
use std::collections::HashMap;
use common::CompositeFile;
use std::fmt;
use core::FieldReader;
use schema::Field;
use postings::SegmentPostingsOption;
use postings::{SegmentPostings, BlockSegmentPostings};
use fastfield::{FastFieldsReader, FastFieldReader, U64FastFieldReader};
use schema::Schema;
@@ -40,15 +35,19 @@ use schema::Schema;
///
#[derive(Clone)]
pub struct SegmentReader {
field_reader_cache: Arc<RwLock<HashMap<Field, Arc<FieldReader>>>>,
segment_id: SegmentId,
segment_meta: SegmentMeta,
terms: Arc<TermDictionaryImpl>,
postings_data: ReadOnlySource,
termdict_composite: CompositeFile,
postings_composite: CompositeFile,
positions_composite: CompositeFile,
store_reader: StoreReader,
fast_fields_reader: Arc<FastFieldsReader>,
fieldnorms_reader: Arc<FastFieldsReader>,
delete_bitset: DeleteBitSet,
positions_data: ReadOnlySource,
schema: Schema,
}
@@ -117,14 +116,6 @@ impl SegmentReader {
self.fieldnorms_reader.open_reader(field)
}
/// Returns the number of documents containing the term.
pub fn doc_freq(&self, term: &Term) -> u32 {
match self.get_term_info(term) {
Some(term_info) => term_info.doc_freq,
None => 0,
}
}
/// Accessor to the segment's `StoreReader`.
pub fn get_store_reader(&self) -> &StoreReader {
&self.store_reader
@@ -133,13 +124,24 @@ impl SegmentReader {
/// Open a new segment for reading.
pub fn open(segment: Segment) -> Result<SegmentReader> {
let source = segment.open_read(SegmentComponent::TERMS)?;
let terms = TermDictionaryImpl::from_source(source)?;
let termdict_source = segment.open_read(SegmentComponent::TERMS)?;
let termdict_composite = CompositeFile::open(termdict_source)?;
let store_source = segment.open_read(SegmentComponent::STORE)?;
let store_reader = StoreReader::from_source(store_source);
let postings_shared_mmap = segment.open_read(SegmentComponent::POSTINGS)?;
let postings_source = segment.open_read(SegmentComponent::POSTINGS)?;
let postings_composite = CompositeFile::open(postings_source)?;
let positions_composite = {
if let Ok(source) = segment.open_read(SegmentComponent::POSITIONS) {
CompositeFile::open(source)?
}
else {
CompositeFile::empty()
}
};
let fast_field_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
let fast_fields_reader = FastFieldsReader::from_source(fast_field_data)?;
@@ -147,9 +149,6 @@ impl SegmentReader {
let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
let fieldnorms_reader = FastFieldsReader::from_source(fieldnorms_data)?;
let positions_data = segment
.open_read(SegmentComponent::POSITIONS)
.unwrap_or_else(|_| ReadOnlySource::empty());
let delete_bitset = if segment.meta().has_deletes() {
let delete_data = segment.open_read(SegmentComponent::DELETE)?;
@@ -160,22 +159,53 @@ impl SegmentReader {
let schema = segment.schema();
Ok(SegmentReader {
segment_meta: segment.meta().clone(),
postings_data: postings_shared_mmap,
terms: Arc::new(terms),
segment_id: segment.id(),
store_reader: store_reader,
fast_fields_reader: Arc::new(fast_fields_reader),
fieldnorms_reader: Arc::new(fieldnorms_reader),
delete_bitset: delete_bitset,
positions_data: positions_data,
schema: schema,
})
field_reader_cache: Arc::new(RwLock::new(HashMap::new())),
segment_meta: segment.meta().clone(),
postings_composite: postings_composite,
termdict_composite: termdict_composite,
segment_id: segment.id(),
store_reader: store_reader,
fast_fields_reader: Arc::new(fast_fields_reader),
fieldnorms_reader: Arc::new(fieldnorms_reader),
delete_bitset: delete_bitset,
positions_composite: positions_composite,
schema: schema,
})
}
/// Return the term dictionary datastructure.
pub fn terms(&self) -> &TermDictionaryImpl {
&self.terms
pub fn field_reader(&self, field: Field) -> Result<Arc<FieldReader>> {
if let Some(field_reader) = self.field_reader_cache.read()
.unwrap() // TODO
.get(&field) {
return Ok(field_reader.clone());
}
// TODO better error
let termdict_source = self.termdict_composite
.open_read(field)
.ok_or("Field not found")?;
let postings_source = self.postings_composite
.open_read(field)
.ok_or("field not found")?;
let positions_source = self.positions_composite
.open_read(field)
.ok_or("field not found")?;
let field_reader = Arc::new(FieldReader::new(
termdict_source,
postings_source,
positions_source,
self.delete_bitset.clone(),
self.schema.clone(),
)?);
self.field_reader_cache
.write()
.unwrap() // TODO
.insert(field, field_reader.clone());
Ok(field_reader)
}
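Because `field_reader` goes through the `RwLock`-guarded cache, repeated calls for the same field should hand back clones of the same shared `Arc` rather than re-opening the per-field sources. A small illustrative check (not part of the commit):

let a = segment_reader.field_reader(field).unwrap();
let b = segment_reader.field_reader(field).unwrap();
assert!(Arc::ptr_eq(&a, &b)); // same cached FieldReader instance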
/// Returns the document (or to be accurate, its stored field)
@@ -187,100 +217,6 @@ impl SegmentReader {
}
/// Returns the segment postings associated with the term, and with the given option,
/// or `None` if the term has never been encountered and indexed.
///
/// If the field was not indexed with the indexing options that cover
/// the requested options, the returned `SegmentPostings` the method does not fail
/// and returns a `SegmentPostings` with as much information as possible.
///
/// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a
/// `TextIndexingOptions` that does not index position will return a `SegmentPostings`
/// with `DocId`s and frequencies.
pub fn read_postings(&self,
term: &Term,
option: SegmentPostingsOption)
-> Option<SegmentPostings> {
let field = term.field();
let field_entry = self.schema.get_field_entry(field);
let term_info = get!(self.get_term_info(term));
let maximum_option = get!(field_entry.field_type().get_segment_postings_option());
let best_effort_option = cmp::min(maximum_option, option);
Some(self.read_postings_from_terminfo(&term_info, best_effort_option))
}
/// Returns a posting object given a `term_info`.
/// This method is for an advanced usage only.
///
/// Most user should prefer using `read_postings` instead.
pub fn read_postings_from_terminfo(&self,
term_info: &TermInfo,
option: SegmentPostingsOption)
-> SegmentPostings {
let block_postings = self.read_block_postings_from_terminfo(term_info, option);
let delete_bitset = self.delete_bitset.clone();
let position_stream = {
if option.has_positions() {
let position_offset = term_info.positions_offset;
let positions_data = &self.positions_data[position_offset as usize..];
let mut stream = CompressedIntStream::wrap(positions_data);
stream.skip(term_info.positions_inner_offset as usize);
Some(stream)
}
else {
None
}
};
SegmentPostings::from_block_postings(
block_postings,
delete_bitset,
position_stream
)
}
/// Returns a block postings given a `term_info`.
/// This method is for an advanced usage only.
///
/// Most user should prefer using `read_postings` instead.
pub fn read_block_postings_from_terminfo(&self,
term_info: &TermInfo,
option: SegmentPostingsOption)
-> BlockSegmentPostings {
let offset = term_info.postings_offset as usize;
let postings_data = &self.postings_data[offset..];
let has_freq = option.has_freq();
BlockSegmentPostings::from_data(
term_info.doc_freq as usize,
postings_data,
has_freq)
}
/// Resets the block segment to another position of the postings
/// file.
///
/// This is useful for enumerating through a list of terms,
/// and consuming the associated posting lists while avoiding
/// reallocating a `BlockSegmentPostings`.
///
/// # Warning
///
/// This does not reset the positions list.
pub fn reset_block_postings_from_terminfo<'a>(&'a self,
term_info: &TermInfo,
block_postings: &mut BlockSegmentPostings<'a>) {
let offset = term_info.postings_offset as usize;
let postings_data: &'a [u8] = &self.postings_data[offset..];
block_postings.reset(term_info.doc_freq as usize, postings_data);
}
/// Returns the term info associated with the term.
pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
self.terms.get(term.as_slice())
}
/// Returns the segment id
pub fn segment_id(&self) -> SegmentId {
self.segment_id

View File

@@ -16,7 +16,7 @@ pub mod error;
use std::io::{Write, Seek};
use std::io::BufWriter;
pub use self::read_only_source::ReadOnlySource;
pub use self::read_only_source::{SourceRead, ReadOnlySource};
pub use self::directory::Directory;
pub use self::ram_directory::RAMDirectory;
pub use self::mmap_directory::MmapDirectory;

View File

@@ -2,6 +2,8 @@ use fst::raw::MmapReadOnly;
use std::ops::Deref;
use super::shared_vec_slice::SharedVecSlice;
use common::HasLen;
use std::slice;
use std::io::{self, Read};
use stable_deref_trait::StableDeref;
/// Read object that represents files in tantivy.
@@ -62,6 +64,11 @@ impl ReadOnlySource {
}
}
}
pub fn slice_from(&self, from_offset: usize) -> ReadOnlySource {
let len = self.len();
self.slice(from_offset, len)
}
}
impl HasLen for ReadOnlySource {
@@ -82,3 +89,38 @@ impl From<Vec<u8>> for ReadOnlySource {
ReadOnlySource::Anonymous(shared_data)
}
}
pub struct SourceRead {
_data_owner: ReadOnlySource,
cursor: &'static [u8]
}
impl SourceRead {
pub fn advance(&mut self, len: usize) {
self.cursor = &self.cursor[len..];
}
}
impl AsRef<[u8]> for SourceRead {
fn as_ref(&self) -> &[u8] {
self.cursor
}
}
impl From<ReadOnlySource> for SourceRead {
fn from(source: ReadOnlySource) -> SourceRead {
let len = source.len();
let slice_ptr = source.as_slice().as_ptr();
let static_slice = unsafe { slice::from_raw_parts(slice_ptr, len) };
SourceRead {
_data_owner: source,
cursor: static_slice,
}
}
}
impl Read for SourceRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.cursor.read(buf)
}
}
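A minimal sketch of the `SourceRead` cursor semantics on an in-memory source (illustrative values):

let source = ReadOnlySource::from(vec![1u8, 2, 3, 4]);
let mut reader = SourceRead::from(source);
reader.advance(2);
assert_eq!(reader.as_ref(), &[3u8, 4][..]);

The `'static` slice is only sound because `_data_owner` keeps the underlying bytes alive for the lifetime of the cursor and `ReadOnlySource` is `StableDeref`, so its data never moves.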

View File

@@ -177,8 +177,9 @@ pub fn compute_deleted_bitset(delete_bitset: &mut BitSet,
// Limit doc helps identify the first document
// that may be affected by the delete operation.
let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
let field_reader = segment_reader.field_reader(delete_op.term.field())?;
if let Some(mut docset) =
segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) {
field_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) {
while docset.advance() {
let deleted_doc = docset.doc();
if deleted_doc < limit_doc {

View File

@@ -17,6 +17,7 @@ use fastfield::FastFieldSerializer;
use fastfield::FastFieldReader;
use store::StoreWriter;
use std::cmp::{min, max};
use termdict::TermDictionary;
use schema::Term;
use termdict::TermStreamer;
@@ -195,48 +196,62 @@ impl IndexMerger {
fn write_postings(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> {
let mut delta_computer = DeltaComputer::new();
let mut merged_terms = TermMerger::from(&self.readers[..]);
let mut max_doc = 0;
// map from segment doc ids to the resulting merged segment doc id.
let mut merged_doc_id_map: Vec<Vec<Option<DocId>>> = Vec::with_capacity(self.readers.len());
for reader in &self.readers {
let mut segment_local_map = Vec::with_capacity(reader.max_doc() as usize);
for doc_id in 0..reader.max_doc() {
if reader.is_deleted(doc_id) {
segment_local_map.push(None);
} else {
segment_local_map.push(Some(max_doc));
max_doc += 1u32;
}
let mut indexed_fields = vec!();
for (field_ord, field_entry) in self.schema.fields().iter().enumerate() {
if field_entry.is_indexed() {
indexed_fields.push(Field(field_ord as u32));
}
merged_doc_id_map.push(segment_local_map);
}
// Create the total list of doc ids
// by stacking the doc ids from the different segment.
//
// In the new segments, the doc id from the different
// segment are stacked so that :
// - Segment 0's doc ids become doc id [0, seg.max_doc]
// - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc]
// - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc,
// seg0.max_doc + seg1.max_doc + seg2.max_doc]
// ...
if !merged_terms.advance() {
return Ok(());
}
for indexed_field in indexed_fields {
let mut current_field = Term::wrap(merged_terms.key()).field();
let field_readers = self.readers
.iter()
.map(|reader|
reader.field_reader(indexed_field))
.collect::<Result<Vec<_>>>()?;
loop {
// this loop processes all fields.
let mut field_serializer = serializer.new_field(current_field)?;
let field_term_streams = field_readers
.iter()
.map(|field_reader| field_reader.terms().stream() )
.collect();
let mut merged_terms = TermMerger::new(field_term_streams);
let mut max_doc = 0;
// map from segment doc ids to the resulting merged segment doc id.
let mut merged_doc_id_map: Vec<Vec<Option<DocId>>> = Vec::with_capacity(self.readers.len());
for reader in &self.readers {
let mut segment_local_map = Vec::with_capacity(reader.max_doc() as usize);
for doc_id in 0..reader.max_doc() {
if reader.is_deleted(doc_id) {
segment_local_map.push(None);
} else {
segment_local_map.push(Some(max_doc));
max_doc += 1u32;
}
}
merged_doc_id_map.push(segment_local_map);
}
// Create the total list of doc ids
// by stacking the doc ids from the different segments.
//
// In the new segment, the doc ids from the different
// segments are stacked so that:
// - Segment 0's doc ids become doc ids [0, seg0.max_doc]
// - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg1.max_doc]
// - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc,
//    seg0.max_doc + seg1.max_doc + seg2.max_doc]
// ...
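// For example (illustrative sizes, not from this commit): merging three segments
// with max_doc 5, 3 and 4 and no deletes, segment 0's docs keep ids 0..5,
// segment 1's become 5..8 and segment 2's become 8..12.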
let mut field_serializer = serializer.new_field(indexed_field)?;
let field_entry = self.schema.get_field_entry(indexed_field);
// we reached a new field.
let field_entry = self.schema.get_field_entry(current_field);
// ... set segment postings option the new field.
let segment_postings_option = field_entry
.field_type()
@@ -244,88 +259,78 @@ impl IndexMerger {
.expect("Encountered a field that is not supposed to be
indexed. Have you modified the schema?");
loop {
// this loops processes a field.
{
let term = Term::wrap(merged_terms.key());
while merged_terms.advance() {
// Let's compute the list of non-empty posting lists
let segment_postings: Vec<_> = merged_terms
.current_kvs()
.iter()
.flat_map(|heap_item| {
let segment_ord = heap_item.segment_ord;
let term_info = heap_item.streamer.value();
let segment_reader = &self.readers[heap_item.segment_ord];
let mut segment_postings =
segment_reader
.read_postings_from_terminfo(term_info, segment_postings_option);
if segment_postings.advance() {
Some((segment_ord, segment_postings))
} else {
None
let term = Term::wrap(merged_terms.key());
// Let's compute the list of non-empty posting lists
let segment_postings: Vec<_> = merged_terms
.current_kvs()
.iter()
.flat_map(|heap_item| {
let segment_ord = heap_item.segment_ord;
let term_info = heap_item.streamer.value();
let segment_reader = &self.readers[heap_item.segment_ord];
let field_reader = segment_reader.field_reader(term.field()).unwrap(); // TODO fix unwrap
let mut segment_postings = field_reader
.read_postings_from_terminfo(term_info, segment_postings_option);
if segment_postings.advance() {
Some((segment_ord, segment_postings))
} else {
None
}
})
.collect();
// At this point, `segment_postings` contains the posting list
// of all of the segments containing the given term.
//
// These segments are non-empty and advance has already been called.
if !segment_postings.is_empty() {
// If not, the `term` will be entirely removed.
// We know that there is at least one document containing
// the term, so we add it.
field_serializer.new_term(term.as_ref())?;
// We can now serialize this postings, by pushing each document to the
// postings serializer.
for (segment_ord, mut segment_postings) in segment_postings {
let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
loop {
// `.advance()` has been called once before the loop.
// Hence we cannot use a `while segment_postings.advance()` loop.
if let Some(remapped_doc_id) =
old_to_new_doc_id[segment_postings.doc() as usize] {
// we make sure to only write the term iff
// there is at least one document.
let positions: &[u32] = segment_postings.positions();
let term_freq = segment_postings.term_freq();
let delta_positions = delta_computer.compute_delta(positions);
field_serializer
.write_doc(remapped_doc_id, term_freq, delta_positions)?;
}
})
.collect();
// At this point, `segment_postings` contains the posting list
// of all of the segments containing the given term.
//
// These segments are non-empty and advance has already been called.
if !segment_postings.is_empty() {
// If not, the `term` will be entirely removed.
// We know that there is at least one document containing
// the term, so we add it.
field_serializer.new_term(term.as_ref())?;
// We can now serialize this postings, by pushing each document to the
// postings serializer.
for (segment_ord, mut segment_postings) in segment_postings {
let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
loop {
// `.advance()` has been called once before the loop.
// Hence we cannot use a `while segment_postings.advance()` loop.
if let Some(remapped_doc_id) =
old_to_new_doc_id[segment_postings.doc() as usize] {
// we make sure to only write the term iff
// there is at least one document.
let positions: &[u32] = segment_postings.positions();
let term_freq = segment_postings.term_freq();
let delta_positions = delta_computer.compute_delta(positions);
field_serializer
.write_doc(remapped_doc_id, term_freq, delta_positions)?;
}
if !segment_postings.advance() {
break;
}
if !segment_postings.advance() {
break;
}
}
// closing the term.
field_serializer.close_term()?;
}
// closing the term.
field_serializer.close_term()?;
}
if !merged_terms.advance() {
field_serializer.close()?;
return Ok(())
}
{
let next_term_field = Term::wrap(merged_terms.key()).field();
if next_term_field != current_field {
current_field = next_term_field;
field_serializer.close()?;
break;
}
}
}
field_serializer.close()?;
}
Ok(())
}
fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> Result<()> {

View File

@@ -390,15 +390,16 @@ mod tests {
index.load_searchers().unwrap();
let searcher = index.searcher();
let reader = searcher.segment_reader(0);
assert!(reader.read_postings(&term_abcd, FreqAndPositions).is_none());
let field_reader = reader.field_reader(text_field).unwrap();
assert!(field_reader.read_postings(&term_abcd, FreqAndPositions).is_none());
{
let mut postings = reader.read_postings(&term_a, FreqAndPositions).unwrap();
let mut postings = field_reader.read_postings(&term_a, FreqAndPositions).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 5);
assert!(!postings.advance());
}
{
let mut postings = reader.read_postings(&term_b, FreqAndPositions).unwrap();
let mut postings = field_reader.read_postings(&term_b, FreqAndPositions).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 3);
assert!(postings.advance());
@@ -424,16 +425,17 @@ mod tests {
index.load_searchers().unwrap();
let searcher = index.searcher();
let reader = searcher.segment_reader(0);
let field_reader = reader.field_reader(term_abcd.field()).unwrap();
assert!(reader.read_postings(&term_abcd, FreqAndPositions).is_none());
assert!(field_reader.read_postings(&term_abcd, FreqAndPositions).is_none());
{
let mut postings = reader.read_postings(&term_a, FreqAndPositions).unwrap();
let mut postings = field_reader.read_postings(&term_a, FreqAndPositions).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 5);
assert!(!postings.advance());
}
{
let mut postings = reader.read_postings(&term_b, FreqAndPositions).unwrap();
let mut postings = field_reader.read_postings(&term_b, FreqAndPositions).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 3);
assert!(postings.advance());
@@ -459,13 +461,14 @@ mod tests {
index.load_searchers().unwrap();
let searcher = index.searcher();
let reader = searcher.segment_reader(0);
assert!(reader.read_postings(&term_abcd, FreqAndPositions).is_none());
let field_reader = reader.field_reader(term_abcd.field()).unwrap();
assert!(field_reader.read_postings(&term_abcd, FreqAndPositions).is_none());
{
let mut postings = reader.read_postings(&term_a, FreqAndPositions).unwrap();
let mut postings = field_reader.read_postings(&term_a, FreqAndPositions).unwrap();
assert!(!postings.advance());
}
{
let mut postings = reader.read_postings(&term_b, FreqAndPositions).unwrap();
let mut postings = field_reader.read_postings(&term_b, FreqAndPositions).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 3);
assert!(postings.advance());
@@ -473,7 +476,7 @@ mod tests {
assert!(!postings.advance());
}
{
let mut postings = reader.read_postings(&term_c, FreqAndPositions).unwrap();
let mut postings = field_reader.read_postings(&term_c, FreqAndPositions).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 4);
assert!(!postings.advance());
@@ -497,6 +500,7 @@ mod tests {
let term = Term::from_field_u64(field, 1u64);
let mut postings = searcher
.segment_reader(0)
.field_reader(term.field()).unwrap()
.read_postings(&term, SegmentPostingsOption::NoFreq)
.unwrap();
assert!(postings.advance());
@@ -520,6 +524,7 @@ mod tests {
let term = Term::from_field_i64(value_field, negative_val);
let mut postings = searcher
.segment_reader(0)
.field_reader(term.field()).unwrap()
.read_postings(&term, SegmentPostingsOption::NoFreq)
.unwrap();
assert!(postings.advance());
@@ -582,10 +587,11 @@ mod tests {
index.load_searchers().unwrap();
let searcher = index.searcher();
let reader = searcher.segment_reader(0);
let field_reader = reader.field_reader(text_field).unwrap();
let term_abcd = Term::from_field_text(text_field, "abcd");
assert!(reader.read_postings(&term_abcd, FreqAndPositions).is_none());
assert!(field_reader.read_postings(&term_abcd, FreqAndPositions).is_none());
let term_af = Term::from_field_text(text_field, "af");
let mut postings = reader.read_postings(&term_af, FreqAndPositions).unwrap();
let mut postings = field_reader.read_postings(&term_af, FreqAndPositions).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 0);
assert_eq!(postings.term_freq(), 3);

View File

@@ -132,12 +132,14 @@ mod tests {
{
let term_a = Term::from_field_text(text_field, "abcdef");
assert!(segment_reader
.read_postings(&term_a, FreqAndPositions)
.is_none());
.field_reader(term_a.field()).unwrap()
.read_postings(&term_a, FreqAndPositions)
.is_none());
}
{
let term_a = Term::from_field_text(text_field, "a");
let mut postings_a = segment_reader
.field_reader(term_a.field()).unwrap()
.read_postings(&term_a, FreqAndPositions)
.unwrap();
assert_eq!(postings_a.len(), 1000);
@@ -160,6 +162,7 @@ mod tests {
{
let term_e = Term::from_field_text(text_field, "e");
let mut postings_e = segment_reader
.field_reader(term_e.field()).unwrap()
.read_postings(&term_e, FreqAndPositions)
.unwrap();
assert_eq!(postings_e.len(), 1000 - 2);
@@ -247,6 +250,7 @@ mod tests {
for i in 0..num_docs - 1 {
for j in i + 1..num_docs {
let mut segment_postings = segment_reader
.field_reader(term_2.field()).unwrap()
.read_postings(&term_2, SegmentPostingsOption::NoFreq)
.unwrap();
@@ -260,6 +264,7 @@ mod tests {
{
let mut segment_postings = segment_reader
.field_reader(term_2.field()).unwrap()
.read_postings(&term_2, SegmentPostingsOption::NoFreq)
.unwrap();
@@ -280,6 +285,7 @@ mod tests {
// check that filtering works
{
let mut segment_postings = segment_reader
.field_reader(term_0.field()).unwrap()
.read_postings(&term_0, SegmentPostingsOption::NoFreq)
.unwrap();
@@ -289,6 +295,7 @@ mod tests {
}
let mut segment_postings = segment_reader
.field_reader(term_0.field()).unwrap()
.read_postings(&term_0, SegmentPostingsOption::NoFreq)
.unwrap();
@@ -313,6 +320,7 @@ mod tests {
// make sure seeking still works
for i in 0..num_docs {
let mut segment_postings = segment_reader
.field_reader(term_2.field()).unwrap()
.read_postings(&term_2, SegmentPostingsOption::NoFreq)
.unwrap();
@@ -328,6 +336,7 @@ mod tests {
// now try with a longer sequence
{
let mut segment_postings = segment_reader
.field_reader(term_2.field()).unwrap()
.read_postings(&term_2, SegmentPostingsOption::NoFreq)
.unwrap();
@@ -363,12 +372,14 @@ mod tests {
// finally, check that it's empty
{
let mut segment_postings = segment_reader
.field_reader(term_2.field()).unwrap()
.read_postings(&term_2, SegmentPostingsOption::NoFreq)
.unwrap();
assert_eq!(segment_postings.skip_next(0), SkipResult::End);
let mut segment_postings = segment_reader
.field_reader(term_2.field()).unwrap()
.read_postings(&term_2, SegmentPostingsOption::NoFreq)
.unwrap();
@@ -436,6 +447,7 @@ mod tests {
b.iter(|| {
let mut segment_postings = segment_reader
.field_reader(TERM_A.field()).unwrap()
.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq)
.unwrap();
while segment_postings.advance() {}
@@ -448,15 +460,19 @@ mod tests {
let segment_reader = searcher.segment_reader(0);
b.iter(|| {
let segment_postings_a = segment_reader
.field_reader(TERM_A.field()).unwrap()
.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq)
.unwrap();
let segment_postings_b = segment_reader
.field_reader(TERM_B.field()).unwrap()
.read_postings(&*TERM_B, SegmentPostingsOption::NoFreq)
.unwrap();
let segment_postings_c = segment_reader
.field_reader(TERM_C.field()).unwrap()
.read_postings(&*TERM_C, SegmentPostingsOption::NoFreq)
.unwrap();
let segment_postings_d = segment_reader
.field_reader(TERM_D.field()).unwrap()
.read_postings(&*TERM_D, SegmentPostingsOption::NoFreq)
.unwrap();
let mut intersection = IntersectionDocSet::from(vec![segment_postings_a,
@@ -473,6 +489,7 @@ mod tests {
let docs = tests::sample(segment_reader.num_docs(), p);
let mut segment_postings = segment_reader
.field_reader(TERM_A.field()).unwrap()
.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq)
.unwrap();
@@ -489,6 +506,7 @@ mod tests {
b.iter(|| {
let mut segment_postings = segment_reader
.field_reader(TERM_A.field()).unwrap()
.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq)
.unwrap();
for doc in &existing_docs {
@@ -526,6 +544,7 @@ mod tests {
b.iter(|| {
let n: u32 = test::black_box(17);
let mut segment_postings = segment_reader
.field_reader(TERM_A.field()).unwrap()
.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq)
.unwrap();
let mut s = 0u32;

View File

@@ -101,8 +101,9 @@ impl<'a> MultiFieldPostingsWriter<'a> {
let (field, start) = offsets[i];
let (_, stop) = offsets[i + 1];
let postings_writer = &self.per_field_postings_writers[field.0 as usize];
let field_serializer = serializer.new_field(field)?;
postings_writer.serialize(&term_offsets[start..stop], field_serializer, self.heap)?;
let mut field_serializer = serializer.new_field(field)?;
postings_writer.serialize(&term_offsets[start..stop], &mut field_serializer, self.heap)?;
field_serializer.close()?;
}
Ok(())
}
@@ -137,7 +138,7 @@ pub trait PostingsWriter {
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(&self,
term_addrs: &[(&[u8], u32)],
serializer: FieldSerializer,
serializer: &mut FieldSerializer,
heap: &Heap)
-> io::Result<()>;
@@ -214,13 +215,13 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'
fn serialize(&self,
term_addrs: &[(&[u8], u32)],
mut serializer: FieldSerializer,
serializer: &mut FieldSerializer,
heap: &Heap)
-> io::Result<()> {
for &(term_bytes, addr) in term_addrs {
let recorder: &mut Rec = self.heap.get_mut_ref(addr);
serializer.new_term(term_bytes)?;
recorder.serialize(addr, &mut serializer, heap)?;
recorder.serialize(addr, serializer, heap)?;
serializer.close_term()?;
}
Ok(())

View File

@@ -5,11 +5,15 @@ use std::cmp;
use fst::Streamer;
use fastfield::DeleteBitSet;
use std::cell::UnsafeCell;
use directory::{SourceRead, ReadOnlySource};
const EMPTY_DATA: [u8; 0] = [0u8; 0];
const EMPTY_POSITIONS: [u32; 0] = [0u32; 0];
struct PositionComputer<'a> {
struct PositionComputer {
// store the amount of position int
// before reading positions.
//
@@ -17,12 +21,12 @@ struct PositionComputer<'a> {
// the positions vec.
position_to_skip: Option<usize>,
positions: Vec<u32>,
positions_stream: CompressedIntStream<'a>,
positions_stream: CompressedIntStream,
}
impl<'a> PositionComputer<'a> {
impl PositionComputer {
pub fn new(positions_stream: CompressedIntStream<'a>) -> PositionComputer<'a> {
pub fn new(positions_stream: CompressedIntStream) -> PositionComputer {
PositionComputer {
position_to_skip: None,
positions: vec!(),
@@ -64,25 +68,25 @@ impl<'a> PositionComputer<'a> {
///
/// As we iterate through the `SegmentPostings`, the frequencies are optionally decoded.
/// Positions on the other hand, are optionally entirely decoded upfront.
pub struct SegmentPostings<'a> {
block_cursor: BlockSegmentPostings<'a>,
pub struct SegmentPostings {
block_cursor: BlockSegmentPostings,
cur: usize,
delete_bitset: DeleteBitSet,
position_computer: Option<UnsafeCell<PositionComputer<'a>>>,
position_computer: Option<UnsafeCell<PositionComputer>>,
}
impl<'a> SegmentPostings<'a> {
impl SegmentPostings {
/// Reads a Segment postings from an &[u8]
///
/// * `len` - number of document in the posting lists.
/// * `data` - data array. The complete data is not necessarily used.
/// * `freq_handler` - the freq handler is in charge of decoding
/// frequencies and/or positions
pub fn from_block_postings(segment_block_postings: BlockSegmentPostings<'a>,
pub fn from_block_postings(segment_block_postings: BlockSegmentPostings,
delete_bitset: DeleteBitSet,
positions_stream_opt: Option<CompressedIntStream<'a>>)
-> SegmentPostings<'a> {
positions_stream_opt: Option<CompressedIntStream>)
-> SegmentPostings {
let position_computer = positions_stream_opt.map(|stream| {
UnsafeCell::new(PositionComputer::new(stream))
});
@@ -95,7 +99,7 @@ impl<'a> SegmentPostings<'a> {
}
/// Returns an empty segment postings object
pub fn empty() -> SegmentPostings<'a> {
pub fn empty() -> SegmentPostings {
let empty_block_cursor = BlockSegmentPostings::empty();
SegmentPostings {
block_cursor: empty_block_cursor,
@@ -117,7 +121,7 @@ impl<'a> SegmentPostings<'a> {
}
impl<'a> DocSet for SegmentPostings<'a> {
impl DocSet for SegmentPostings {
// goes to the next element.
// next needs to be called a first time to point to the correct element.
#[inline]
@@ -259,13 +263,13 @@ impl<'a> DocSet for SegmentPostings<'a> {
}
}
impl<'a> HasLen for SegmentPostings<'a> {
impl HasLen for SegmentPostings {
fn len(&self) -> usize {
self.block_cursor.doc_freq()
}
}
impl<'a> Postings for SegmentPostings<'a> {
impl Postings for SegmentPostings {
fn term_freq(&self) -> u32 {
self.block_cursor.freq(self.cur)
}
@@ -286,6 +290,7 @@ impl<'a> Postings for SegmentPostings<'a> {
}
/// `BlockSegmentPostings` is a cursor iterating over blocks
/// of documents.
///
@@ -293,7 +298,7 @@ impl<'a> Postings for SegmentPostings<'a> {
///
/// While it is useful for some very specific high-performance
/// use cases, you should prefer using `SegmentPostings` for most usage.
pub struct BlockSegmentPostings<'a> {
pub struct BlockSegmentPostings {
doc_decoder: BlockDecoder,
freq_decoder: BlockDecoder,
has_freq: bool,
@@ -302,14 +307,14 @@ pub struct BlockSegmentPostings<'a> {
doc_offset: DocId,
num_binpacked_blocks: usize,
num_vint_docs: usize,
remaining_data: &'a [u8],
remaining_data: SourceRead,
}
impl<'a> BlockSegmentPostings<'a> {
impl BlockSegmentPostings {
pub(crate) fn from_data(doc_freq: usize,
data: &'a [u8],
data: SourceRead,
has_freq: bool)
-> BlockSegmentPostings<'a> {
-> BlockSegmentPostings {
let num_binpacked_blocks: usize = (doc_freq as usize) / NUM_DOCS_PER_BLOCK;
let num_vint_docs = (doc_freq as usize) - NUM_DOCS_PER_BLOCK * num_binpacked_blocks;
BlockSegmentPostings {
@@ -337,7 +342,7 @@ impl<'a> BlockSegmentPostings<'a> {
// # Warning
//
// This does not reset the positions list.
pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: &'a [u8]) {
pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: SourceRead) {
let num_binpacked_blocks: usize = doc_freq / NUM_DOCS_PER_BLOCK;
let num_vint_docs = doc_freq & (NUM_DOCS_PER_BLOCK - 1);
self.num_binpacked_blocks = num_binpacked_blocks;
@@ -398,25 +403,30 @@ impl<'a> BlockSegmentPostings<'a> {
pub fn advance(&mut self) -> bool {
if self.num_binpacked_blocks > 0 {
// TODO could self.doc_offset be just a local variable?
self.remaining_data =
self.doc_decoder
.uncompress_block_sorted(self.remaining_data, self.doc_offset);
let num_consumed_bytes = self
.doc_decoder
.uncompress_block_sorted(self.remaining_data.as_ref(), self.doc_offset);
self.remaining_data.advance(num_consumed_bytes);
if self.has_freq {
self.remaining_data = self.freq_decoder.uncompress_block_unsorted(self.remaining_data);
let num_consumed_bytes = self.freq_decoder.uncompress_block_unsorted(self.remaining_data.as_ref());
self.remaining_data.advance(num_consumed_bytes);
}
// it will be used as the next offset.
self.doc_offset = self.doc_decoder.output(NUM_DOCS_PER_BLOCK - 1);
self.num_binpacked_blocks -= 1;
true
} else if self.num_vint_docs > 0 {
self.remaining_data =
let num_compressed_bytes =
self.doc_decoder
.uncompress_vint_sorted(self.remaining_data,
.uncompress_vint_sorted(self.remaining_data.as_ref(),
self.doc_offset,
self.num_vint_docs);
self.remaining_data.advance(num_compressed_bytes);
if self.has_freq {
self.freq_decoder
.uncompress_vint_unsorted(self.remaining_data, self.num_vint_docs);
.uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs);
}
self.num_vint_docs = 0;
true
@@ -426,7 +436,7 @@ impl<'a> BlockSegmentPostings<'a> {
}
/// Returns an empty segment postings object
pub fn empty() -> BlockSegmentPostings<'static> {
pub fn empty() -> BlockSegmentPostings {
BlockSegmentPostings {
num_binpacked_blocks: 0,
num_vint_docs: 0,
@@ -435,14 +445,14 @@ impl<'a> BlockSegmentPostings<'a> {
freq_decoder: BlockDecoder::with_val(1),
has_freq: false,
remaining_data: &EMPTY_DATA,
remaining_data: From::from(ReadOnlySource::empty()),
doc_offset: 0,
doc_freq: 0,
}
}
}
impl<'a, 'b> Streamer<'b> for BlockSegmentPostings<'a> {
impl<'b> Streamer<'b> for BlockSegmentPostings {
type Item = &'b [DocId];
fn next(&'b mut self) -> Option<&'b [DocId]> {
@@ -498,10 +508,11 @@ mod tests {
index.load_searchers().unwrap();
let searcher = index.searcher();
let segment_reader = searcher.segment_reader(0);
let field_reader = segment_reader.field_reader(int_field).unwrap();
let term = Term::from_field_u64(int_field, 0u64);
let term_info = segment_reader.get_term_info(&term).unwrap();
let term_info = field_reader.get_term_info(&term).unwrap();
let mut block_segments =
segment_reader
field_reader
.read_block_postings_from_terminfo(&term_info, SegmentPostingsOption::NoFreq);
let mut offset: u32 = 0u32;
// checking that the block before calling advance is empty
@@ -538,17 +549,19 @@ mod tests {
let mut block_segments;
{
let term = Term::from_field_u64(int_field, 0u64);
let term_info = segment_reader.get_term_info(&term).unwrap();
let field_reader = segment_reader.field_reader(int_field).unwrap();
let term_info = field_reader.get_term_info(&term).unwrap();
block_segments =
segment_reader
field_reader
.read_block_postings_from_terminfo(&term_info, SegmentPostingsOption::NoFreq);
}
assert!(block_segments.advance());
assert!(block_segments.docs() == &[0, 2, 4]);
{
let term = Term::from_field_u64(int_field, 1u64);
let term_info = segment_reader.get_term_info(&term).unwrap();
segment_reader.reset_block_postings_from_terminfo(&term_info, &mut block_segments);
let field_reader = segment_reader.field_reader(int_field).unwrap();
let term_info = field_reader.get_term_info(&term).unwrap();
field_reader.reset_block_postings_from_terminfo(&term_info, &mut block_segments);
}
assert!(block_segments.advance());
assert!(block_segments.docs() == &[1, 3, 5]);
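For completeness, a hedged sketch of driving `BlockSegmentPostings` directly through the new `FieldReader`, using the same API the test above exercises (variable names are illustrative):

let field_reader = segment_reader.field_reader(int_field).unwrap();
let term_info = field_reader.get_term_info(&term).expect("unknown term");
let mut block_postings = field_reader
    .read_block_postings_from_terminfo(&term_info, SegmentPostingsOption::NoFreq);
while block_postings.advance() {
    for &doc in block_postings.docs() {
        // one DocId per entry in the current block
        let _ = doc;
    }
}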

View File

@@ -5,12 +5,12 @@ use postings::Postings;
use postings::IntersectionDocSet;
use DocId;
pub struct PhraseScorer<'a> {
pub intersection_docset: IntersectionDocSet<SegmentPostings<'a>>,
pub struct PhraseScorer {
pub intersection_docset: IntersectionDocSet<SegmentPostings>,
}
impl<'a> PhraseScorer<'a> {
impl PhraseScorer {
fn phrase_match(&self) -> bool {
let mut positions_arr: Vec<&[u32]> = self.intersection_docset
.docsets()
@@ -54,7 +54,7 @@ impl<'a> PhraseScorer<'a> {
}
}
impl<'a> DocSet for PhraseScorer<'a> {
impl DocSet for PhraseScorer {
fn advance(&mut self) -> bool {
while self.intersection_docset.advance() {
if self.phrase_match() {
@@ -74,7 +74,7 @@ impl<'a> DocSet for PhraseScorer<'a> {
}
impl<'a> Scorer for PhraseScorer<'a> {
impl Scorer for PhraseScorer {
fn score(&self) -> f32 {
1f32
}

View File

@@ -22,8 +22,9 @@ impl Weight for PhraseWeight {
fn scorer<'a>(&'a self, reader: &'a SegmentReader) -> Result<Box<Scorer + 'a>> {
let mut term_postings_list = Vec::new();
for term in &self.phrase_terms {
let field_reader = reader.field_reader(term.field())?;
let term_postings_option =
reader.read_postings(term, SegmentPostingsOption::FreqAndPositions);
field_reader.read_postings(term, SegmentPostingsOption::FreqAndPositions);
if let Some(term_postings) = term_postings_option {
term_postings_list.push(term_postings);
} else {

View File

@@ -27,12 +27,14 @@ impl TermWeight {
1.0 + (self.num_docs as f32 / (self.doc_freq as f32 + 1.0)).ln()
}
pub fn specialized_scorer<'a>(&self,
reader: &'a SegmentReader)
-> Result<TermScorer<SegmentPostings<'a>>> {
pub fn specialized_scorer(&self,
reader: &SegmentReader)
-> Result<TermScorer<SegmentPostings>> {
let field = self.term.field();
let field_reader = reader.field_reader(field)?;
// TODO move field reader too
let fieldnorm_reader_opt = reader.get_fieldnorms_reader(field);
let postings: Option<SegmentPostings<'a>> = reader.read_postings(&self.term, self.segment_postings_options);
let postings: Option<SegmentPostings> = field_reader.read_postings(&self.term, self.segment_postings_options);
Ok(postings
.map(|segment_postings| {
TermScorer {

View File

@@ -1,11 +1,8 @@
use std::collections::BinaryHeap;
use core::SegmentReader;
use termdict::TermStreamerImpl;
use common::BinarySerializable;
use postings::TermInfo;
use std::cmp::Ordering;
use termdict::TermStreamer;
use termdict::TermDictionary;
use schema::Term;
pub struct HeapItem<'a, V>
@@ -58,7 +55,7 @@ pub struct TermMerger<'a, V>
impl<'a, V> TermMerger<'a, V>
where V: 'a + BinarySerializable + Default
{
fn new(streams: Vec<TermStreamerImpl<'a, V>>) -> TermMerger<'a, V> {
pub fn new(streams: Vec<TermStreamerImpl<'a, V>>) -> TermMerger<'a, V> {
TermMerger {
heap: BinaryHeap::new(),
current_streamers: streams
@@ -141,12 +138,3 @@ impl<'a, V> TermMerger<'a, V>
}
impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo> {
fn from(segment_readers: &'a [SegmentReader]) -> TermMerger<'a, TermInfo> {
TermMerger::new(segment_readers
.iter()
.map(|reader| reader.terms().stream())
.collect())
}
}

View File

@@ -282,9 +282,6 @@ mod tests {
assert!(!stream.advance());
}
#[test]
fn test_term_iterator() {
let mut schema_builder = SchemaBuilder::default();
@@ -319,13 +316,16 @@ mod tests {
}
index.load_searchers().unwrap();
let searcher = index.searcher();
let mut term_it = searcher.terms();
let field_searcher = searcher.field(text_field).unwrap();
let mut term_it = field_searcher.terms();
let mut term_string = String::new();
while term_it.advance() {
let term = Term::from_bytes(term_it.key());
term_string.push_str(term.text());
}
assert_eq!(&*term_string, "abcdef");
}