issue/67 First stab. Iterator working.

This commit is contained in:
Paul Masurel
2016-12-16 23:20:05 +01:00
parent 1559733b03
commit ca5f3e1d46
9 changed files with 296 additions and 188 deletions

View File

@@ -7,7 +7,7 @@ mod segment_component;
mod segment;
mod index_meta;
mod pool;
mod term_iterator;
use std::path::PathBuf;
@@ -19,7 +19,7 @@ pub use self::segment::SegmentInfo;
pub use self::segment::SerializableSegment;
pub use self::index::Index;
pub use self::index_meta::{IndexMeta, SegmentMeta};
pub use self::term_iterator::TermIterator;
lazy_static! {
pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json");

View File

@@ -7,6 +7,7 @@ use query::Query;
use DocId;
use DocAddress;
use schema::Term;
use core::TermIterator;
/// Holds a list of `SegmentReader`s ready for search.
@@ -47,9 +48,18 @@ impl Searcher {
.map(|segment_reader| segment_reader.doc_freq(term))
.fold(0u32, |acc, val| acc + val)
}
/// Returns a Stream over all of the sorted unique terms of
/// the searcher.
///
/// This includes all of the fields from all of the segment_readers.
/// See [TermIterator](struct.TermIterator.html).
///
/// The returned stream borrows the searcher for its whole lifetime.
pub fn terms<'a>(&'a self) -> TermIterator<'a> {
    // Builds one key stream per segment and merges them lazily.
    TermIterator::from(self.segment_readers())
}
/// Return the list of segment readers
pub fn segment_readers(&self,) -> &Vec<SegmentReader> {
pub fn segment_readers(&self,) -> &[SegmentReader] {
&self.segment_readers
}

191
src/core/term_iterator.rs Normal file
View File

@@ -0,0 +1,191 @@
use fst::Streamer;
use std::mem;
use std::collections::BinaryHeap;
use fst::map::Keys;
use schema::Term;
use core::SegmentReader;
use std::cmp::Ordering;
/// Empty byte slice used to initialize `TermIterator::current_term`
/// before the first call to `next()`.
static EMPTY: [u8; 0] = [];

/// One entry of the merge heap: a pending term together with the
/// ordinal of the segment whose key stream produced it.
#[derive(PartialEq, Eq, Debug)]
struct HeapItem {
    // Term read from the segment's term dictionary.
    term: Term,
    // Index of the segment within the list of key streams.
    segment_ord: usize,
}
impl PartialOrd for HeapItem {
    /// Delegates to the total order defined by the `Ord` implementation,
    /// so partial and total comparisons can never disagree.
    fn partial_cmp(&self, other: &HeapItem) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl Ord for HeapItem {
    /// Lexicographic order on `(term, segment_ord)`, reversed.
    ///
    /// `BinaryHeap` is a max-heap; reversing the natural order makes it
    /// behave as a min-heap, so the smallest term is popped first.
    fn cmp(&self, other: &HeapItem) -> Ordering {
        (&self.term, &self.segment_ord)
            .cmp(&(&other.term, &other.segment_ord))
            .reverse()
    }
}
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yielded is actually a pair with
/// - the term
/// - a slice with the ordinals of the segments containing
///   the term.
pub struct TermIterator<'a> {
    // One sorted key stream per segment, in segment-ordinal order.
    key_streams: Vec<Keys<'a>>,
    // Holds at most one pending term per stream; `HeapItem`'s reversed
    // ordering makes this max-heap pop the smallest term first.
    heap: BinaryHeap<HeapItem>,
    // The term the streamer currently points at.
    current_term: Term,
    // Buffer hosting the list of segment ordinals containing
    // the current term.
    current_segment_ords: Vec<usize>,
}
impl<'a> TermIterator<'a> {
    /// Builds a `TermIterator` from one sorted key stream per segment.
    ///
    /// The merge heap is primed with the first term of every stream.
    fn new(key_streams: Vec<Keys<'a>>) -> TermIterator<'a> {
        let key_streams_len = key_streams.len();
        let mut term_iterator = TermIterator {
            key_streams: key_streams,
            heap: BinaryHeap::new(),
            current_term: Term::from(&EMPTY[..]),
            current_segment_ords: vec![],
        };
        for segment_ord in 0..key_streams_len {
            // Only advance the streams here. (Calling `push_next_segment_el`
            // instead would also record every ordinal in
            // `current_segment_ords`, which previously only worked because
            // `next()` happens to clear that buffer first.)
            term_iterator.advance_stream(segment_ord);
        }
        term_iterator
    }

    /// Pulls the next term (if any) from the given segment's stream
    /// and pushes it onto the merge heap.
    fn advance_stream(&mut self, segment_ord: usize) {
        if let Some(term) = self.key_streams[segment_ord].next() {
            self.heap.push(HeapItem {
                term: Term::from(term),
                segment_ord: segment_ord,
            });
        }
    }

    /// Records `segment_ord` as one of the segments containing the
    /// current term, then advances that segment's stream.
    fn push_next_segment_el(&mut self, segment_ord: usize) {
        self.current_segment_ords.push(segment_ord);
        self.advance_stream(segment_ord);
    }
}
impl<'a, 'f> Streamer<'a> for TermIterator<'f> {
    type Item = (&'a Term, &'a [usize]);

    /// Advances to the next unique term, in sorted order.
    ///
    /// Pops the smallest pending term off the heap, then also pops every
    /// other heap entry carrying that same term, collecting the ordinals
    /// of all segments containing it. Returns `None` once every stream
    /// is exhausted.
    fn next(&'a mut self) -> Option<Self::Item> {
        // Reset the ordinal buffer for the term about to be yielded.
        self.current_segment_ords.clear();
        self.heap
            .pop()
            .map(move |mut head| {
                // Steal the popped term as the new current term;
                // swapping avoids cloning the term's byte buffer.
                mem::swap(&mut self.current_term, &mut head.term);
                self.push_next_segment_el(head.segment_ord);
                loop {
                    // Keep draining heap entries equal to the current term.
                    match self.heap.peek() {
                        Some(&ref next_heap_it) if next_heap_it.term == self.current_term => {}
                        _ => {
                            break;
                        }
                    }
                    let next_heap_it = self.heap
                        .pop()
                        .expect("This is only reached if an element was \
                                 peeked beforehand.");
                    self.push_next_segment_el(next_heap_it.segment_ord);
                }
                // Borrow the buffers for the 'a lifetime of this call.
                (&self.current_term, self.current_segment_ords.as_slice())
            })
    }
}
impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> {
fn from(segment_readers: &'a [SegmentReader]) -> TermIterator<'a> {
TermIterator::new(segment_readers.iter()
.map(|reader| reader.term_infos().keys())
.collect())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use schema::{SchemaBuilder, Document, TEXT};
    use core::Index;

    /// Writes three single-document segments and checks that the merged
    /// term stream yields each unique term exactly once, in sorted order,
    /// together with the ordinals of the segments containing it.
    #[test]
    fn test_term_iterator() {
        let mut schema_builder = SchemaBuilder::default();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());
        {
            let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
            {
                // writing the segment
                // segment 0: terms a, b, d, f
                {
                    let mut doc = Document::default();
                    doc.add_text(text_field, "a b d f");
                    index_writer.add_document(doc).unwrap();
                }
                index_writer.commit().unwrap();
            }
            {
                // writing the segment
                // segment 1: terms a, b, c, d, f
                {
                    let mut doc = Document::default();
                    doc.add_text(text_field, "a b c d f");
                    index_writer.add_document(doc).unwrap();
                }
                index_writer.commit().unwrap();
            }
            {
                // writing the segment
                // segment 2: terms e, f
                {
                    let mut doc = Document::default();
                    doc.add_text(text_field, "e f");
                    index_writer.add_document(doc).unwrap();
                }
                index_writer.commit().unwrap();
            }
        }
        let searcher = index.searcher();
        let mut term_it = searcher.terms();
        // Expected merged stream: a, b, c, d, e, f.
        {
            let (term, segments) = term_it.next().unwrap();
            assert_eq!(term.value(), "a".as_bytes());
            let expected_segments = [0, 1];
            assert_eq!(segments, &expected_segments);
        }
        {
            let (term, segments): (&Term, &[usize]) = term_it.next().unwrap();
            assert_eq!(term.value(), "b".as_bytes());
            let expected_segments = [0, 1];
            assert_eq!(segments, &expected_segments);
        }
        {
            // "c" appears only in segment 1.
            let (ref term, ref segments) = term_it.next().unwrap();
            assert_eq!(term.value(), "c".as_bytes());
            let expected_segments = [1];
            assert_eq!(segments, &expected_segments);
        }
        {
            let (term, segments) = term_it.next().unwrap();
            assert_eq!(term.value(), "d".as_bytes());
            let expected_segments = [0, 1];
            assert_eq!(segments, &expected_segments);
        }
        {
            // "e" appears only in segment 2.
            let (term, segments) = term_it.next().unwrap();
            assert_eq!(term.value(), "e".as_bytes());
            let expected_segments = [2];
            assert_eq!(segments, &expected_segments);
        }
        {
            // "f" appears in every segment.
            let (term, segments) = term_it.next().unwrap();
            assert_eq!(term.value(), "f".as_bytes());
            let expected_segments = [0, 1, 2];
            assert_eq!(segments, &expected_segments);
        }
    }
}

View File

@@ -4,7 +4,6 @@ use std::io;
use std::io::Write;
use fst;
use fst::raw::Fst;
use fst::Streamer;
use directory::ReadOnlySource;
use common::BinarySerializable;
@@ -66,27 +65,10 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
}))
}
pub struct FstKeyIter<'a, V: 'static + BinarySerializable> {
streamer: fst::map::Stream<'a>,
__phantom__: PhantomData<V>
}
impl<'a, V: 'static + BinarySerializable> FstKeyIter<'a, V> {
pub fn next(&mut self) -> Option<(&[u8])> {
self.streamer
.next()
.map(|(k, _)| k)
}
}
impl<V: BinarySerializable> FstMap<V> {
pub fn keys(&self,) -> FstKeyIter<V> {
FstKeyIter {
streamer: self.fst_index.stream(),
__phantom__: PhantomData,
}
pub fn keys(&self,) -> fst::map::Keys {
self.fst_index.keys()
}
pub fn from_source(source: ReadOnlySource) -> io::Result<FstMap<V>> {
@@ -123,6 +105,7 @@ mod tests {
use super::*;
use directory::{RAMDirectory, Directory};
use std::path::PathBuf;
use fst::Streamer;
#[test]
fn test_fstmap() {

View File

@@ -4,5 +4,4 @@ pub mod stacker;
pub use self::fstmap::FstMapBuilder;
pub use self::fstmap::FstMap;
pub use self::fstmap::FstKeyIter;
pub use self::skip::{SkipListBuilder, SkipList};

View File

@@ -3,130 +3,22 @@ use core::SegmentReader;
use core::Segment;
use DocId;
use core::SerializableSegment;
use indexer::SegmentSerializer;
use postings::PostingsSerializer;
use postings::TermInfo;
use postings::Postings;
use postings::DocSet;
use std::collections::BinaryHeap;
use datastruct::FstKeyIter;
use schema::{Term, Schema, Field};
use core::TermIterator;
use fst::Streamer;
use schema::{Schema, Field};
use fastfield::FastFieldSerializer;
use store::StoreWriter;
use postings::ChainedPostings;
use postings::HasLen;
use postings::OffsetPostings;
use core::SegmentInfo;
use std::cmp::{min, max, Ordering};
use std::cmp::{min, max};
use std::iter;
struct PostingsMerger<'a> {
doc_offsets: Vec<DocId>,
heap: BinaryHeap<HeapItem>,
term_streams: Vec<FstKeyIter<'a, TermInfo>>,
readers: &'a [SegmentReader],
}
#[derive(PartialEq, Eq, Debug)]
struct HeapItem {
term: Term,
segment_ord: usize,
}
impl PartialOrd for HeapItem {
fn partial_cmp(&self, other: &HeapItem) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HeapItem {
fn cmp(&self, other: &HeapItem) -> Ordering {
(&other.term, &other.segment_ord).cmp(&(&self.term, &self.segment_ord))
}
}
impl<'a> PostingsMerger<'a> {
fn new(readers: &'a [SegmentReader]) -> PostingsMerger<'a> {
let mut doc_offsets: Vec<DocId> = Vec::new();
let mut max_doc = 0;
for reader in readers {
doc_offsets.push(max_doc);
max_doc += reader.max_doc();
};
let term_streams = readers
.iter()
.map(|reader| reader.term_infos().keys())
.collect();
let mut postings_merger = PostingsMerger {
heap: BinaryHeap::new(),
term_streams: term_streams,
doc_offsets: doc_offsets,
readers: readers,
};
for segment_ord in 0..readers.len() {
postings_merger.push_next_segment_el(segment_ord);
}
postings_merger
}
// pushes the term_reader associated with the given segment ordinal
// into the heap.
fn push_next_segment_el(&mut self, segment_ord: usize) {
if let Some(term) = self.term_streams[segment_ord].next() {
let it = HeapItem {
term: Term::from(term),
segment_ord: segment_ord,
};
self.heap.push(it);
}
}
fn append_segment(&mut self,
heap_item: &HeapItem,
segment_postings_list: &mut Vec<OffsetPostings<'a>>) {
{
let offset = self.doc_offsets[heap_item.segment_ord];
let reader = &self.readers[heap_item.segment_ord];
if let Some(segment_postings) = reader.read_postings_all_info(&heap_item.term) {
let offset_postings = OffsetPostings::new(segment_postings, offset);
segment_postings_list.push(offset_postings);
}
}
self.push_next_segment_el(heap_item.segment_ord);
}
}
impl<'a> Iterator for PostingsMerger<'a> {
type Item = (Term, ChainedPostings<'a>);
fn next(&mut self,) -> Option<(Term, ChainedPostings<'a>)> {
// TODO remove the Vec<u8> allocations
match self.heap.pop() {
Some(heap_it) => {
let mut segment_postings_list = Vec::new();
self.append_segment(&heap_it, &mut segment_postings_list);
loop {
match self.heap.peek() {
Some(&ref next_heap_it) if next_heap_it.term == heap_it.term => {},
_ => { break; }
}
let next_heap_it = self.heap.pop().expect("This is only reached if an element was peeked beforehand.");
self.append_segment(&next_heap_it, &mut segment_postings_list);
}
let chained_posting = ChainedPostings::from(segment_postings_list);
Some((heap_it.term, chained_posting))
},
None => None
}
}
}
pub struct IndexMerger {
schema: Schema,
readers: Vec<SegmentReader>,
@@ -135,17 +27,15 @@ pub struct IndexMerger {
struct DeltaPositionComputer {
buffer: Vec<u32>
buffer: Vec<u32>,
}
impl DeltaPositionComputer {
fn new() -> DeltaPositionComputer {
DeltaPositionComputer {
buffer: iter::repeat(0u32).take(512).collect::<Vec<u32>>(),
}
DeltaPositionComputer { buffer: iter::repeat(0u32).take(512).collect::<Vec<u32>>() }
}
fn compute_delta_positions(&mut self, positions: &[u32],) -> &[u32] {
fn compute_delta_positions(&mut self, positions: &[u32]) -> &[u32] {
if positions.len() > self.buffer.len() {
self.buffer.resize(positions.len(), 0u32);
}
@@ -158,8 +48,6 @@ impl DeltaPositionComputer {
}
}
impl IndexMerger {
pub fn open(schema: Schema, segments: &[Segment]) -> Result<IndexMerger> {
let mut readers = Vec::new();
@@ -172,20 +60,19 @@ impl IndexMerger {
Ok(IndexMerger {
schema: schema,
readers: readers,
segment_info: SegmentInfo {
max_doc: max_doc
},
segment_info: SegmentInfo { max_doc: max_doc },
})
}
fn write_fieldnorms(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
// TODO make sure that works even if the field is never here.
for field in self.schema.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_indexed())
.map(|(field_id, _)| Field(field_id as u8)) {
for field in self.schema
.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_indexed())
.map(|(field_id, _)| Field(field_id as u8)) {
let mut u32_readers = Vec::new();
let mut min_val = u32::min_value();
let mut max_val = 0;
@@ -208,11 +95,12 @@ impl IndexMerger {
}
fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
for field in self.schema.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_u32_fast())
.map(|(field_id, _)| Field(field_id as u8)) {
for field in self.schema
.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_u32_fast())
.map(|(field_id, _)| Field(field_id as u8)) {
let mut u32_readers = Vec::new();
let mut min_val = u32::min_value();
let mut max_val = 0;
@@ -235,16 +123,51 @@ impl IndexMerger {
}
fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> Result<()> {
let postings_merger = PostingsMerger::new(&self.readers);
let mut merged_terms = TermIterator::from(&self.readers[..]);
let mut delta_position_computer = DeltaPositionComputer::new();
for (term, mut merged_doc_ids) in postings_merger {
try!(postings_serializer.new_term(&term, merged_doc_ids.len() as DocId));
while merged_doc_ids.advance() {
let delta_positions: &[u32] = delta_position_computer.compute_delta_positions(merged_doc_ids.positions());
try!(postings_serializer.write_doc(merged_doc_ids.doc(), merged_doc_ids.term_freq(), delta_positions));
let mut offsets: Vec<DocId> = Vec::new();
let mut max_doc = 0;
for reader in &self.readers {
offsets.push(max_doc);
max_doc += reader.max_doc();
}
while let Some((term, segment_ords)) = merged_terms.next() {
// Create the total list of doc ids
// by stacking the doc ids from the different segment.
//
// In the new segments, the doc id from the different
// segment are stacked so that :
// - Segment 0's doc ids become doc id [0, seg.max_doc]
// - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc]
// - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, seg0.max_doc + seg1.max_doc + seg2.max_doc]
// ...
let mut merged_postings =
ChainedPostings::from(segment_ords.iter()
.flat_map(|segment_ord| {
let offset = offsets[*segment_ord];
self.readers[*segment_ord]
.read_postings_all_info(&term)
.map(|segment_postings| {
OffsetPostings::new(segment_postings,
offset)
})
})
.collect::<Vec<_>>());
// We can now serialize this postings, by pushing each document to the
// postings serializer.
try!(postings_serializer.new_term(&term, merged_postings.len() as DocId));
while merged_postings.advance() {
let delta_positions: &[u32] =
delta_position_computer.compute_delta_positions(merged_postings.positions());
try!(postings_serializer.write_doc(merged_postings.doc(),
merged_postings.term_freq(),
delta_positions));
}
try!(postings_serializer.close_term());
}
Ok(())
}
@@ -284,7 +207,9 @@ mod tests {
#[test]
fn test_index_merger() {
let mut schema_builder = schema::SchemaBuilder::default();
let text_fieldtype = schema::TextOptions::default().set_indexing_options(TextIndexingOptions::TokenizedWithFreq).set_stored();
let text_fieldtype = schema::TextOptions::default()
.set_indexing_options(TextIndexingOptions::TokenizedWithFreq)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype = schema::U32Options::default().set_fast();
let score_field = schema_builder.add_u32_field("score", score_fieldtype);
@@ -346,22 +271,14 @@ mod tests {
collector.docs()
};
{
assert_eq!(
get_doc_ids(vec!(Term::from_field_text(text_field, "a"))),
vec!(1, 2, 4,)
);
assert_eq!(
get_doc_ids(vec!(Term::from_field_text(text_field, "af"))),
vec!(0, 3,)
);
assert_eq!(
get_doc_ids(vec!(Term::from_field_text(text_field, "g"))),
vec!(4,)
);
assert_eq!(
get_doc_ids(vec!(Term::from_field_text(text_field, "b"))),
vec!(0, 1, 2, 3, 4,)
);
assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "a")]),
vec!(1, 2, 4,));
assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "af")]),
vec!(0, 3,));
assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "g")]),
vec!(4,));
assert_eq!(get_doc_ids(vec![Term::from_field_text(text_field, "b")]),
vec!(0, 1, 2, 3, 4,));
}
{
let doc = searcher.doc(&DocAddress(0, 0)).unwrap();
@@ -390,10 +307,8 @@ mod tests {
assert!(searcher.search(&query, &mut collector).is_ok());
collector.vals().clone()
};
assert_eq!(
get_fast_vals(vec!(Term::from_field_text(text_field, "a"))),
vec!(5, 7, 13,)
);
assert_eq!(get_fast_vals(vec![Term::from_field_text(text_field, "a")]),
vec!(5, 7, 13,));
}
}
}

View File

@@ -110,8 +110,7 @@ pub use self::common::TimerTree;
pub use postings::DocSet;
pub use postings::Postings;
pub use postings::SegmentPostingsOption;
pub use core::TermIterator;
/// Tantivy's makes it possible to personalize when
/// the indexer should merge its segments

View File

@@ -8,7 +8,8 @@ use common::BinarySerializable;
/// The schema is in charge of holding mapping between field names
/// to `Field` objects.
///
/// Because the field id is a `u8`, tantivy can only have at most `256` fields
/// Because the field id is a `u8`, tantivy can only have at most `255` fields.
/// Value 255 is reserved.
#[derive(Copy,Clone,Debug,PartialEq,PartialOrd,Eq,Ord,Hash, RustcEncodable, RustcDecodable)]
pub struct Field(pub u8);

View File

@@ -8,9 +8,11 @@ use super::Field;
/// Term represents the value that the token can take.
///
/// It actually wraps a `Vec<u8>`.
/// TODO remove pub
#[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
pub struct Term(Vec<u8>);
impl Term {
/// Pre-allocate a term buffer.
@@ -63,6 +65,14 @@ impl Term {
Term(buffer)
}
/// Returns the serialized value associated to the field.
/// If the term is a string, its value is utf-8 encoded.
/// If the term is a u32, its value is encoded according
/// to `byteorder::LittleEndian`.
pub fn value(&self) -> &[u8] {
&self.0[1..]
}
/// Set the texts only, keeping the field untouched.
pub fn set_text(&mut self, text: &str) {
self.0.resize(1, 0u8);