Compare commits

36 Commits

Author SHA1 Message Date
Paul Masurel
d07e896a2f Exposed API to create a new Segment. 2017-05-13 15:15:35 +09:00
Paul Masurel
574feb8026 Merge branch 'issue/136' into tantivy-imhotep 2017-05-12 17:04:20 +09:00
Paul Masurel
0bc047f8e1 blop 2017-05-12 13:39:04 +09:00
Paul Masurel
2c2aa3d66c Merge branch 'issue/136' into tantivy-imhotep
Conflicts:
	src/postings/mod.rs
2017-05-11 22:42:03 +09:00
Paul Masurel
da99bbcb9d Merge branch 'issue/indexing-refactoring' into tantivy-imhotep
Conflicts:
	src/common/mod.rs
2017-05-10 21:27:44 +09:00
Paul Masurel
33f9426dd6 Merge branch 'master' into tantivy-imhotep 2017-05-07 15:58:13 +09:00
Paul Masurel
647c97fa3d Expose common 2017-05-07 14:31:56 +09:00
Paul Masurel
8029aea548 Exposing store 2017-05-07 14:24:36 +09:00
Paul Masurel
3b33484cf8 compatibility with tantivy-imhotep 2017-05-07 14:19:38 +09:00
Paul Masurel
2a909ddcc7 Merge branch 'master' into tantivy-imhotep
Conflicts:
	src/collector/mod.rs
	src/common/bitpacker.rs
	src/common/mod.rs
	src/core/segment_reader.rs
	src/fastfield/mod.rs
	src/fastfield/reader.rs
	src/fastfield/writer.rs
	src/functional_test.rs
	src/indexer/merger.rs
	src/indexer/segment_writer.rs
	src/lib.rs
	src/postings/serializer.rs
	src/query/query_parser/query_parser.rs
	src/schema/document.rs
	src/schema/field_entry.rs
	src/schema/field_type.rs
	src/schema/int_options.rs
	src/schema/mod.rs
	src/schema/named_field_document.rs
	src/schema/schema.rs
	src/schema/value.rs
2017-05-07 14:03:18 +09:00
Paul Masurel
925b9063a7 Bugfix in the streamdictionary. Impl of Sync and Send for FastFieldReader 2017-04-23 10:50:14 +08:00
Paul Masurel
5e1ce381fe Merge branch 'issues/65' into tantivy-imhotep
Conflicts:
	src/core/segment_reader.rs
	src/fastfield/reader.rs
2017-04-21 09:53:14 +09:00
Paul Masurel
67381e448f Renamed u64options 2017-04-21 09:13:26 +09:00
Paul Masurel
19d535c28e issue/65 Switching to u64. 2017-04-20 13:32:59 +09:00
Paul Masurel
e00d6538fa NOBUG Improve interface 2017-04-19 22:35:52 +09:00
Paul Masurel
8d7445f08a removing the 255 fields limit 2017-04-19 20:07:38 +09:00
Paul Masurel
202e69b7e0 BUGFIX the thing observed on windows 2017-04-15 19:41:14 +09:00
Paul Masurel
a650969509 Merge branch 'master' into tantivy-imhotep 2017-04-15 13:11:58 +09:00
Paul Masurel
c8d06d63b9 Test on UTF-8 2017-04-15 09:43:12 +09:00
Paul Masurel
7eec9f038d Merge branch 'master' into tantivy-imhotep
Conflicts:
	src/common/mod.rs
	src/core/segment_reader.rs
	src/datastruct/fstmap.rs
	src/indexer/merger.rs
	src/postings/mod.rs
	src/postings/segment_postings.rs
	src/postings/serializer.rs
	src/query/boolean_query/mod.rs
	src/query/term_query/term_scorer.rs
	src/query/term_query/term_weight.rs
2017-04-15 00:21:56 +09:00
Paul Masurel
57870fdcef Added a stream builder. 2017-04-14 23:23:26 +09:00
Paul Masurel
c0f2055e32 Added dictionary optimized for streaming 2017-04-11 23:07:34 +09:00
Paul Masurel
9a8f06a523 bugfix on opening termquery when there is no termfreq 2017-02-24 19:08:14 +09:00
Paul Masurel
bb57bee099 committing random shit because of jason 2017-02-23 22:52:17 +09:00
Paul Masurel
bc2a1f00e6 send sync for u32fastfieldreader 2017-02-23 21:11:52 +09:00
Paul Masurel
391f258ff3 Making u32fastfield send/sync 2017-02-23 20:11:35 +09:00
Paul Masurel
673712a339 Added public method to schema. 2017-02-23 19:48:06 +09:00
Paul Masurel
29ad1d84e5 exposing fastfield as public 2017-02-23 19:28:51 +09:00
Paul Masurel
62d9236200 Bugfix 2017-02-23 17:24:22 +09:00
Paul Masurel
f5f8e130b0 Exposing fstmap 2017-02-23 13:15:23 +09:00
Paul Masurel
d5d9218093 made datastruct public to help generate doc. 2017-02-23 11:12:29 +09:00
Paul Masurel
a44f34c49d NOBUG cleanup 2017-02-22 19:40:01 +09:00
Paul Masurel
d8ea083177 Added block iterator for segment postings. 2017-02-22 18:38:58 +09:00
Paul Masurel
d32dff1da9 NOBUG added advance_block 2017-02-22 10:50:25 +09:00
Paul Masurel
f9ca0b16f1 NOBUG Try block iteration 2017-02-22 10:45:19 +09:00
Paul Masurel
a39fe90930 NOBUG Change the code for Box<Scorer> 2017-02-22 09:38:43 +09:00
17 changed files with 767 additions and 89 deletions

View File

@@ -0,0 +1,58 @@
use std::io::Write;
use std::io;
pub struct CountingWriter<W: Write> {
underlying: W,
written_bytes: usize,
}
impl<W: Write> CountingWriter<W> {
pub fn wrap(underlying: W) -> CountingWriter<W> {
CountingWriter {
underlying: underlying,
written_bytes: 0,
}
}
pub fn written_bytes(&self,) -> usize {
self.written_bytes
}
pub fn finish(mut self) -> io::Result<(W, usize)> {
self.flush()?;
Ok((self.underlying, self.written_bytes))
}
}
impl<W: Write> Write for CountingWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let written_size = self.underlying.write(buf)?;
self.written_bytes += written_size;
Ok(written_size)
}
fn flush(&mut self) -> io::Result<()> {
self.underlying.flush()
}
}
#[cfg(test)]
mod test {
use super::CountingWriter;
use std::io::Write;
#[test]
fn test_counting_writer() {
let buffer: Vec<u8> = vec!();
let mut counting_writer = CountingWriter::wrap(buffer);
let bytes = (0u8..10u8).collect::<Vec<u8>>();
counting_writer.write_all(&bytes).unwrap();
let (w, len): (Vec<u8>, usize) = counting_writer.finish().unwrap();
assert_eq!(len, 10);
assert_eq!(w.len(), 10);
}
}

View File

@@ -2,12 +2,15 @@ mod serialize;
mod timer;
mod vint;
pub mod bitpacker;
mod counting_writer;
pub use self::serialize::BinarySerializable;
pub use self::timer::Timing;
pub use self::timer::TimerTree;
pub use self::timer::OpenTimer;
pub use self::vint::VInt;
pub use self::counting_writer::CountingWriter;
use std::io;
/// Create a default io error given a string.
@@ -76,4 +79,4 @@ mod test {
test_i64_converter_helper(i);
}
}
}
}

View File

@@ -7,13 +7,14 @@ use common::HasLen;
use core::SegmentMeta;
use fastfield::{self, FastFieldNotAvailableError};
use fastfield::DeleteBitSet;
use postings::BlockSegmentPostings;
use store::StoreReader;
use schema::Document;
use directory::ReadOnlySource;
use DocId;
use std::str;
use postings::TermInfo;
use datastruct::FstMap;
use datastruct::TermDictionary;
use std::sync::Arc;
use std::fmt;
use schema::Field;
@@ -42,7 +43,7 @@ use schema::TextIndexingOptions;
pub struct SegmentReader {
segment_id: SegmentId,
segment_meta: SegmentMeta,
term_infos: Arc<FstMap<TermInfo>>,
term_infos: Arc<TermDictionary<TermInfo>>,
postings_data: ReadOnlySource,
store_reader: StoreReader,
fast_fields_reader: Arc<FastFieldsReader>,
@@ -61,6 +62,11 @@ impl SegmentReader {
self.segment_meta.max_doc()
}
pub fn schema(&self) -> &Schema {
&self.schema
}
/// Returns the number of documents.
/// Deleted documents are not counted.
///
@@ -133,11 +139,12 @@ impl SegmentReader {
pub fn open(segment: Segment) -> Result<SegmentReader> {
let source = try!(segment.open_read(SegmentComponent::TERMS));
let term_infos = try!(FstMap::from_source(source));
let term_infos = try!(TermDictionary::from_source(source));
let store_reader = StoreReader::from(try!(segment.open_read(SegmentComponent::STORE)));
let postings_shared_mmap = try!(segment.open_read(SegmentComponent::POSTINGS));
let fast_field_data = try!(segment.open_read(SegmentComponent::FASTFIELDS));
let fast_fields_reader = try!(FastFieldsReader::open(fast_field_data));
let fieldnorms_data = try!(segment.open_read(SegmentComponent::FIELDNORMS));
@@ -172,7 +179,7 @@ impl SegmentReader {
}
/// Returns the term dictionary data structure.
pub fn term_infos(&self) -> &FstMap<TermInfo> {
pub fn term_infos(&self) -> &TermDictionary<TermInfo> {
&self.term_infos
}
@@ -185,16 +192,29 @@ impl SegmentReader {
}
/// Returns the segment postings associated with the term, and with the given option,
/// or `None` if the term has never been encountered and indexed.
///
/// If the field was not indexed with indexing options that cover
/// the requested options, the method does not fail: it returns a
/// `SegmentPostings` with as much information as possible.
///
/// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a `TextIndexingOptions`
/// that does not index positions will return a `SegmentPostings` with `DocId`s and frequencies.
pub fn read_postings(&self, term: &Term, option: SegmentPostingsOption) -> Option<SegmentPostings> {
pub fn postings_data(&self, offset: usize) -> &[u8] {
&self.postings_data[offset..]
}
pub fn get_block_postings(&self) -> BlockSegmentPostings {
BlockSegmentPostings::from_data(0, &self.postings_data[..], FreqHandler::new_without_freq())
}
pub fn read_block_postings_from_terminfo(&self, term_info: &TermInfo, field_type: &FieldType) -> Option<BlockSegmentPostings> {
let offset = term_info.postings_offset as usize;
let postings_data = &self.postings_data[offset..];
let freq_handler = match *field_type {
FieldType::Str(_) => {
FreqHandler::new_without_freq()
}
_ => {
FreqHandler::new_without_freq()
}
};
Some(BlockSegmentPostings::from_data(term_info.doc_freq as usize, postings_data, freq_handler))
}
pub fn read_block_postings(&self, term: &Term, option: SegmentPostingsOption) -> Option<BlockSegmentPostings> {
let field = term.field();
let field_entry = self.schema.get_field_entry(field);
let term_info = get!(self.get_term_info(&term));
@@ -234,7 +254,23 @@ impl SegmentReader {
FreqHandler::new_without_freq()
}
};
Some(SegmentPostings::from_data(term_info.doc_freq, postings_data, &self.delete_bitset, freq_handler))
Some(BlockSegmentPostings::from_data(term_info.doc_freq as usize, postings_data, freq_handler))
}
/// Returns the segment postings associated with the term, and with the given option,
/// or `None` if the term has never been encountered and indexed.
///
/// If the field was not indexed with indexing options that cover
/// the requested options, the method does not fail: it returns a
/// `SegmentPostings` with as much information as possible.
///
/// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a `TextIndexingOptions`
/// that does not index positions will return a `SegmentPostings` with `DocId`s and frequencies.
pub fn read_postings(&self, term: &Term, option: SegmentPostingsOption) -> Option<SegmentPostings> {
self.read_block_postings(term, option)
.map(|block_postings| {
SegmentPostings::from_block_postings(block_postings, self.delete_bitset.clone())
})
}
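
Editorial aside, not part of the diff: a minimal sketch of driving the reworked per-document API above. Everything it uses (`read_postings`, `SegmentPostingsOption::NoFreq`, `advance`) appears in this compare; it assumes the `DocSet` trait providing `advance`/`doc` is in scope.

// Sketch only: count the documents matching `term` in one segment,
// using the per-document cursor returned by `read_postings`.
fn count_matching_docs(reader: &SegmentReader, term: &Term) -> usize {
    let mut count = 0;
    if let Some(mut postings) = reader.read_postings(term, SegmentPostingsOption::NoFreq) {
        // advance() skips deleted documents via the segment's delete bitset.
        while postings.advance() {
            count += 1;
        }
    }
    count
}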

View File

@@ -1,7 +1,8 @@
use fst::Streamer;
use std::mem;
use std::collections::BinaryHeap;
use fst::map::Keys;
use postings::TermInfo;
use datastruct::TermDictionaryStreamer;
use schema::Field;
use schema::Term;
use core::SegmentReader;
@@ -34,7 +35,7 @@ impl Ord for HeapItem {
/// - a slice with the ordinal of the segments containing
/// the terms.
pub struct TermIterator<'a> {
key_streams: Vec<Keys<'a>>,
key_streams: Vec<TermDictionaryStreamer<'a, TermInfo>>,
heap: BinaryHeap<HeapItem>,
// Buffer hosting the list of segment ordinals containing
// the current term.
@@ -43,7 +44,7 @@ pub struct TermIterator<'a> {
}
impl<'a> TermIterator<'a> {
fn new(key_streams: Vec<Keys<'a>>) -> TermIterator<'a> {
fn new(key_streams: Vec<TermDictionaryStreamer<'a, TermInfo>>) -> TermIterator<'a> {
let key_streams_len = key_streams.len();
TermIterator {
key_streams: key_streams,
@@ -98,7 +99,7 @@ impl<'a> TermIterator<'a> {
fn advance_segments(&mut self) {
for segment_ord in self.current_segment_ords.drain(..) {
if let Some(term) = self.key_streams[segment_ord].next() {
if let Some((term, _val)) = self.key_streams[segment_ord].next() {
self.heap.push(HeapItem {
term: Term::from_bytes(term),
segment_ord: segment_ord,
@@ -126,7 +127,7 @@ impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> {
TermIterator::new(
segment_readers
.iter()
.map(|reader| reader.term_infos().keys())
.map(|reader| reader.term_infos().stream())
.collect()
)
}

View File

@@ -9,6 +9,7 @@ use directory::ReadOnlySource;
use common::BinarySerializable;
use std::marker::PhantomData;
fn convert_fst_error(e: fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
@@ -92,6 +93,10 @@ impl<V: BinarySerializable> FstMap<V> {
self.fst_index.keys()
}
pub fn fst_index(&self) -> &fst::Map {
&self.fst_index
}
pub fn from_source(source: ReadOnlySource) -> io::Result<FstMap<V>> {
let total_len = source.len();
let length_offset = total_len - 4;
@@ -107,8 +112,8 @@ impl<V: BinarySerializable> FstMap<V> {
_phantom_: PhantomData,
})
}
fn read_value(&self, offset: u64) -> V {
pub fn read_value(&self, offset: u64) -> V {
let buffer = self.values_mmap.as_slice();
let mut cursor = &buffer[(offset as usize)..];
V::deserialize(&mut cursor).expect("Data in FST is corrupted")

View File

@@ -1,7 +1,15 @@
mod fstmap;
mod skip;
pub mod stacker;
mod stream_dictionary;
//pub use self::fstmap::FstMapBuilder as TermDictionaryBuilder;
//pub use self::fstmap::FstMap as TermDictionary;
pub use self::stream_dictionary::StreamDictionaryBuilder as TermDictionaryBuilder;
pub use self::stream_dictionary::StreamDictionary as TermDictionary;
pub use self::stream_dictionary::StreamDictionaryStreamer as TermDictionaryStreamer;
pub use self::fstmap::FstMapBuilder;
pub use self::fstmap::FstMap;
pub use self::skip::{SkipListBuilder, SkipList};

View File

@@ -91,6 +91,10 @@ impl Heap {
pub fn get_mut_ref<Item>(&self, addr: u32) -> &mut Item {
self.inner().get_mut_ref(addr)
}
pub fn get_ref<Item>(&self, addr: u32) -> &Item {
self.inner().get_mut_ref(addr)
}
}

View File

@@ -0,0 +1,465 @@
#![allow(should_implement_trait)]
use std::cmp::max;
use std::io;
use std::io::Write;
use std::io::Read;
use fst;
use fst::raw::Fst;
use common::VInt;
use directory::ReadOnlySource;
use common::BinarySerializable;
use std::marker::PhantomData;
use common::CountingWriter;
use std::cmp::Ordering;
use fst::{IntoStreamer, Streamer};
use std::str;
use fst::raw::Node;
use fst::raw::CompiledAddr;
const BLOCK_SIZE: usize = 1024;
fn convert_fst_error(e: fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
pub struct StreamDictionaryBuilder<W: Write, V: BinarySerializable + Clone + Default> {
write: CountingWriter<W>,
block_index: fst::MapBuilder<Vec<u8>>,
last_key: Vec<u8>,
len: usize,
_phantom_: PhantomData<V>,
}
fn common_prefix_length(left: &[u8], right: &[u8]) -> usize {
left.iter().cloned()
.zip(right.iter().cloned())
.take_while(|&(b1, b2)| b1 == b2)
.count()
}
fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec<u8>) {
loop {
if let Some(transition) = node.transitions().last() {
buffer.push(transition.inp);
node = fst.node(transition.addr);
}
else {
break;
}
}
}
fn strictly_previous_key<B: AsRef<[u8]>>(fst_map: &fst::Map, key_as_ref: B) -> (Vec<u8>, u64) {
let key = key_as_ref.as_ref();
let fst = fst_map.as_fst();
let mut node = fst.root();
let mut node_stack: Vec<Node> = vec!(node.clone());
// first check the longest prefix.
for &b in &key[..key.len() - 1] {
node = match node.find_input(b) {
None => {
break;
},
Some(i) => {
fst.node(node.transition_addr(i))
},
};
node_stack.push(node);
}
let len_node_stack = node_stack.len();
for i in (1..len_node_stack).rev() {
let cur_node = &node_stack[i];
let b: u8 = key[i];
let last_transition_opt = cur_node
.transitions()
.take_while(|transition| transition.inp < b)
.last();
if let Some(last_transition) = last_transition_opt {
let mut result_buffer = Vec::from(&key[..i]);
result_buffer.push(last_transition.inp);
let mut result = Vec::from(&key[..i]);
result.push(last_transition.inp);
let fork_node = fst.node(last_transition.addr);
fill_last(fst, fork_node, &mut result);
let val = fst_map.get(&result).unwrap();
return (result, val);
}
else if cur_node.is_final() {
// the previous key is a prefix
let result_buffer = Vec::from(&key[..i]);
let val = fst_map.get(&result_buffer).unwrap();
return (result_buffer, val);
}
}
return (vec!(), 0);
}
impl<W: Write, V: BinarySerializable + Clone + Default> StreamDictionaryBuilder<W, V> {
pub fn new(write: W) -> io::Result<StreamDictionaryBuilder<W, V>> {
let buffer: Vec<u8> = vec!();
Ok(StreamDictionaryBuilder {
write: CountingWriter::wrap(write),
block_index: fst::MapBuilder::new(buffer)
.expect("This cannot fail"),
last_key: Vec::with_capacity(128),
len: 0,
_phantom_: PhantomData,
})
}
fn add_index_entry(&mut self) {
self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap();
}
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{
self.insert_key(key)?;
self.insert_value(value)
}
pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()>{
if self.len % BLOCK_SIZE == 0 {
self.add_index_entry();
}
self.len += 1;
let common_len = common_prefix_length(key, &self.last_key);
VInt(common_len as u64).serialize(&mut self.write)?;
self.last_key.truncate(common_len);
self.last_key.extend_from_slice(&key[common_len..]);
VInt((key.len() - common_len) as u64).serialize(&mut self.write)?;
self.write.write_all(&key[common_len..])?;
Ok(())
}
pub fn insert_value(&mut self, value: &V) -> io::Result<()>{
value.serialize(&mut self.write)?;
Ok(())
}
pub fn finish(mut self) -> io::Result<W> {
self.add_index_entry();
let (mut w, split_len) = self.write.finish()?;
let fst_write = self.block_index
.into_inner()
.map_err(convert_fst_error)?;
w.write(&fst_write)?;
(split_len as u64).serialize(&mut w)?;
w.flush()?;
Ok(w)
}
}
fn stream_before<'a, V: 'a + Clone + Default + BinarySerializable>(stream_dictionary: &'a StreamDictionary<V>, target_key: &[u8]) -> StreamDictionaryStreamer<'a, V> {
let (prev_key, offset) = strictly_previous_key(&stream_dictionary.fst_index, target_key.as_ref());
let offset: usize = offset as usize;
StreamDictionaryStreamer {
cursor: &stream_dictionary.stream_data.as_slice()[offset..],
current_key: Vec::from(prev_key),
current_value: V::default(),
}
}
pub struct StreamDictionary<V> where V:BinarySerializable + Default + Clone {
stream_data: ReadOnlySource,
fst_index: fst::Map,
_phantom_: PhantomData<V>,
}
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
Ok(fst::Map::from(match source {
ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)),
ReadOnlySource::Mmap(mmap_readonly) => try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)),
}))
}
impl<V: BinarySerializable + Clone + Default> StreamDictionary<V> {
pub fn from_source(source: ReadOnlySource) -> io::Result<StreamDictionary<V>> {
let total_len = source.len();
let length_offset = total_len - 8;
let split_len: usize = {
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
u64::deserialize(&mut split_len_buffer)? as usize
};
let stream_data = source.slice(0, split_len);
let fst_data = source.slice(split_len, length_offset);
let fst_index = open_fst_index(fst_data)?;
Ok(StreamDictionary {
stream_data: stream_data,
fst_index: fst_index,
_phantom_: PhantomData
})
}
pub fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<V> {
let mut streamer = stream_before(self, target_key.as_ref());
while let Some((iter_key, iter_val)) = streamer.next() {
match iter_key.cmp(target_key.as_ref()) {
Ordering::Less => {}
Ordering::Equal => {
let val: V = (*iter_val).clone();
return Some(val);
}
Ordering::Greater => {
return None;
}
}
}
return None;
}
pub fn range(&self) -> StreamDictionaryStreamerBuilder<V> {
let data: &[u8] = &self.stream_data;
StreamDictionaryStreamerBuilder {
stream_dictionary: &self,
offset_from: 0,
offset_to: (data.as_ptr() as usize) + data.len(),
current_key: vec!(),
}
}
pub fn stream(&self) -> StreamDictionaryStreamer<V> {
StreamDictionaryStreamer {
cursor: &*self.stream_data,
current_key: Vec::with_capacity(128),
current_value: V::default(),
}
}
}
pub struct StreamDictionaryStreamerBuilder<'a, V: 'a + BinarySerializable + Clone + Default> {
stream_dictionary: &'a StreamDictionary<V>,
offset_from: usize,
offset_to: usize,
current_key: Vec<u8>,
}
/// Returns offset information for the first
/// key in the stream that fails the given predicate.
///
/// returns (start offset, last matching key); the key is needed to
/// resume decoding the delta-encoded stream.
fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: StreamDictionaryStreamer<V>) -> (usize, Vec<u8>)
where V: 'a + BinarySerializable + Clone + Default {
let mut prev: &[u8] = streamer.cursor;
let mut prev_data: Vec<u8> = streamer.current_key.clone();
while let Some((iter_key, _)) = streamer.next() {
if !predicate(iter_key) {
return (prev.as_ptr() as usize, prev_data);
}
prev = streamer.cursor;
prev_data.clear();
prev_data.extend_from_slice(iter_key);
}
return (prev.as_ptr() as usize, prev_data);
}
impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerBuilder<'a, V> {
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
let target_key = bound.as_ref();
let streamer = stream_before(&self.stream_dictionary, target_key.as_ref());
let smaller_than = |k: &[u8]| { k.lt(target_key) };
let (offset_before, current_key) = get_offset(smaller_than, streamer);
self.current_key = current_key;
self.offset_from = offset_before;
self
}
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
let target_key = bound.as_ref();
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
let smaller_than = |k: &[u8]| { k.le(target_key) };
let (offset_before, current_key) = get_offset(smaller_than, streamer);
self.current_key = current_key;
self.offset_from = offset_before;
self
}
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
let target_key = bound.as_ref();
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
let smaller_than = |k: &[u8]| { k.le(target_key) };
let (offset_before, _) = get_offset(smaller_than, streamer);
self.offset_to = offset_before;
self
}
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
let target_key = bound.as_ref();
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
let smaller_than = |k: &[u8]| { k.lt(target_key) };
let (offset_before, _) = get_offset(smaller_than, streamer);
self.offset_to = offset_before;
self
}
pub fn into_stream(self) -> StreamDictionaryStreamer<'a, V> {
let data: &[u8] = &self.stream_dictionary.stream_data.as_slice()[..];
let origin = data.as_ptr() as usize;
let start = self.offset_from - origin;
let stop = max(self.offset_to - origin, start);
StreamDictionaryStreamer {
cursor: &data[start..stop],
current_key: self.current_key,
current_value: V::default(),
}
}
}
pub struct StreamDictionaryStreamer<'a, V: BinarySerializable> {
cursor: &'a [u8],
current_key: Vec<u8>,
current_value: V,
}
impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> {
pub fn next(&mut self) -> Option<(&[u8], &V)> {
if self.cursor.len() == 0 {
return None;
}
let common_length: usize = VInt::deserialize(&mut self.cursor).unwrap().0 as usize;
let new_length: usize = common_length + VInt::deserialize(&mut self.cursor).unwrap().0 as usize;
self.current_key.reserve(new_length);
unsafe {
self.current_key.set_len(new_length);
}
self.cursor.read_exact(&mut self.current_key[common_length..new_length]).unwrap();
self.current_value = V::deserialize(&mut self.cursor).unwrap();
Some((&self.current_key, &self.current_value))
}
pub fn key(&self) -> &[u8] {
&self.current_key
}
pub fn value(&self) -> &V {
&self.current_value
}
}
#[cfg(test)]
mod test {
use std::str;
use directory::ReadOnlySource;
use super::CountingWriter;
use std::io::Write;
use super::{BLOCK_SIZE, StreamDictionary, StreamDictionaryBuilder};
#[test]
fn test_stream_dictionary() {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut stream_dictionary_builder = StreamDictionaryBuilder::new(vec!()).unwrap();
for &(ref id, ref i) in &ids {
stream_dictionary_builder.insert(id.as_bytes(), i).unwrap();
}
stream_dictionary_builder.finish().unwrap()
};
let source = ReadOnlySource::from(buffer);
let stream_dictionary: StreamDictionary<u32> = StreamDictionary::from_source(source).unwrap();
{
let mut streamer = stream_dictionary.stream();
let mut i = 0;
while let Some((streamer_k, streamer_v)) = streamer.next() {
let &(ref key, ref v) = &ids[i];
assert_eq!(streamer_k, key.as_bytes());
assert_eq!(streamer_v, v);
i += 1;
}
}
let &(ref key, ref _v) = &ids[2047];
stream_dictionary.get(key.as_bytes());
}
#[test]
fn test_stream_range() {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut stream_dictionary_builder = StreamDictionaryBuilder::new(vec!()).unwrap();
for &(ref id, ref i) in &ids {
stream_dictionary_builder.insert(id.as_bytes(), i).unwrap();
}
stream_dictionary_builder.finish().unwrap()
};
let source = ReadOnlySource::from(buffer);
let stream_dictionary: StreamDictionary<u32> = StreamDictionary::from_source(source).unwrap();
{
for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
let &(ref target_key, _) = &ids[i];
let mut streamer = stream_dictionary
.range()
.ge(target_key.as_bytes())
.into_stream();
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j];
assert_eq!(str::from_utf8(streamer_k).unwrap(), key);
assert_eq!(streamer_v, v);
}
}
}
{
for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
let &(ref target_key, _) = &ids[i];
let mut streamer = stream_dictionary
.range()
.gt(target_key.as_bytes())
.into_stream();
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j + 1];
assert_eq!(streamer_k, key.as_bytes());
assert_eq!(streamer_v, v);
}
}
}
{
for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
for j in 0..3 {
let &(ref fst_key, _) = &ids[i];
let &(ref last_key, _) = &ids[i + 3];
let mut streamer = stream_dictionary
.range()
.ge(fst_key.as_bytes())
.lt(last_key.as_bytes())
.into_stream();
for _ in 0..(j + 1) {
assert!(streamer.next().is_some());
}
assert!(streamer.next().is_some());
}
}
}
}
}
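
Editorial aside, not part of the diff: the builder above front-codes its sorted keys. Each entry stores the length of the prefix shared with the previous key as a `VInt`, the length of the remaining suffix as a `VInt`, the suffix bytes, and then the serialized value, while an `fst` index entry mapping a key to its byte offset is added every `BLOCK_SIZE` (1024) keys so `get` and the range streamers can start decoding from the nearest preceding block. A self-contained sketch of the key delta-encoding alone, using single-byte lengths instead of `VInt` purely for illustration:

// Illustration only: front-code a sorted key list. The real builder uses
// VInt lengths and serializes a value after each key suffix.
fn front_code(keys: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    let mut last: Vec<u8> = Vec::new();
    for key in keys {
        // number of leading bytes shared with the previous key
        let common = last.iter().cloned()
            .zip(key.iter().cloned())
            .take_while(|&(a, b)| a == b)
            .count();
        out.push(common as u8);                // shared prefix length
        out.push((key.len() - common) as u8);  // suffix length
        out.extend_from_slice(&key[common..]); // suffix bytes
        last.truncate(common);
        last.extend_from_slice(&key[common..]);
    }
    out
}

fn main() {
    let keys: Vec<&[u8]> = vec![&b"doc000040"[..], &b"doc000041"[..], &b"doc000042"[..]];
    // "doc000041" shares the prefix "doc00004" with its predecessor,
    // so it is encoded as (8, 1, b"1"): 3 bytes instead of 9.
    let encoded = front_code(&keys);
    assert_eq!(encoded.len(), 11 + 3 + 3);
}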

View File

@@ -79,3 +79,10 @@ impl Clone for ReadOnlySource {
self.slice(0, self.len())
}
}
impl From<Vec<u8>> for ReadOnlySource {
fn from(data: Vec<u8>) -> ReadOnlySource {
let shared_data = SharedVecSlice::from(data);
ReadOnlySource::Anonymous(shared_data)
}
}

View File

@@ -35,3 +35,9 @@ impl SharedVecSlice {
}
}
}
impl From<Vec<u8>> for SharedVecSlice {
fn from(data: Vec<u8>) -> SharedVecSlice {
SharedVecSlice::new(Arc::new(data))
}
}

View File

@@ -46,6 +46,9 @@ pub struct U64FastFieldReader {
max_value: u64,
}
unsafe impl Send for U64FastFieldReader {}
unsafe impl Sync for U64FastFieldReader {}
impl U64FastFieldReader {
/// Returns the minimum value for this fast field.
@@ -140,6 +143,9 @@ pub struct I64FastFieldReader {
underlying: U64FastFieldReader,
}
unsafe impl Send for I64FastFieldReader {}
unsafe impl Sync for I64FastFieldReader {}
impl I64FastFieldReader {
/// Returns the minimum value for this fast field.
///

View File

@@ -356,6 +356,11 @@ impl IndexWriter {
result
}
#[doc(hidden)]
pub fn new_segment(&self) -> Segment {
self.segment_updater.new_segment()
}
/// Spawns a new worker thread for indexing.
/// The thread consumes documents from the pipeline.
///
@@ -429,6 +434,12 @@ impl IndexWriter {
Ok(())
}
pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
self.segment_updater.add_segment(self.generation, segment_entry);
}
/// Detects and removes the files that
/// are not used by the index anymore.
pub fn garbage_collect_files(&mut self) -> Result<()> {

View File

@@ -96,12 +96,12 @@ pub type Result<T> = std::result::Result<T, Error>;
mod core;
mod compression;
mod store;
pub mod store;
mod indexer;
mod common;
pub mod common;
mod error;
mod analyzer;
mod datastruct;
pub mod datastruct;
@@ -120,7 +120,7 @@ pub mod fastfield;
pub use directory::Directory;
pub use core::{Index, Segment, SegmentId, SegmentMeta, Searcher};
pub use core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, Searcher};
pub use indexer::IndexWriter;
pub use schema::{Term, Document};
pub use core::SegmentReader;

View File

@@ -25,8 +25,7 @@ pub use self::postings::Postings;
#[cfg(test)]
pub use self::vec_postings::VecPostings;
pub use self::segment_postings::SegmentPostings;
pub use self::segment_postings::{SegmentPostings, BlockSegmentPostings};
pub use self::intersection::IntersectionDocSet;
pub use self::freq_handler::FreqHandler;
pub use self::segment_postings_option::SegmentPostingsOption;
@@ -268,16 +267,26 @@ mod tests {
};
}
#[bench]
fn bench_block_segment_postings(b: &mut Bencher) {
let searcher = INDEX.searcher();
let segment_reader = searcher.segment_reader(0);
b.iter(|| {
let mut block_segment_postings = segment_reader.read_block_postings(&*TERM_A, SegmentPostingsOption::NoFreq).unwrap();
while block_segment_postings.advance() {}
});
}
#[bench]
fn bench_segment_postings(b: &mut Bencher) {
let searcher = INDEX.searcher();
let segment_reader = searcher.segment_reader(0);
b.iter(|| {
let mut segment_postings = segment_reader.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq).unwrap();
while segment_postings.advance() {}
let mut block_segment_postings = segment_reader.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq).unwrap();
while block_segment_postings.advance() {}
});
}
}
#[bench]
fn bench_segment_intersection(b: &mut Bencher) {

View File

@@ -7,6 +7,88 @@ use fastfield::DeleteBitSet;
const EMPTY_DATA: [u8; 0] = [0u8; 0];
pub struct BlockSegmentPostings<'a> {
num_binpacked_blocks: usize,
num_vint_docs: usize,
block_decoder: BlockDecoder,
freq_handler: FreqHandler,
remaining_data: &'a [u8],
doc_offset: DocId,
len: usize,
}
impl<'a> BlockSegmentPostings<'a> {
pub fn from_data(len: usize, data: &'a [u8], freq_handler: FreqHandler) -> BlockSegmentPostings<'a> {
let num_binpacked_blocks: usize = (len as usize) / NUM_DOCS_PER_BLOCK;
let num_vint_docs = (len as usize) - NUM_DOCS_PER_BLOCK * num_binpacked_blocks;
BlockSegmentPostings {
num_binpacked_blocks: num_binpacked_blocks,
num_vint_docs: num_vint_docs,
block_decoder: BlockDecoder::new(),
freq_handler: freq_handler,
remaining_data: data,
doc_offset: 0,
len: len,
}
}
pub fn reset(&mut self, len: usize, data: &'a [u8]) {
let num_binpacked_blocks: usize = (len as usize) / NUM_DOCS_PER_BLOCK;
let num_vint_docs = (len as usize) - NUM_DOCS_PER_BLOCK * num_binpacked_blocks;
self.num_binpacked_blocks = num_binpacked_blocks;
self.num_vint_docs = num_vint_docs;
self.remaining_data = data;
self.doc_offset = 0;
self.len = len;
}
pub fn docs(&self) -> &[DocId] {
self.block_decoder.output_array()
}
pub fn freq_handler(&self) -> &FreqHandler {
&self.freq_handler
}
pub fn advance(&mut self) -> bool {
if self.num_binpacked_blocks > 0 {
self.remaining_data = self.block_decoder.uncompress_block_sorted(self.remaining_data, self.doc_offset);
self.remaining_data = self.freq_handler.read_freq_block(self.remaining_data);
self.doc_offset = self.block_decoder.output(NUM_DOCS_PER_BLOCK - 1);
self.num_binpacked_blocks -= 1;
true
}
else {
if self.num_vint_docs > 0 {
self.remaining_data = self.block_decoder.uncompress_vint_sorted(self.remaining_data, self.doc_offset, self.num_vint_docs);
self.freq_handler.read_freq_vint(self.remaining_data, self.num_vint_docs);
self.num_vint_docs = 0;
true
}
else {
false
}
}
}
/// Returns an empty segment postings object
pub fn empty() -> BlockSegmentPostings<'static> {
BlockSegmentPostings {
num_binpacked_blocks: 0,
num_vint_docs: 0,
block_decoder: BlockDecoder::new(),
freq_handler: FreqHandler::new_without_freq(),
remaining_data: &EMPTY_DATA,
doc_offset: 0,
len: 0,
}
}
}
/// `SegmentPostings` represents the inverted list or postings associated with
/// a term in a `Segment`.
///
@@ -14,28 +96,14 @@ const EMPTY_DATA: [u8; 0] = [0u8; 0];
/// Positions on the other hand, are optionally entirely decoded upfront.
pub struct SegmentPostings<'a> {
len: usize,
doc_offset: u32,
block_decoder: BlockDecoder,
freq_handler: FreqHandler,
remaining_data: &'a [u8],
cur: Wrapping<usize>,
block_cursor: BlockSegmentPostings<'a>,
cur_block_len: usize,
delete_bitset: DeleteBitSet,
}
impl<'a> SegmentPostings<'a> {
fn load_next_block(&mut self) {
let num_remaining_docs = self.len - self.cur.0;
if num_remaining_docs >= NUM_DOCS_PER_BLOCK {
self.remaining_data = self.block_decoder
.uncompress_block_sorted(self.remaining_data, self.doc_offset);
self.remaining_data = self.freq_handler.read_freq_block(self.remaining_data);
self.doc_offset = self.block_decoder.output(NUM_DOCS_PER_BLOCK - 1);
} else {
self.remaining_data = self.block_decoder
.uncompress_vint_sorted(self.remaining_data, self.doc_offset, num_remaining_docs);
self.freq_handler.read_freq_vint(self.remaining_data, num_remaining_docs);
}
}
/// Reads a Segment postings from an &[u8]
///
@@ -43,39 +111,29 @@ impl<'a> SegmentPostings<'a> {
/// * `data` - data array. The complete data is not necessarily used.
/// * `freq_handler` - the freq handler is in charge of decoding
/// frequencies and/or positions
pub fn from_data(len: u32,
data: &'a [u8],
delete_bitset: &'a DeleteBitSet,
freq_handler: FreqHandler) -> SegmentPostings<'a> {
pub fn from_block_postings(
segment_block_postings: BlockSegmentPostings<'a>,
delete_bitset: DeleteBitSet) -> SegmentPostings<'a> {
SegmentPostings {
len: len as usize,
doc_offset: 0,
block_decoder: BlockDecoder::new(),
freq_handler: freq_handler,
remaining_data: data,
len: segment_block_postings.len,
block_cursor: segment_block_postings,
cur: Wrapping(usize::max_value()),
delete_bitset: delete_bitset.clone(),
cur_block_len: 0,
delete_bitset: delete_bitset,
}
}
/// Returns an empty segment postings object
pub fn empty() -> SegmentPostings<'static> {
let empty_block_cursor = BlockSegmentPostings::empty();
SegmentPostings {
len: 0,
doc_offset: 0,
block_decoder: BlockDecoder::new(),
freq_handler: FreqHandler::new_without_freq(),
remaining_data: &EMPTY_DATA,
block_cursor: empty_block_cursor,
delete_bitset: DeleteBitSet::empty(),
cur: Wrapping(usize::max_value()),
cur_block_len: 0,
}
}
/// Index within a block is used as an address when
/// interacting with the `FreqHandler`
fn index_within_block(&self) -> usize {
self.cur.0 % NUM_DOCS_PER_BLOCK
}
}
@@ -84,13 +142,16 @@ impl<'a> DocSet for SegmentPostings<'a> {
// next needs to be called a first time to point to the correct element.
#[inline]
fn advance(&mut self) -> bool {
loop {
loop {
self.cur += Wrapping(1);
if self.cur.0 >= self.len {
return false;
}
if self.index_within_block() == 0 {
self.load_next_block();
if self.cur.0 == self.cur_block_len {
self.cur = Wrapping(0);
if !self.block_cursor.advance() {
self.cur_block_len = 0;
self.cur = Wrapping(usize::max_value());
return false;
}
self.cur_block_len = self.block_cursor.docs().len();
}
if !self.delete_bitset.is_deleted(self.doc()) {
return true;
@@ -100,7 +161,7 @@ impl<'a> DocSet for SegmentPostings<'a> {
#[inline]
fn doc(&self) -> DocId {
self.block_decoder.output(self.index_within_block())
self.block_cursor.docs()[self.cur.0]
}
}
@@ -112,10 +173,10 @@ impl<'a> HasLen for SegmentPostings<'a> {
impl<'a> Postings for SegmentPostings<'a> {
fn term_freq(&self) -> u32 {
self.freq_handler.freq(self.index_within_block())
self.block_cursor.freq_handler().freq(self.cur.0)
}
fn positions(&self) -> &[u32] {
self.freq_handler.positions(self.index_within_block())
self.block_cursor.freq_handler().positions(self.cur.0)
}
}
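
Editorial aside, not part of the diff: a small sketch of consuming a term's postings block by block through the new `BlockSegmentPostings`, which is what `SegmentPostings::advance` now does internally. It only relies on names visible in this compare (`read_block_postings` from the `SegmentReader` hunk, plus `advance` and `docs` above).

// Sketch only: walk the decoded doc-id blocks directly, without the
// per-document SegmentPostings wrapper (and thus without delete filtering).
fn last_doc(reader: &SegmentReader, term: &Term) -> Option<DocId> {
    let mut last = None;
    if let Some(mut blocks) = reader.read_block_postings(term, SegmentPostingsOption::NoFreq) {
        while blocks.advance() {
            // docs() exposes the DocIds decoded for the current block.
            if let Some(&doc) = blocks.docs().last() {
                last = Some(doc);
            }
        }
    }
    last
}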

View File

@@ -1,5 +1,5 @@
use Result;
use datastruct::FstMapBuilder;
use datastruct::TermDictionaryBuilder;
use super::TermInfo;
use schema::Field;
use schema::FieldEntry;
@@ -50,7 +50,7 @@ use common::BinarySerializable;
/// A description of the serialization format is
/// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html).
pub struct PostingsSerializer {
terms_fst_builder: FstMapBuilder<WritePtr, TermInfo>,
terms_fst_builder: TermDictionaryBuilder<WritePtr, TermInfo>,
postings_write: WritePtr,
positions_write: WritePtr,
written_bytes_postings: usize,
@@ -74,7 +74,7 @@ impl PostingsSerializer {
positions_write: WritePtr,
schema: Schema)
-> Result<PostingsSerializer> {
let terms_fst_builder = try!(FstMapBuilder::new(terms_write));
let terms_fst_builder = TermDictionaryBuilder::new(terms_write)?;
Ok(PostingsSerializer {
terms_fst_builder: terms_fst_builder,
postings_write: postings_write,

View File

@@ -30,10 +30,8 @@ impl<'a> Scorer for Box<Scorer + 'a> {
}
fn collect(&mut self, collector: &mut Collector) {
let scorer = self.deref_mut();
while scorer.advance() {
collector.collect(scorer.doc(), scorer.score());
}
let scorer: &mut Scorer = self.deref_mut();
scorer.collect(collector);
}
}