Merge pull request #68 from tantivy-search/issue/67

Issue/67
This commit is contained in:
Paul Masurel
2016-12-20 11:27:05 +01:00
committed by GitHub
9 changed files with 307 additions and 190 deletions

View File

@@ -7,7 +7,7 @@ mod segment_component;
mod segment;
mod index_meta;
mod pool;
mod term_iterator;
use std::path::PathBuf;
@@ -19,7 +19,7 @@ pub use self::segment::SegmentInfo;
pub use self::segment::SerializableSegment;
pub use self::index::Index;
pub use self::index_meta::{IndexMeta, SegmentMeta};
pub use self::term_iterator::TermIterator;
lazy_static! {
pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json");

View File

@@ -7,6 +7,7 @@ use query::Query;
use DocId;
use DocAddress;
use schema::Term;
use core::TermIterator;
/// Holds a list of `SegmentReader`s ready for search.
@@ -47,9 +48,21 @@ impl Searcher {
.map(|segment_reader| segment_reader.doc_freq(term))
.fold(0u32, |acc, val| acc + val)
}
/// Builds a stream over the sorted, de-duplicated terms known
/// to this searcher.
///
/// Terms of every field, coming from every segment reader, are included.
/// See [TermIterator](struct.TermIterator.html).
///
/// # Warning
/// This API is very likely to change in the future.
pub fn terms<'a>(&'a self) -> TermIterator<'a> {
    let readers = self.segment_readers();
    TermIterator::from(readers)
}
/// Return the list of segment readers
pub fn segment_readers(&self,) -> &Vec<SegmentReader> {
pub fn segment_readers(&self,) -> &[SegmentReader] {
&self.segment_readers
}

184
src/core/term_iterator.rs Normal file
View File

@@ -0,0 +1,184 @@
use fst::Streamer;
use std::mem;
use std::collections::BinaryHeap;
use fst::map::Keys;
use schema::Field;
use schema::Term;
use core::SegmentReader;
use std::cmp::Ordering;
/// Entry of the merge heap used by `TermIterator`: a term read
/// from the key stream of one segment, together with the ordinal
/// of that segment.
#[derive(PartialEq, Eq, Debug)]
struct HeapItem {
// Term most recently read from the segment's key stream.
term: Term,
// Ordinal of the segment the term was read from.
segment_ord: usize,
}
impl PartialOrd for HeapItem {
    /// Heap items are totally ordered, so this simply wraps
    /// the result of `Ord::cmp`.
    fn partial_cmp(&self, other: &HeapItem) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl Ord for HeapItem {
    /// Compares items by `(term, segment_ord)`, in *reverse* order.
    ///
    /// `BinaryHeap` is a max-heap: reversing the comparison makes
    /// the item with the smallest term (and, for equal terms, the
    /// smallest segment ordinal) come out of the heap first.
    fn cmp(&self, other: &HeapItem) -> Ordering {
        let this_key = (&self.term, &self.segment_ord);
        let that_key = (&other.term, &other.segment_ord);
        this_key.cmp(&that_key).reverse()
    }
}
/// Given a list of sorted term streams,
/// iterates over the sorted unique terms.
///
/// After a call to `advance()` that returned `true`,
/// the iterator gives access to
/// - the current term, via `term()`
/// - a slice with the ordinals of the segments containing
/// that term, via `segment_ords()`.
///
/// When used as a `Streamer`, only the term is yielded.
pub struct TermIterator<'a> {
// One stream of keys per segment, indexed by segment ordinal.
key_streams: Vec<Keys<'a>>,
// Terms read from the key streams and not yet returned,
// each tagged with the ordinal of the segment it comes from.
heap: BinaryHeap<HeapItem>,
// Term the iterator is currently positioned on.
// It is a placeholder until `advance()` returns true.
current_term: Term,
// Buffer hosting the list of segment ordinals containing
// the current term. These are also the segments whose key
// stream gets advanced on the next call to `advance()`.
current_segment_ords: Vec<usize>,
}
impl<'a> TermIterator<'a> {
    /// Creates a term iterator out of one sorted key stream per segment.
    ///
    /// The position of a stream in `key_streams` is used as its
    /// segment ordinal.
    fn new(key_streams: Vec<Keys<'a>>) -> TermIterator<'a> {
        // Every segment starts flagged as "to be advanced", so that
        // the first call to `advance()` reads the first key of each stream.
        let all_segment_ords: Vec<usize> = (0..key_streams.len()).collect();
        TermIterator {
            key_streams: key_streams,
            heap: BinaryHeap::new(),
            // Placeholder, replaced on the first successful `advance()`.
            current_term: Term::from_field_text(Field(0), ""),
            current_segment_ords: all_segment_ords,
        }
    }

    /// Advance the term iterator to the next term.
    /// Returns true if there is indeed another term
    /// False if there is none.
    pub fn advance(&mut self) -> bool {
        // Move past the previous term in every segment that contained it.
        self.advance_segments();

        let mut smallest = match self.heap.pop() {
            Some(heap_item) => heap_item,
            None => {
                return false;
            }
        };
        mem::swap(&mut self.current_term, &mut smallest.term);
        self.current_segment_ords.push(smallest.segment_ord);

        // Gather all of the other segments positioned on the same term.
        while self.heap
            .peek()
            .map_or(false, |candidate| candidate.term == self.current_term) {
            let same_term_item = self.heap.pop().unwrap(); // safe : we peeked beforehand
            self.current_segment_ords.push(same_term_item.segment_ord);
        }
        true
    }

    /// Returns the current term.
    ///
    /// This method may be called
    /// iff advance() has been called before
    /// and "true" was returned.
    pub fn term(&self) -> &Term {
        &self.current_term
    }

    /// Returns the sorted list of segment ordinals
    /// that include the current term.
    ///
    /// This method may be called
    /// iff advance() has been called before
    /// and "true" was returned.
    pub fn segment_ords(&self) -> &[usize] {
        self.current_segment_ords.as_slice()
    }

    /// Empties the buffer of current segment ordinals and, for each
    /// of these segments, pushes the next key of its stream (if any)
    /// into the heap.
    fn advance_segments(&mut self) {
        for segment_ord in self.current_segment_ords.drain(..) {
            let next_term = match self.key_streams[segment_ord].next() {
                Some(key) => Term::from(key),
                // This stream is exhausted: nothing to push.
                None => continue,
            };
            self.heap.push(HeapItem {
                term: next_term,
                segment_ord: segment_ord,
            });
        }
    }
}
impl<'a, 'f> Streamer<'a> for TermIterator<'f> {
    type Item = &'a Term;

    /// Advances to the next term and returns a reference to it,
    /// or `None` once all of the terms have been consumed.
    fn next(&'a mut self) -> Option<Self::Item> {
        if !self.advance() {
            return None;
        }
        Some(&self.current_term)
    }
}
impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> {
fn from(segment_readers: &'a [SegmentReader]) -> TermIterator<'a> {
TermIterator::new(
segment_readers
.iter()
.map(|reader| reader.term_infos().keys())
.collect()
)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use schema::{SchemaBuilder, Document, TEXT};
    use core::Index;

    /// Indexes three documents, committing after each one, and checks
    /// that the searcher streams every distinct term exactly once,
    /// in sorted order.
    #[test]
    fn test_term_iterator() {
        let mut schema_builder = SchemaBuilder::default();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());
        {
            let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
            // One commit per document.
            // NOTE(review): presumably this yields one segment per
            // document, so that the merge logic is exercised - confirm.
            for &text in ["a b d f", "a b c d f", "e f"].iter() {
                let mut doc = Document::default();
                doc.add_text(text_field, text);
                index_writer.add_document(doc).unwrap();
                index_writer.commit().unwrap();
            }
        }
        let searcher = index.searcher();
        let mut term_it = searcher.terms();
        let mut terms = String::new();
        while let Some(term) = term_it.next() {
            // All of the indexed terms come from a text field.
            let text = unsafe { term.text() };
            terms.push_str(text);
        }
        assert_eq!(terms, "abcdef");
    }
}

View File

@@ -4,7 +4,6 @@ use std::io;
use std::io::Write;
use fst;
use fst::raw::Fst;
use fst::Streamer;
use directory::ReadOnlySource;
use common::BinarySerializable;
@@ -66,27 +65,10 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
}))
}
pub struct FstKeyIter<'a, V: 'static + BinarySerializable> {
streamer: fst::map::Stream<'a>,
__phantom__: PhantomData<V>
}
impl<'a, V: 'static + BinarySerializable> FstKeyIter<'a, V> {
pub fn next(&mut self) -> Option<(&[u8])> {
self.streamer
.next()
.map(|(k, _)| k)
}
}
impl<V: BinarySerializable> FstMap<V> {
pub fn keys(&self,) -> FstKeyIter<V> {
FstKeyIter {
streamer: self.fst_index.stream(),
__phantom__: PhantomData,
}
pub fn keys(&self,) -> fst::map::Keys {
self.fst_index.keys()
}
pub fn from_source(source: ReadOnlySource) -> io::Result<FstMap<V>> {
@@ -123,6 +105,7 @@ mod tests {
use super::*;
use directory::{RAMDirectory, Directory};
use std::path::PathBuf;
use fst::Streamer;
#[test]
fn test_fstmap() {

View File

@@ -4,5 +4,4 @@ pub mod stacker;
pub use self::fstmap::FstMapBuilder;
pub use self::fstmap::FstMap;
pub use self::fstmap::FstKeyIter;
pub use self::skip::{SkipListBuilder, SkipList};

View File

@@ -3,130 +3,21 @@ use core::SegmentReader;
use core::Segment;
use DocId;
use core::SerializableSegment;
use indexer::SegmentSerializer;
use postings::PostingsSerializer;
use postings::TermInfo;
use postings::Postings;
use postings::DocSet;
use std::collections::BinaryHeap;
use datastruct::FstKeyIter;
use schema::{Term, Schema, Field};
use core::TermIterator;
use schema::{Schema, Field};
use fastfield::FastFieldSerializer;
use store::StoreWriter;
use postings::ChainedPostings;
use postings::HasLen;
use postings::OffsetPostings;
use core::SegmentInfo;
use std::cmp::{min, max, Ordering};
use std::cmp::{min, max};
use std::iter;
struct PostingsMerger<'a> {
doc_offsets: Vec<DocId>,
heap: BinaryHeap<HeapItem>,
term_streams: Vec<FstKeyIter<'a, TermInfo>>,
readers: &'a [SegmentReader],
}
#[derive(PartialEq, Eq, Debug)]
struct HeapItem {
term: Term,
segment_ord: usize,
}
impl PartialOrd for HeapItem {
fn partial_cmp(&self, other: &HeapItem) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HeapItem {
fn cmp(&self, other: &HeapItem) -> Ordering {
(&other.term, &other.segment_ord).cmp(&(&self.term, &self.segment_ord))
}
}
impl<'a> PostingsMerger<'a> {
fn new(readers: &'a [SegmentReader]) -> PostingsMerger<'a> {
let mut doc_offsets: Vec<DocId> = Vec::new();
let mut max_doc = 0;
for reader in readers {
doc_offsets.push(max_doc);
max_doc += reader.max_doc();
};
let term_streams = readers
.iter()
.map(|reader| reader.term_infos().keys())
.collect();
let mut postings_merger = PostingsMerger {
heap: BinaryHeap::new(),
term_streams: term_streams,
doc_offsets: doc_offsets,
readers: readers,
};
for segment_ord in 0..readers.len() {
postings_merger.push_next_segment_el(segment_ord);
}
postings_merger
}
// pushes the term_reader associated with the given segment ordinal
// into the heap.
fn push_next_segment_el(&mut self, segment_ord: usize) {
if let Some(term) = self.term_streams[segment_ord].next() {
let it = HeapItem {
term: Term::from(term),
segment_ord: segment_ord,
};
self.heap.push(it);
}
}
fn append_segment(&mut self,
heap_item: &HeapItem,
segment_postings_list: &mut Vec<OffsetPostings<'a>>) {
{
let offset = self.doc_offsets[heap_item.segment_ord];
let reader = &self.readers[heap_item.segment_ord];
if let Some(segment_postings) = reader.read_postings_all_info(&heap_item.term) {
let offset_postings = OffsetPostings::new(segment_postings, offset);
segment_postings_list.push(offset_postings);
}
}
self.push_next_segment_el(heap_item.segment_ord);
}
}
impl<'a> Iterator for PostingsMerger<'a> {
type Item = (Term, ChainedPostings<'a>);
fn next(&mut self,) -> Option<(Term, ChainedPostings<'a>)> {
// TODO remove the Vec<u8> allocations
match self.heap.pop() {
Some(heap_it) => {
let mut segment_postings_list = Vec::new();
self.append_segment(&heap_it, &mut segment_postings_list);
loop {
match self.heap.peek() {
Some(&ref next_heap_it) if next_heap_it.term == heap_it.term => {},
_ => { break; }
}
let next_heap_it = self.heap.pop().expect("This is only reached if an element was peeked beforehand.");
self.append_segment(&next_heap_it, &mut segment_postings_list);
}
let chained_posting = ChainedPostings::from(segment_postings_list);
Some((heap_it.term, chained_posting))
},
None => None
}
}
}
pub struct IndexMerger {
schema: Schema,
readers: Vec<SegmentReader>,
@@ -135,17 +26,15 @@ pub struct IndexMerger {
struct DeltaPositionComputer {
buffer: Vec<u32>
buffer: Vec<u32>,
}
impl DeltaPositionComputer {
fn new() -> DeltaPositionComputer {
DeltaPositionComputer {
buffer: iter::repeat(0u32).take(512).collect::<Vec<u32>>(),
}
DeltaPositionComputer { buffer: iter::repeat(0u32).take(512).collect::<Vec<u32>>() }
}
fn compute_delta_positions(&mut self, positions: &[u32],) -> &[u32] {
fn compute_delta_positions(&mut self, positions: &[u32]) -> &[u32] {
if positions.len() > self.buffer.len() {
self.buffer.resize(positions.len(), 0u32);
}
@@ -158,8 +47,6 @@ impl DeltaPositionComputer {
}
}
impl IndexMerger {
pub fn open(schema: Schema, segments: &[Segment]) -> Result<IndexMerger> {
let mut readers = Vec::new();
@@ -172,20 +59,19 @@ impl IndexMerger {
Ok(IndexMerger {
schema: schema,
readers: readers,
segment_info: SegmentInfo {
max_doc: max_doc
},
segment_info: SegmentInfo { max_doc: max_doc },
})
}
fn write_fieldnorms(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
// TODO make sure that works even if the field is never here.
for field in self.schema.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_indexed())
.map(|(field_id, _)| Field(field_id as u8)) {
for field in self.schema
.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_indexed())
.map(|(field_id, _)| Field(field_id as u8)) {
let mut u32_readers = Vec::new();
let mut min_val = u32::min_value();
let mut max_val = 0;
@@ -208,11 +94,12 @@ impl IndexMerger {
}
fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
for field in self.schema.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_u32_fast())
.map(|(field_id, _)| Field(field_id as u8)) {
for field in self.schema
.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_u32_fast())
.map(|(field_id, _)| Field(field_id as u8)) {
let mut u32_readers = Vec::new();
let mut min_val = u32::min_value();
let mut max_val = 0;
@@ -235,16 +122,54 @@ impl IndexMerger {
}
fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> Result<()> {
let postings_merger = PostingsMerger::new(&self.readers);
let mut merged_terms = TermIterator::from(&self.readers[..]);
let mut delta_position_computer = DeltaPositionComputer::new();
for (term, mut merged_doc_ids) in postings_merger {
try!(postings_serializer.new_term(&term, merged_doc_ids.len() as DocId));
while merged_doc_ids.advance() {
let delta_positions: &[u32] = delta_position_computer.compute_delta_positions(merged_doc_ids.positions());
try!(postings_serializer.write_doc(merged_doc_ids.doc(), merged_doc_ids.term_freq(), delta_positions));
let mut offsets: Vec<DocId> = Vec::new();
let mut max_doc = 0;
for reader in &self.readers {
offsets.push(max_doc);
max_doc += reader.max_doc();
}
while merged_terms.advance() {
// Create the total list of doc ids
// by stacking the doc ids from the different segment.
//
// In the new segments, the doc id from the different
// segment are stacked so that :
// - Segment 0's doc ids become doc id [0, seg.max_doc]
// - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc]
// - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, seg0.max_doc + seg1.max_doc + seg2.max_doc]
// ...
let term = merged_terms.term();
let mut merged_postings =
ChainedPostings::from(
merged_terms
.segment_ords()
.iter()
.cloned()
.flat_map(|segment_ord| {
let offset = offsets[segment_ord];
self.readers[segment_ord]
.read_postings_all_info(&term)
.map(|segment_postings| OffsetPostings::new(segment_postings, offset))
})
.collect::<Vec<_>>()
);
// We can now serialize this postings, by pushing each document to the
// postings serializer.
try!(postings_serializer.new_term(&term, merged_postings.len() as DocId));
while merged_postings.advance() {
let delta_positions: &[u32] =
delta_position_computer.compute_delta_positions(merged_postings.positions());
try!(postings_serializer.write_doc(merged_postings.doc(),
merged_postings.term_freq(),
delta_positions));
}
try!(postings_serializer.close_term());
}
Ok(())
}
@@ -284,7 +209,9 @@ mod tests {
#[test]
fn test_index_merger() {
let mut schema_builder = schema::SchemaBuilder::default();
let text_fieldtype = schema::TextOptions::default().set_indexing_options(TextIndexingOptions::TokenizedWithFreq).set_stored();
let text_fieldtype = schema::TextOptions::default()
.set_indexing_options(TextIndexingOptions::TokenizedWithFreq)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype = schema::U32Options::default().set_fast();
let score_field = schema_builder.add_u32_field("score", score_fieldtype);
@@ -346,22 +273,14 @@ mod tests {
collector.docs()
};
{
assert_eq!(
get_doc_ids(vec!(Term::from_field_text(text_field, "a"))),
vec!(1, 2, 4,)
);
assert_eq!(
get_doc_ids(vec!(Term::from_field_text(text_field, "af"))),
vec!(0, 3,)
);
assert_eq!(
get_doc_ids(vec!(Term::from_field_text(text_field, "g"))),
vec!(4,)
);
assert_eq!(
get_doc_ids(vec!(Term::from_field_text(text_field, "b"))),
vec!(0, 1, 2, 3, 4,)
);
assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "a")]),
vec!(1, 2, 4,));
assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "af")]),
vec!(0, 3,));
assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "g")]),
vec!(4,));
assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "b")]),
vec!(0, 1, 2, 3, 4,));
}
{
let doc = searcher.doc(&DocAddress(0, 0)).unwrap();
@@ -390,10 +309,8 @@ mod tests {
assert!(searcher.search(&query, &mut collector).is_ok());
collector.vals().clone()
};
assert_eq!(
get_fast_vals(vec!(Term::from_field_text(text_field, "a"))),
vec!(5, 7, 13,)
);
assert_eq!(get_fast_vals(vec![Term::from_field_text(text_field, "a")]),
vec!(5, 7, 13,));
}
}
}

