Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2025-12-28 04:52:55 +00:00)

Compare commits: columnread...tantivy-im (36 commits)
| SHA1 |
|---|
| d07e896a2f |
| 574feb8026 |
| 0bc047f8e1 |
| 2c2aa3d66c |
| da99bbcb9d |
| 33f9426dd6 |
| 647c97fa3d |
| 8029aea548 |
| 3b33484cf8 |
| 2a909ddcc7 |
| 925b9063a7 |
| 5e1ce381fe |
| 67381e448f |
| 19d535c28e |
| e00d6538fa |
| 8d7445f08a |
| 202e69b7e0 |
| a650969509 |
| c8d06d63b9 |
| 7eec9f038d |
| 57870fdcef |
| c0f2055e32 |
| 9a8f06a523 |
| bb57bee099 |
| bc2a1f00e6 |
| 391f258ff3 |
| 673712a339 |
| 29ad1d84e5 |
| 62d9236200 |
| f5f8e130b0 |
| d5d9218093 |
| a44f34c49d |
| d8ea083177 |
| d32dff1da9 |
| f9ca0b16f1 |
| a39fe90930 |
src/common/counting_writer.rs (new file, 58 lines)
@@ -0,0 +1,58 @@
use std::io::Write;
use std::io;


pub struct CountingWriter<W: Write> {
    underlying: W,
    written_bytes: usize,
}

impl<W: Write> CountingWriter<W> {

    pub fn wrap(underlying: W) -> CountingWriter<W> {
        CountingWriter {
            underlying: underlying,
            written_bytes: 0,
        }
    }

    pub fn written_bytes(&self,) -> usize {
        self.written_bytes
    }

    pub fn finish(mut self) -> io::Result<(W, usize)> {
        self.flush()?;
        Ok((self.underlying, self.written_bytes))
    }
}

impl<W: Write> Write for CountingWriter<W> {

    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written_size = self.underlying.write(buf)?;
        self.written_bytes += written_size;
        Ok(written_size)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.underlying.flush()
    }
}


#[cfg(test)]
mod test {

    use super::CountingWriter;
    use std::io::Write;

    #[test]
    fn test_counting_writer() {
        let buffer: Vec<u8> = vec!();
        let mut counting_writer = CountingWriter::wrap(buffer);
        let bytes = (0u8..10u8).collect::<Vec<u8>>();
        counting_writer.write_all(&bytes).unwrap();
        let (w, len): (Vec<u8>, usize) = counting_writer.finish().unwrap();
        assert_eq!(len, 10);
        assert_eq!(w.len(), 10);
    }
}
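A minimal usage sketch of the new `CountingWriter`: it wraps any `Write` and keeps a running count of the bytes that passed through it, which is how the dictionary builder further down records byte offsets for its block index. The `tantivy::common::CountingWriter` path relies on the `pub mod common` / `pub use` re-exports added in this change set and is otherwise an assumption.

```rust
use std::io::Write;

use tantivy::common::CountingWriter; // path assumed from the re-exports in this change set

fn main() -> std::io::Result<()> {
    // Wrap an in-memory buffer; any `Write` (e.g. a `BufWriter<File>`) works the same way.
    let mut writer = CountingWriter::wrap(Vec::new());

    // Remember the offset of each record before writing it, the way
    // `StreamDictionaryBuilder::add_index_entry` uses `written_bytes()` below.
    let mut offsets = Vec::new();
    let records: [&[u8]; 2] = [b"first record", b"second record"];
    for record in &records {
        offsets.push(writer.written_bytes());
        writer.write_all(record)?;
    }

    let (buffer, total) = writer.finish()?;
    assert_eq!(total, buffer.len());
    println!("record offsets: {:?}, total bytes: {}", offsets, total);
    Ok(())
}
```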
@@ -2,12 +2,15 @@ mod serialize;
mod timer;
mod vint;
pub mod bitpacker;
mod counting_writer;

pub use self::serialize::BinarySerializable;
pub use self::timer::Timing;
pub use self::timer::TimerTree;
pub use self::timer::OpenTimer;
pub use self::vint::VInt;
pub use self::counting_writer::CountingWriter;
use std::io;

/// Create a default io error given a string.

@@ -76,4 +79,4 @@ mod test {
            test_i64_converter_helper(i);
        }
    }
}
}
@@ -7,13 +7,14 @@ use common::HasLen;
use core::SegmentMeta;
use fastfield::{self, FastFieldNotAvailableError};
use fastfield::DeleteBitSet;
use postings::BlockSegmentPostings;
use store::StoreReader;
use schema::Document;
use directory::ReadOnlySource;
use DocId;
use std::str;
use postings::TermInfo;
use datastruct::FstMap;
use datastruct::TermDictionary;
use std::sync::Arc;
use std::fmt;
use schema::Field;

@@ -42,7 +43,7 @@ use schema::TextIndexingOptions;
pub struct SegmentReader {
    segment_id: SegmentId,
    segment_meta: SegmentMeta,
    term_infos: Arc<FstMap<TermInfo>>,
    term_infos: Arc<TermDictionary<TermInfo>>,
    postings_data: ReadOnlySource,
    store_reader: StoreReader,
    fast_fields_reader: Arc<FastFieldsReader>,

@@ -61,6 +62,11 @@ impl SegmentReader {
        self.segment_meta.max_doc()
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the number of documents.
    /// Deleted documents are not counted.
    ///

@@ -133,11 +139,12 @@ impl SegmentReader {
    pub fn open(segment: Segment) -> Result<SegmentReader> {

        let source = try!(segment.open_read(SegmentComponent::TERMS));
        let term_infos = try!(FstMap::from_source(source));
        let term_infos = try!(TermDictionary::from_source(source));
        let store_reader = StoreReader::from(try!(segment.open_read(SegmentComponent::STORE)));
        let postings_shared_mmap = try!(segment.open_read(SegmentComponent::POSTINGS));

        let fast_field_data = try!(segment.open_read(SegmentComponent::FASTFIELDS));

        let fast_fields_reader = try!(FastFieldsReader::open(fast_field_data));

        let fieldnorms_data = try!(segment.open_read(SegmentComponent::FIELDNORMS));

@@ -172,7 +179,7 @@ impl SegmentReader {
    }

    /// Return the term dictionary datastructure.
    pub fn term_infos(&self) -> &FstMap<TermInfo> {
    pub fn term_infos(&self) -> &TermDictionary<TermInfo> {
        &self.term_infos
    }

@@ -185,16 +192,29 @@ impl SegmentReader {
    }

    /// Returns the segment postings associated with the term, and with the given option,
    /// or `None` if the term has never been encounterred and indexed.
    ///
    /// If the field was not indexed with the indexing options that cover
    /// the requested options, the returned `SegmentPostings` the method does not fail
    /// and returns a `SegmentPostings` with as much information as possible.
    ///
    /// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a `TextIndexingOptions`
    /// that does not index position will return a `SegmentPostings` with `DocId`s and frequencies.
    pub fn read_postings(&self, term: &Term, option: SegmentPostingsOption) -> Option<SegmentPostings> {
    pub fn postings_data(&self, offset: usize) -> &[u8] {
        &self.postings_data[offset..]
    }

    pub fn get_block_postings(&self) -> BlockSegmentPostings {
        BlockSegmentPostings::from_data(0, &self.postings_data[..], FreqHandler::new_without_freq())
    }

    pub fn read_block_postings_from_terminfo(&self, term_info: &TermInfo, field_type: &FieldType) -> Option<BlockSegmentPostings> {
        let offset = term_info.postings_offset as usize;
        let postings_data = &self.postings_data[offset..];
        let freq_handler = match *field_type {
            FieldType::Str(_) => {
                FreqHandler::new_without_freq()
            }
            _ => {
                FreqHandler::new_without_freq()
            }
        };
        Some(BlockSegmentPostings::from_data(term_info.doc_freq as usize, postings_data, freq_handler))
    }

    pub fn read_block_postings(&self, term: &Term, option: SegmentPostingsOption) -> Option<BlockSegmentPostings> {
        let field = term.field();
        let field_entry = self.schema.get_field_entry(field);
        let term_info = get!(self.get_term_info(&term));

@@ -234,7 +254,23 @@ impl SegmentReader {
                FreqHandler::new_without_freq()
            }
        };
        Some(SegmentPostings::from_data(term_info.doc_freq, postings_data, &self.delete_bitset, freq_handler))
        Some(BlockSegmentPostings::from_data(term_info.doc_freq as usize, postings_data, freq_handler))
    }

    /// Returns the segment postings associated with the term, and with the given option,
    /// or `None` if the term has never been encounterred and indexed.
    ///
    /// If the field was not indexed with the indexing options that cover
    /// the requested options, the returned `SegmentPostings` the method does not fail
    /// and returns a `SegmentPostings` with as much information as possible.
    ///
    /// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a `TextIndexingOptions`
    /// that does not index position will return a `SegmentPostings` with `DocId`s and frequencies.
    pub fn read_postings(&self, term: &Term, option: SegmentPostingsOption) -> Option<SegmentPostings> {
        self.read_block_postings(term, option)
            .map(|block_postings| {
                SegmentPostings::from_block_postings(block_postings, self.delete_bitset.clone())
            })
    }
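The hunks above split postings access into two layers: `read_block_postings` hands back the raw block cursor, and `read_postings` wraps it with the segment's delete bitset. A minimal sketch of the block-level side, assuming a `SegmentReader` and `Term` obtained elsewhere; the import path for `SegmentPostingsOption` is an assumption, not shown in this diff.

```rust
use tantivy::{SegmentReader, Term};
use tantivy::postings::SegmentPostingsOption; // path assumed

/// Count a term's postings one decoded block at a time.
fn count_docs_blockwise(reader: &SegmentReader, term: &Term) -> usize {
    let mut count = 0;
    if let Some(mut blocks) = reader.read_block_postings(term, SegmentPostingsOption::NoFreq) {
        // Each advance() decodes the next block of doc ids into docs().
        while blocks.advance() {
            // Deleted documents are not filtered at this level;
            // read_postings() adds that filtering on top.
            count += blocks.docs().len();
        }
    }
    count
}
```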
@@ -1,7 +1,8 @@
use fst::Streamer;
use std::mem;
use std::collections::BinaryHeap;
use fst::map::Keys;
use postings::TermInfo;
use datastruct::TermDictionaryStreamer;
use schema::Field;
use schema::Term;
use core::SegmentReader;

@@ -34,7 +35,7 @@ impl Ord for HeapItem {
/// - a slice with the ordinal of the segments containing
/// the terms.
pub struct TermIterator<'a> {
    key_streams: Vec<Keys<'a>>,
    key_streams: Vec<TermDictionaryStreamer<'a, TermInfo>>,
    heap: BinaryHeap<HeapItem>,
    // Buffer hosting the list of segment ordinals containing
    // the current term.

@@ -43,7 +44,7 @@ pub struct TermIterator<'a> {
}

impl<'a> TermIterator<'a> {
    fn new(key_streams: Vec<Keys<'a>>) -> TermIterator<'a> {
    fn new(key_streams: Vec<TermDictionaryStreamer<'a, TermInfo>>) -> TermIterator<'a> {
        let key_streams_len = key_streams.len();
        TermIterator {
            key_streams: key_streams,

@@ -98,7 +99,7 @@ impl<'a> TermIterator<'a> {

    fn advance_segments(&mut self) {
        for segment_ord in self.current_segment_ords.drain(..) {
            if let Some(term) = self.key_streams[segment_ord].next() {
            if let Some((term, _val)) = self.key_streams[segment_ord].next() {
                self.heap.push(HeapItem {
                    term: Term::from_bytes(term),
                    segment_ord: segment_ord,

@@ -126,7 +127,7 @@ impl<'a> From<&'a [SegmentReader]> for TermIterator<'a> {
        TermIterator::new(
            segment_readers
                .iter()
                .map(|reader| reader.term_infos().keys())
                .map(|reader| reader.term_infos().stream())
                .collect()
        )
    }
@@ -9,6 +9,7 @@ use directory::ReadOnlySource;
use common::BinarySerializable;
use std::marker::PhantomData;


fn convert_fst_error(e: fst::Error) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}

@@ -92,6 +93,10 @@ impl<V: BinarySerializable> FstMap<V> {
        self.fst_index.keys()
    }

    pub fn fst_index(&self) -> &fst::Map {
        &self.fst_index
    }

    pub fn from_source(source: ReadOnlySource) -> io::Result<FstMap<V>> {
        let total_len = source.len();
        let length_offset = total_len - 4;

@@ -107,8 +112,8 @@ impl<V: BinarySerializable> FstMap<V> {
            _phantom_: PhantomData,
        })
    }

    fn read_value(&self, offset: u64) -> V {
    pub fn read_value(&self, offset: u64) -> V {
        let buffer = self.values_mmap.as_slice();
        let mut cursor = &buffer[(offset as usize)..];
        V::deserialize(&mut cursor).expect("Data in FST is corrupted")
@@ -1,7 +1,15 @@
mod fstmap;
mod skip;
pub mod stacker;
mod stream_dictionary;

//pub use self::fstmap::FstMapBuilder as TermDictionaryBuilder;
//pub use self::fstmap::FstMap as TermDictionary;

pub use self::stream_dictionary::StreamDictionaryBuilder as TermDictionaryBuilder;
pub use self::stream_dictionary::StreamDictionary as TermDictionary;
pub use self::stream_dictionary::StreamDictionaryStreamer as TermDictionaryStreamer;

pub use self::fstmap::FstMapBuilder;
pub use self::fstmap::FstMap;
pub use self::skip::{SkipListBuilder, SkipList};
@@ -91,6 +91,10 @@ impl Heap {
    pub fn get_mut_ref<Item>(&self, addr: u32) -> &mut Item {
        self.inner().get_mut_ref(addr)
    }

    pub fn get_ref<Item>(&self, addr: u32) -> &Item {
        self.inner().get_mut_ref(addr)
    }
}
src/datastruct/stream_dictionary.rs (new file, 465 lines)
@@ -0,0 +1,465 @@
#![allow(should_implement_trait)]

use std::cmp::max;
use std::io;
use std::io::Write;
use std::io::Read;
use fst;
use fst::raw::Fst;
use common::VInt;
use directory::ReadOnlySource;
use common::BinarySerializable;
use std::marker::PhantomData;
use common::CountingWriter;
use std::cmp::Ordering;
use fst::{IntoStreamer, Streamer};
use std::str;
use fst::raw::Node;
use fst::raw::CompiledAddr;

const BLOCK_SIZE: usize = 1024;

fn convert_fst_error(e: fst::Error) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}

pub struct StreamDictionaryBuilder<W: Write, V: BinarySerializable + Clone + Default> {
    write: CountingWriter<W>,
    block_index: fst::MapBuilder<Vec<u8>>,
    last_key: Vec<u8>,
    len: usize,
    _phantom_: PhantomData<V>,
}

fn common_prefix_length(left: &[u8], right: &[u8]) -> usize {
    left.iter().cloned()
        .zip(right.iter().cloned())
        .take_while(|&(b1, b2)| b1 == b2)
        .count()
}


fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec<u8>) {
    loop {
        if let Some(transition) = node.transitions().last() {
            buffer.push(transition.inp);
            node = fst.node(transition.addr);
        }
        else {
            break;
        }
    }
}


fn strictly_previous_key<B: AsRef<[u8]>>(fst_map: &fst::Map, key_as_ref: B) -> (Vec<u8>, u64) {
    let key = key_as_ref.as_ref();
    let fst = fst_map.as_fst();
    let mut node = fst.root();
    let mut node_stack: Vec<Node> = vec!(node.clone());

    // first check the longest prefix.
    for &b in &key[..key.len() - 1] {
        node = match node.find_input(b) {
            None => {
                break;
            },
            Some(i) => {
                fst.node(node.transition_addr(i))
            },
        };
        node_stack.push(node);
    }

    let len_node_stack = node_stack.len();
    for i in (1..len_node_stack).rev() {
        let cur_node = &node_stack[i];
        let b: u8 = key[i];
        let last_transition_opt = cur_node
            .transitions()
            .take_while(|transition| transition.inp < b)
            .last();

        if let Some(last_transition) = last_transition_opt {
            let mut result_buffer = Vec::from(&key[..i]);
            result_buffer.push(last_transition.inp);
            let mut result = Vec::from(&key[..i]);
            result.push(last_transition.inp);
            let fork_node = fst.node(last_transition.addr);
            fill_last(fst, fork_node, &mut result);
            let val = fst_map.get(&result).unwrap();
            return (result, val);
        }
        else if cur_node.is_final() {
            // the previous key is a prefix
            let result_buffer = Vec::from(&key[..i]);
            let val = fst_map.get(&result_buffer).unwrap();
            return (result_buffer, val);
        }
    }

    return (vec!(), 0);
}


impl<W: Write, V: BinarySerializable + Clone + Default> StreamDictionaryBuilder<W, V> {

    pub fn new(write: W) -> io::Result<StreamDictionaryBuilder<W, V>> {
        let buffer: Vec<u8> = vec!();
        Ok(StreamDictionaryBuilder {
            write: CountingWriter::wrap(write),
            block_index: fst::MapBuilder::new(buffer)
                .expect("This cannot fail"),
            last_key: Vec::with_capacity(128),
            len: 0,
            _phantom_: PhantomData,
        })
    }

    fn add_index_entry(&mut self) {
        self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap();
    }

    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        self.insert_key(key)?;
        self.insert_value(value)
    }

    pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
        if self.len % BLOCK_SIZE == 0 {
            self.add_index_entry();
        }
        self.len += 1;
        let common_len = common_prefix_length(key, &self.last_key);
        VInt(common_len as u64).serialize(&mut self.write)?;
        self.last_key.truncate(common_len);
        self.last_key.extend_from_slice(&key[common_len..]);
        VInt((key.len() - common_len) as u64).serialize(&mut self.write)?;
        self.write.write_all(&key[common_len..])?;
        Ok(())
    }

    pub fn insert_value(&mut self, value: &V) -> io::Result<()> {
        value.serialize(&mut self.write)?;
        Ok(())
    }

    pub fn finish(mut self) -> io::Result<W> {
        self.add_index_entry();
        let (mut w, split_len) = self.write.finish()?;
        let fst_write = self.block_index
            .into_inner()
            .map_err(convert_fst_error)?;
        w.write(&fst_write)?;
        (split_len as u64).serialize(&mut w)?;
        w.flush()?;
        Ok(w)
    }
}
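To make the layout written by `insert_key`, `insert_value`, and `finish` concrete: each record is VInt(common prefix length with the previous key), VInt(suffix length), the suffix bytes, then the serialized value; every `BLOCK_SIZE` (1024) keys the builder also records (last key written, current byte offset) in the fst block index, and `finish` appends the fst bytes plus a trailing u64 holding the length of the record section. The sketch below re-derives that record shape on two keys; it is illustrative only and uses single length bytes and a little-endian u32 value instead of tantivy's `VInt`/`BinarySerializable` encodings.

```rust
// Illustrative only: mirrors the record layout of insert_key/insert_value above,
// but with plain one-byte lengths instead of VInt for readability.
fn encode_record(last_key: &[u8], key: &[u8], value: u32, out: &mut Vec<u8>) {
    let common = last_key.iter().cloned()
        .zip(key.iter().cloned())
        .take_while(|&(a, b)| a == b)
        .count();
    out.push(common as u8);                       // VInt(common prefix length) in the real format
    out.push((key.len() - common) as u8);         // VInt(suffix length)
    out.extend_from_slice(&key[common..]);        // suffix bytes
    out.extend_from_slice(&value.to_le_bytes());  // serialized value
}

fn main() {
    let mut out = Vec::new();
    encode_record(b"", b"doc000001", 1, &mut out);          // common = 0, suffix = "doc000001"
    encode_record(b"doc000001", b"doc000002", 2, &mut out); // common = 8, suffix = "2"
    // The second record needs only 2 length bytes, 1 suffix byte and the value,
    // which is where the prefix compression pays off on sorted keys.
    assert_eq!(out.len(), (2 + 9 + 4) + (2 + 1 + 4));
    println!("{} bytes total", out.len());
}
```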
fn stream_before<'a, V: 'a + Clone + Default + BinarySerializable>(stream_dictionary: &'a StreamDictionary<V>, target_key: &[u8]) -> StreamDictionaryStreamer<'a, V> {
    let (prev_key, offset) = strictly_previous_key(&stream_dictionary.fst_index, target_key.as_ref());
    let offset: usize = offset as usize;
    StreamDictionaryStreamer {
        cursor: &stream_dictionary.stream_data.as_slice()[offset..],
        current_key: Vec::from(prev_key),
        current_value: V::default(),
    }
}


pub struct StreamDictionary<V> where V: BinarySerializable + Default + Clone {
    stream_data: ReadOnlySource,
    fst_index: fst::Map,
    _phantom_: PhantomData<V>,
}

fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
    Ok(fst::Map::from(match source {
        ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)),
        ReadOnlySource::Mmap(mmap_readonly) => try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)),
    }))
}


impl<V: BinarySerializable + Clone + Default> StreamDictionary<V> {

    pub fn from_source(source: ReadOnlySource) -> io::Result<StreamDictionary<V>> {
        let total_len = source.len();
        let length_offset = total_len - 8;
        let split_len: usize = {
            let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
            u64::deserialize(&mut split_len_buffer)? as usize
        };
        let stream_data = source.slice(0, split_len);
        let fst_data = source.slice(split_len, length_offset);
        let fst_index = open_fst_index(fst_data)?;

        Ok(StreamDictionary {
            stream_data: stream_data,
            fst_index: fst_index,
            _phantom_: PhantomData
        })
    }

    pub fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<V> {
        let mut streamer = stream_before(self, target_key.as_ref());
        while let Some((iter_key, iter_val)) = streamer.next() {
            match iter_key.cmp(target_key.as_ref()) {
                Ordering::Less => {}
                Ordering::Equal => {
                    let val: V = (*iter_val).clone();
                    return Some(val);
                }
                Ordering::Greater => {
                    return None;
                }
            }
        }
        return None;
    }

    pub fn range(&self) -> StreamDictionaryStreamerBuilder<V> {
        let data: &[u8] = &self.stream_data;
        StreamDictionaryStreamerBuilder {
            stream_dictionary: &self,
            offset_from: 0,
            offset_to: (data.as_ptr() as usize) + data.len(),
            current_key: vec!(),
        }
    }

    pub fn stream(&self) -> StreamDictionaryStreamer<V> {
        StreamDictionaryStreamer {
            cursor: &*self.stream_data,
            current_key: Vec::with_capacity(128),
            current_value: V::default(),
        }
    }
}

pub struct StreamDictionaryStreamerBuilder<'a, V: 'a + BinarySerializable + Clone + Default> {
    stream_dictionary: &'a StreamDictionary<V>,
    offset_from: usize,
    offset_to: usize,
    current_key: Vec<u8>,
}


/// Returns offset information for the first
/// key in the stream matching a given predicate.
///
/// returns (start offset, the data required to load the value)
fn get_offset<'a, V, P: Fn(&[u8]) -> bool>(predicate: P, mut streamer: StreamDictionaryStreamer<V>) -> (usize, Vec<u8>)
    where V: 'a + BinarySerializable + Clone + Default {
    let mut prev: &[u8] = streamer.cursor;

    let mut prev_data: Vec<u8> = streamer.current_key.clone();

    while let Some((iter_key, _)) = streamer.next() {
        if !predicate(iter_key) {
            return (prev.as_ptr() as usize, prev_data);
        }
        prev = streamer.cursor;
        prev_data.clear();
        prev_data.extend_from_slice(iter_key);
    }
    return (prev.as_ptr() as usize, prev_data);
}

impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerBuilder<'a, V> {
    pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
        let target_key = bound.as_ref();
        let streamer = stream_before(&self.stream_dictionary, target_key.as_ref());
        let smaller_than = |k: &[u8]| { k.lt(target_key) };
        let (offset_before, current_key) = get_offset(smaller_than, streamer);
        self.current_key = current_key;
        self.offset_from = offset_before;
        self
    }

    pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
        let target_key = bound.as_ref();
        let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
        let smaller_than = |k: &[u8]| { k.le(target_key) };
        let (offset_before, current_key) = get_offset(smaller_than, streamer);
        self.current_key = current_key;
        self.offset_from = offset_before;
        self
    }

    pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
        let target_key = bound.as_ref();
        let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
        let smaller_than = |k: &[u8]| { k.le(target_key) };
        let (offset_before, _) = get_offset(smaller_than, streamer);
        self.offset_to = offset_before;
        self
    }

    pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
        let target_key = bound.as_ref();
        let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
        let smaller_than = |k: &[u8]| { k.lt(target_key) };
        let (offset_before, _) = get_offset(smaller_than, streamer);
        self.offset_to = offset_before;
        self
    }

    pub fn into_stream(self) -> StreamDictionaryStreamer<'a, V> {
        let data: &[u8] = &self.stream_dictionary.stream_data.as_slice()[..];
        let origin = data.as_ptr() as usize;
        let start = self.offset_from - origin;
        let stop = max(self.offset_to - origin, start);
        StreamDictionaryStreamer {
            cursor: &data[start..stop],
            current_key: self.current_key,
            current_value: V::default(),
        }
    }
}

pub struct StreamDictionaryStreamer<'a, V: BinarySerializable> {
    cursor: &'a [u8],
    current_key: Vec<u8>,
    current_value: V,
}

impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> {

    pub fn next(&mut self) -> Option<(&[u8], &V)> {
        if self.cursor.len() == 0 {
            return None;
        }
        let common_length: usize = VInt::deserialize(&mut self.cursor).unwrap().0 as usize;
        let new_length: usize = common_length + VInt::deserialize(&mut self.cursor).unwrap().0 as usize;
        self.current_key.reserve(new_length);
        unsafe {
            self.current_key.set_len(new_length);
        }
        self.cursor.read_exact(&mut self.current_key[common_length..new_length]).unwrap();
        self.current_value = V::deserialize(&mut self.cursor).unwrap();
        Some((&self.current_key, &self.current_value))
    }

    pub fn key(&self) -> &[u8] {
        &self.current_key
    }

    pub fn value(&self) -> &V {
        &self.current_value
    }
}

#[cfg(test)]
mod test {

    use std::str;
    use directory::ReadOnlySource;
    use super::CountingWriter;
    use std::io::Write;
    use super::{BLOCK_SIZE, StreamDictionary, StreamDictionaryBuilder};

    #[test]
    fn test_stream_dictionary() {
        let ids: Vec<_> = (0u32..10_000u32)
            .map(|i| (format!("doc{:0>6}", i), i))
            .collect();
        let buffer: Vec<u8> = {
            let mut stream_dictionary_builder = StreamDictionaryBuilder::new(vec!()).unwrap();
            for &(ref id, ref i) in &ids {
                stream_dictionary_builder.insert(id.as_bytes(), i).unwrap();
            }
            stream_dictionary_builder.finish().unwrap()
        };
        let source = ReadOnlySource::from(buffer);
        let stream_dictionary: StreamDictionary<u32> = StreamDictionary::from_source(source).unwrap();
        {
            let mut streamer = stream_dictionary.stream();
            let mut i = 0;
            while let Some((streamer_k, streamer_v)) = streamer.next() {
                let &(ref key, ref v) = &ids[i];
                assert_eq!(streamer_k, key.as_bytes());
                assert_eq!(streamer_v, v);
                i += 1;
            }
        }

        let &(ref key, ref _v) = &ids[2047];
        stream_dictionary.get(key.as_bytes());
    }


    #[test]
    fn test_stream_range() {
        let ids: Vec<_> = (0u32..10_000u32)
            .map(|i| (format!("doc{:0>6}", i), i))
            .collect();
        let buffer: Vec<u8> = {
            let mut stream_dictionary_builder = StreamDictionaryBuilder::new(vec!()).unwrap();
            for &(ref id, ref i) in &ids {
                stream_dictionary_builder.insert(id.as_bytes(), i).unwrap();
            }
            stream_dictionary_builder.finish().unwrap()
        };
        let source = ReadOnlySource::from(buffer);

        let stream_dictionary: StreamDictionary<u32> = StreamDictionary::from_source(source).unwrap();
        {
            for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
                let &(ref target_key, _) = &ids[i];
                let mut streamer = stream_dictionary
                    .range()
                    .ge(target_key.as_bytes())
                    .into_stream();
                for j in 0..3 {
                    let (streamer_k, streamer_v) = streamer.next().unwrap();
                    let &(ref key, ref v) = &ids[i + j];
                    assert_eq!(str::from_utf8(streamer_k).unwrap(), key);
                    assert_eq!(streamer_v, v);
                }
            }
        }

        {
            for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
                let &(ref target_key, _) = &ids[i];
                let mut streamer = stream_dictionary
                    .range()
                    .gt(target_key.as_bytes())
                    .into_stream();
                for j in 0..3 {
                    let (streamer_k, streamer_v) = streamer.next().unwrap();
                    let &(ref key, ref v) = &ids[i + j + 1];
                    assert_eq!(streamer_k, key.as_bytes());
                    assert_eq!(streamer_v, v);
                }
            }
        }

        {
            for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
                for j in 0..3 {
                    let &(ref fst_key, _) = &ids[i];
                    let &(ref last_key, _) = &ids[i + 3];
                    let mut streamer = stream_dictionary
                        .range()
                        .ge(fst_key.as_bytes())
                        .lt(last_key.as_bytes())
                        .into_stream();
                    for _ in 0..(j + 1) {
                        assert!(streamer.next().is_some());
                    }
                    assert!(streamer.next().is_some());
                }
            }
        }

    }

}
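Condensing the tests above into a minimal end-to-end sketch: build a dictionary over sorted keys, reopen it from the serialized bytes, then look keys up and stream them back. The `tantivy::datastruct::{TermDictionary, TermDictionaryBuilder}` and `tantivy::directory::ReadOnlySource` import paths are assumptions; within the crate these are the `datastruct` re-exports and `directory::ReadOnlySource` used in the file above.

```rust
// Import paths are assumptions, based on the re-exports added in this change set.
use tantivy::datastruct::{TermDictionary, TermDictionaryBuilder};
use tantivy::directory::ReadOnlySource;

fn main() -> std::io::Result<()> {
    // Keys must be inserted in sorted order, as in the tests above.
    let mut builder: TermDictionaryBuilder<Vec<u8>, u32> = TermDictionaryBuilder::new(Vec::new())?;
    for (i, key) in ["apple", "applet", "banana"].iter().enumerate() {
        builder.insert(key.as_bytes(), &(i as u32))?;
    }
    let buffer: Vec<u8> = builder.finish()?;

    // Reopen the dictionary from its serialized form and query it.
    let dict: TermDictionary<u32> = TermDictionary::from_source(ReadOnlySource::from(buffer))?;
    assert_eq!(dict.get("applet"), Some(1));
    assert_eq!(dict.get("pear"), None);

    // Stream every (key, value) pair back in key order.
    let mut stream = dict.stream();
    while let Some((key, value)) = stream.next() {
        println!("{} -> {}", String::from_utf8_lossy(key), value);
    }
    Ok(())
}
```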
@@ -79,3 +79,10 @@ impl Clone for ReadOnlySource {
        self.slice(0, self.len())
    }
}

impl From<Vec<u8>> for ReadOnlySource {
    fn from(data: Vec<u8>) -> ReadOnlySource {
        let shared_data = SharedVecSlice::from(data);
        ReadOnlySource::Anonymous(shared_data)
    }
}

@@ -35,3 +35,9 @@ impl SharedVecSlice {
        }
    }
}

impl From<Vec<u8>> for SharedVecSlice {
    fn from(data: Vec<u8>) -> SharedVecSlice {
        SharedVecSlice::new(Arc::new(data))
    }
}
@@ -46,6 +46,9 @@ pub struct U64FastFieldReader {
    max_value: u64,
}

unsafe impl Send for U64FastFieldReader {}
unsafe impl Sync for U64FastFieldReader {}

impl U64FastFieldReader {

    /// Returns the minimum value for this fast field.

@@ -140,6 +143,9 @@ pub struct I64FastFieldReader {
    underlying: U64FastFieldReader,
}

unsafe impl Send for I64FastFieldReader {}
unsafe impl Sync for I64FastFieldReader {}

impl I64FastFieldReader {
    /// Returns the minimum value for this fast field.
    ///
@@ -356,6 +356,11 @@ impl IndexWriter {
        result
    }

    #[doc(hidden)]
    pub fn new_segment(&self) -> Segment {
        self.segment_updater.new_segment()
    }

    /// Spawns a new worker thread for indexing.
    /// The thread consumes documents from the pipeline.
    ///

@@ -429,6 +434,12 @@ impl IndexWriter {
        Ok(())
    }

    pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
        let delete_cursor = self.delete_queue.cursor();
        let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
        self.segment_updater.add_segment(self.generation, segment_entry);
    }

    /// Detects and removes the files that
    /// are not used by the index anymore.
    pub fn garbage_collect_files(&mut self) -> Result<()> {
@@ -96,12 +96,12 @@ pub type Result<T> = std::result::Result<T, Error>;

mod core;
mod compression;
mod store;
pub mod store;
mod indexer;
mod common;
pub mod common;
mod error;
mod analyzer;
mod datastruct;
pub mod datastruct;


@@ -120,7 +120,7 @@ pub mod fastfield;

pub use directory::Directory;
pub use core::{Index, Segment, SegmentId, SegmentMeta, Searcher};
pub use core::{Index, Segment, SegmentComponent, SegmentId, SegmentMeta, Searcher};
pub use indexer::IndexWriter;
pub use schema::{Term, Document};
pub use core::SegmentReader;
@@ -25,8 +25,7 @@ pub use self::postings::Postings;

#[cfg(test)]
pub use self::vec_postings::VecPostings;

pub use self::segment_postings::SegmentPostings;
pub use self::segment_postings::{SegmentPostings, BlockSegmentPostings};
pub use self::intersection::IntersectionDocSet;
pub use self::freq_handler::FreqHandler;
pub use self::segment_postings_option::SegmentPostingsOption;

@@ -268,16 +267,26 @@ mod tests {
        };
    }

    #[bench]
    fn bench_block_segment_postings(b: &mut Bencher) {
        let searcher = INDEX.searcher();
        let segment_reader = searcher.segment_reader(0);
        b.iter(|| {
            let mut block_segment_postings = segment_reader.read_block_postings(&*TERM_A, SegmentPostingsOption::NoFreq).unwrap();
            while block_segment_postings.advance() {}
        });
    }

    #[bench]
    fn bench_segment_postings(b: &mut Bencher) {
        let searcher = INDEX.searcher();
        let segment_reader = searcher.segment_reader(0);

        b.iter(|| {
            let mut segment_postings = segment_reader.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq).unwrap();
            while segment_postings.advance() {}
            let mut block_segment_postings = segment_reader.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq).unwrap();
            while block_segment_postings.advance() {}
        });
    }
}


    #[bench]
    fn bench_segment_intersection(b: &mut Bencher) {
@@ -7,6 +7,88 @@ use fastfield::DeleteBitSet;

const EMPTY_DATA: [u8; 0] = [0u8; 0];


pub struct BlockSegmentPostings<'a> {
    num_binpacked_blocks: usize,
    num_vint_docs: usize,
    block_decoder: BlockDecoder,
    freq_handler: FreqHandler,
    remaining_data: &'a [u8],
    doc_offset: DocId,
    len: usize,
}

impl<'a> BlockSegmentPostings<'a> {

    pub fn from_data(len: usize, data: &'a [u8], freq_handler: FreqHandler) -> BlockSegmentPostings<'a> {
        let num_binpacked_blocks: usize = (len as usize) / NUM_DOCS_PER_BLOCK;
        let num_vint_docs = (len as usize) - NUM_DOCS_PER_BLOCK * num_binpacked_blocks;
        BlockSegmentPostings {
            num_binpacked_blocks: num_binpacked_blocks,
            num_vint_docs: num_vint_docs,
            block_decoder: BlockDecoder::new(),
            freq_handler: freq_handler,
            remaining_data: data,
            doc_offset: 0,
            len: len,
        }
    }

    pub fn reset(&mut self, len: usize, data: &'a [u8]) {
        let num_binpacked_blocks: usize = (len as usize) / NUM_DOCS_PER_BLOCK;
        let num_vint_docs = (len as usize) - NUM_DOCS_PER_BLOCK * num_binpacked_blocks;
        self.num_binpacked_blocks = num_binpacked_blocks;
        self.num_vint_docs = num_vint_docs;
        self.remaining_data = data;
        self.doc_offset = 0;
        self.len = len;
    }

    pub fn docs(&self) -> &[DocId] {
        self.block_decoder.output_array()
    }

    pub fn freq_handler(&self) -> &FreqHandler {
        &self.freq_handler
    }

    pub fn advance(&mut self) -> bool {
        if self.num_binpacked_blocks > 0 {
            self.remaining_data = self.block_decoder.uncompress_block_sorted(self.remaining_data, self.doc_offset);
            self.remaining_data = self.freq_handler.read_freq_block(self.remaining_data);
            self.doc_offset = self.block_decoder.output(NUM_DOCS_PER_BLOCK - 1);
            self.num_binpacked_blocks -= 1;
            true
        }
        else {
            if self.num_vint_docs > 0 {
                self.remaining_data = self.block_decoder.uncompress_vint_sorted(self.remaining_data, self.doc_offset, self.num_vint_docs);
                self.freq_handler.read_freq_vint(self.remaining_data, self.num_vint_docs);
                self.num_vint_docs = 0;
                true
            }
            else {
                false
            }
        }
    }

    /// Returns an empty segment postings object
    pub fn empty() -> BlockSegmentPostings<'static> {
        BlockSegmentPostings {
            num_binpacked_blocks: 0,
            num_vint_docs: 0,
            block_decoder: BlockDecoder::new(),
            freq_handler: FreqHandler::new_without_freq(),
            remaining_data: &EMPTY_DATA,
            doc_offset: 0,
            len: 0,
        }
    }

}


/// `SegmentPostings` represents the inverted list or postings associated to
/// a term in a `Segment`.
///

@@ -14,28 +96,14 @@ const EMPTY_DATA: [u8; 0] = [0u8; 0];
/// Positions on the other hand, are optionally entirely decoded upfront.
pub struct SegmentPostings<'a> {
    len: usize,
    doc_offset: u32,
    block_decoder: BlockDecoder,
    freq_handler: FreqHandler,
    remaining_data: &'a [u8],
    cur: Wrapping<usize>,
    block_cursor: BlockSegmentPostings<'a>,
    cur_block_len: usize,
    delete_bitset: DeleteBitSet,
}

impl<'a> SegmentPostings<'a> {
    fn load_next_block(&mut self) {
        let num_remaining_docs = self.len - self.cur.0;
        if num_remaining_docs >= NUM_DOCS_PER_BLOCK {
            self.remaining_data = self.block_decoder
                .uncompress_block_sorted(self.remaining_data, self.doc_offset);
            self.remaining_data = self.freq_handler.read_freq_block(self.remaining_data);
            self.doc_offset = self.block_decoder.output(NUM_DOCS_PER_BLOCK - 1);
        } else {
            self.remaining_data = self.block_decoder
                .uncompress_vint_sorted(self.remaining_data, self.doc_offset, num_remaining_docs);
            self.freq_handler.read_freq_vint(self.remaining_data, num_remaining_docs);
        }
    }


    /// Reads a Segment postings from an &[u8]
    ///

@@ -43,39 +111,29 @@ impl<'a> SegmentPostings<'a> {
    /// * `data` - data array. The complete data is not necessarily used.
    /// * `freq_handler` - the freq handler is in charge of decoding
    ///   frequencies and/or positions
    pub fn from_data(len: u32,
                     data: &'a [u8],
                     delete_bitset: &'a DeleteBitSet,
                     freq_handler: FreqHandler) -> SegmentPostings<'a> {
    pub fn from_block_postings(
        segment_block_postings: BlockSegmentPostings<'a>,
        delete_bitset: DeleteBitSet) -> SegmentPostings<'a> {
        SegmentPostings {
            len: len as usize,
            doc_offset: 0,
            block_decoder: BlockDecoder::new(),
            freq_handler: freq_handler,
            remaining_data: data,
            len: segment_block_postings.len,
            block_cursor: segment_block_postings,
            cur: Wrapping(usize::max_value()),
            delete_bitset: delete_bitset.clone(),
            cur_block_len: 0,
            delete_bitset: delete_bitset,
        }
    }


    /// Returns an empty segment postings object
    pub fn empty() -> SegmentPostings<'static> {
        let empty_block_cursor = BlockSegmentPostings::empty();
        SegmentPostings {
            len: 0,
            doc_offset: 0,
            block_decoder: BlockDecoder::new(),
            freq_handler: FreqHandler::new_without_freq(),
            remaining_data: &EMPTY_DATA,
            block_cursor: empty_block_cursor,
            delete_bitset: DeleteBitSet::empty(),
            cur: Wrapping(usize::max_value()),
            cur_block_len: 0,
        }
    }

    /// Index within a block is used as an address when
    /// interacting with the `FreqHandler`
    fn index_within_block(&self) -> usize {
        self.cur.0 % NUM_DOCS_PER_BLOCK
    }
}


@@ -84,13 +142,16 @@ impl<'a> DocSet for SegmentPostings<'a> {
    // next needs to be called a first time to point to the correct element.
    #[inline]
    fn advance(&mut self) -> bool {
        loop {
        loop {
            self.cur += Wrapping(1);
            if self.cur.0 >= self.len {
                return false;
            }
            if self.index_within_block() == 0 {
                self.load_next_block();
            if self.cur.0 == self.cur_block_len {
                self.cur = Wrapping(0);
                if !self.block_cursor.advance() {
                    self.cur_block_len = 0;
                    self.cur = Wrapping(usize::max_value());
                    return false;
                }
                self.cur_block_len = self.block_cursor.docs().len();
            }
            if !self.delete_bitset.is_deleted(self.doc()) {
                return true;

@@ -100,7 +161,7 @@ impl<'a> DocSet for SegmentPostings<'a> {

    #[inline]
    fn doc(&self) -> DocId {
        self.block_decoder.output(self.index_within_block())
        self.block_cursor.docs()[self.cur.0]
    }
}

@@ -112,10 +173,10 @@ impl<'a> HasLen for SegmentPostings<'a> {

impl<'a> Postings for SegmentPostings<'a> {
    fn term_freq(&self) -> u32 {
        self.freq_handler.freq(self.index_within_block())
        self.block_cursor.freq_handler().freq(self.cur.0)
    }

    fn positions(&self) -> &[u32] {
        self.freq_handler.positions(self.index_within_block())
        self.block_cursor.freq_handler().positions(self.cur.0)
    }
}
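For contrast with the block-level sketch earlier, this is the document-at-a-time view after the refactor: `SegmentPostings` drives the `BlockSegmentPostings` cursor internally and skips deleted documents in `advance()`. The import paths, the locations of the `DocSet` and `Postings` traits, and the `FreqAndPositions` option name are assumptions based on the identifiers visible in these hunks.

```rust
use tantivy::{DocId, SegmentReader, Term};                 // paths assumed
use tantivy::postings::{Postings, SegmentPostingsOption};  // paths assumed
use tantivy::DocSet;                                       // trait providing advance()/doc(), path assumed

/// Collect (doc id, term frequency) pairs for the live documents matching `term`.
fn collect_live_docs(reader: &SegmentReader, term: &Term) -> Vec<(DocId, u32)> {
    let mut out = Vec::new();
    if let Some(mut postings) = reader.read_postings(term, SegmentPostingsOption::FreqAndPositions) {
        // advance() must be called once before the first doc() access;
        // deleted documents are skipped by the delete bitset inside advance().
        while postings.advance() {
            out.push((postings.doc(), postings.term_freq()));
        }
    }
    out
}
```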
@@ -1,5 +1,5 @@
use Result;
use datastruct::FstMapBuilder;
use datastruct::TermDictionaryBuilder;
use super::TermInfo;
use schema::Field;
use schema::FieldEntry;

@@ -50,7 +50,7 @@ use common::BinarySerializable;
/// A description of the serialization format is
/// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html).
pub struct PostingsSerializer {
    terms_fst_builder: FstMapBuilder<WritePtr, TermInfo>,
    terms_fst_builder: TermDictionaryBuilder<WritePtr, TermInfo>,
    postings_write: WritePtr,
    positions_write: WritePtr,
    written_bytes_postings: usize,

@@ -74,7 +74,7 @@ impl PostingsSerializer {
               positions_write: WritePtr,
               schema: Schema)
               -> Result<PostingsSerializer> {
        let terms_fst_builder = try!(FstMapBuilder::new(terms_write));
        let terms_fst_builder = TermDictionaryBuilder::new(terms_write)?;
        Ok(PostingsSerializer {
            terms_fst_builder: terms_fst_builder,
            postings_write: postings_write,
@@ -30,10 +30,8 @@ impl<'a> Scorer for Box<Scorer + 'a> {
    }

    fn collect(&mut self, collector: &mut Collector) {
        let scorer = self.deref_mut();
        while scorer.advance() {
            collector.collect(scorer.doc(), scorer.score());
        }
        let scorer: &mut Scorer = self.deref_mut();
        scorer.collect(collector);
    }
}