View File

@@ -110,8 +110,7 @@ pub use self::common::TimerTree;
pub use postings::DocSet;
pub use postings::Postings;
pub use postings::SegmentPostingsOption;
pub use core::TermIterator;
/// Tantivy makes it possible to personalize when
/// the indexer should merge its segments

View File

@@ -8,7 +8,8 @@ use common::BinarySerializable;
/// The schema is in charge of holding mapping between field names
/// to `Field` objects.
///
/// Because the field id is a `u8`, tantivy can only have at most `256` fields
/// Because the field id is a `u8`, tantivy can only have at most `255` fields.
/// Value 255 is reserved.
#[derive(Copy,Clone,Debug,PartialEq,PartialOrd,Eq,Ord,Hash, RustcEncodable, RustcDecodable)]
pub struct Field(pub u8);

View File

@@ -2,7 +2,7 @@ use std::fmt;
use common::BinarySerializable;
use super::Field;
use std::str;
/// Term represents the value that the token can take.
@@ -11,6 +11,7 @@ use super::Field;
#[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
pub struct Term(Vec<u8>);
impl Term {
/// Pre-allocate a term buffer.
@@ -63,6 +64,26 @@ impl Term {
Term(buffer)
}
/// Returns the serialized value of the term,
/// without the field it belongs to.
///
/// If the term is a string, its value is utf-8 encoded.
/// If the term is a u32, its value is encoded according
/// to `byteorder::LittleEndian`.
pub fn value(&self) -> &[u8] {
    // The leading byte is not part of the value: it is the part
    // that `set_text` leaves untouched to preserve the field.
    let serialized: &[u8] = &self.0;
    &serialized[1..]
}
/// Returns the text associated with the term.
///
/// # Safety
/// The value is reinterpreted as utf-8 without any validation
/// (`str::from_utf8_unchecked`): this method does not panic
/// on invalid data. The caller must guarantee that the value
/// is valid utf-8. This may not be the case if the index is
/// corrupted or if this method is called on a non-string type,
/// and using the returned `&str` is then undefined behavior.
pub unsafe fn text(&self) -> &str {
str::from_utf8_unchecked(self.value())
}
/// Set the texts only, keeping the field untouched.
pub fn set_text(&mut self, text: &str) {
self.0.resize(1, 0u8);