Compare commits


21 Commits

Author SHA1 Message Date
Paul Masurel
4e93c681c0 Removed the SegmentCollector type from the Generics of the
FilterCollector
2020-11-25 14:13:22 +09:00
Paul Masurel
5a8546bb5a Revert "Move SegmentUpdater::list_files() to Index" 2020-11-25 14:01:26 +09:00
Adrien Guillo
f0c1867637 Move SegmentUpdater::list_files() to Index
... and make the method public
2020-11-25 14:01:26 +09:00
Paul Masurel
35fbf738c9 Refactoring of the skip index.
The skip index now identifies both the start and the end offset
of blocks. Checkpoints are compressed in blocks, reaching better
compression.
2020-11-25 14:01:26 +09:00
Adrien Guillo
6f919c61c7 Cache store reader blocks in an LRU fashion 2020-11-25 14:01:26 +09:00
Paul Masurel
86efdb778c Marked blockwand test as ignored.
- Using impl trait for iterating `matching_segments` in the termdict
merger
2020-11-25 14:01:26 +09:00
Paul Masurel
960c2ee39d Fixing unit test. 2020-11-25 14:01:26 +09:00
Paul Masurel
6653ed8eb6 Update src/core/searcher.rs
Co-authored-by: Adrien Guillo <adrien.guillo@gmail.com>
2020-11-25 14:01:26 +09:00
Paul Masurel
e20eedd98b Make field TermMerger API public 2020-11-25 14:01:26 +09:00
Paul Masurel
90c1fdefdc Closes #930 Minor bug.
Watch callback could be called if the last watch handle was dropped
shortly before meta.json was updated.
2020-11-25 14:01:26 +09:00
Paul Masurel
3e27d4c211 Making block wand test more robust 2020-11-25 14:01:26 +09:00
Adrien Guillo
4dc268482f Modified Directory::exists API to return Result<bool, OpenReadError> 2020-11-25 14:01:26 +09:00
Paul Masurel
4f20dd410e Avoid loading fieldnorms when not necessary 2020-11-25 14:01:26 +09:00
Paul Masurel
b1125638f4 TermInfo contains the end_offset of the postings.
We slice the ReadOnlySource tightly.
2020-11-25 14:01:26 +09:00
Adrien Guillo
a4c95852e5 Updated CHANGELOG 2020-11-25 14:01:26 +09:00
Adrien Guillo
b28be75728 Implement FileWatcher 2020-11-25 14:01:26 +09:00
barrotsteindev
af7cb3ff0f simplified FilterCollector#for_segment 2020-11-23 11:09:03 +02:00
barrotsteindev
30d86c653a fmt 2020-11-22 10:40:45 +02:00
barrotsteindev
6f4c051700 updated docs 2020-11-21 21:26:44 +02:00
barrotsteindev
212e091553 removed unnecessary static lifetimes 2020-11-21 21:24:27 +02:00
barrotsteindev
146058bdbf added initial implementation for filter_collector 2020-11-21 18:09:08 +02:00
66 changed files with 764 additions and 5061 deletions

View File

@@ -8,11 +8,6 @@ Tantivy 0.14.0
- Added helper for building intersections and unions in BooleanQuery (@guilload)
- Bugfix in `Query::explain`
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)
- Simplified the encoding of the skip reader struct. BlockWAND max tf is now encoded over a single byte. (@pmasurel)
- `FilterCollector` now supports all Fast Field value types (@barrotsteindev)
This version breaks compatibility and requires users to reindex everything.
Tantivy 0.13.2
===================
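
The single-byte BlockWAND encoding mentioned in the skip-reader bullet above appears in the `skip.rs` hunk near the end of this comparison; a standalone worked sketch of its saturating behaviour (the two helpers are copied from that hunk):

```rust
// The two helpers below are copied from the skip.rs hunk at the end of this
// comparison: the per-block maximum term frequency is stored in one byte,
// saturating at 255, and 255 decodes back to u32::MAX ("upper bound unknown").
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
    max_tf.min(u8::MAX as u32) as u8
}

fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
    if max_tf_code == u8::MAX {
        u32::MAX
    } else {
        max_tf_code as u32
    }
}

fn main() {
    // Small values round-trip exactly...
    assert_eq!(decode_block_wand_max_tf(encode_block_wand_max_tf(3)), 3);
    // ...while anything >= 255 saturates to a safe (unbounded) upper estimate.
    assert_eq!(encode_block_wand_max_tf(1_000), 255);
    assert_eq!(decode_block_wand_max_tf(255), u32::MAX);
}
```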

View File

@@ -53,11 +53,10 @@ lru = "0.6"
winapi = "0.3"
[dev-dependencies]
rand = "0.8"
rand = "0.7"
maplit = "1"
matches = "0.1.8"
proptest = "0.10"
criterion = "0.3"
[dev-dependencies.fail]
version = "0.4"
@@ -98,7 +97,3 @@ travis-ci = { repository = "tantivy-search/tantivy" }
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]
[[bench]]
name = "analyzer"
harness = false

File diff suppressed because it is too large

View File

@@ -1,22 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion};
use tantivy::tokenizer::TokenizerManager;
const ALICE_TXT: &'static str = include_str!("alice.txt");
pub fn criterion_benchmark(c: &mut Criterion) {
let tokenizer_manager = TokenizerManager::default();
let tokenizer = tokenizer_manager.get("default").unwrap();
c.bench_function("default-tokenize-alice", |b| {
b.iter(|| {
let mut word_count = 0;
let mut token_stream = tokenizer.token_stream(ALICE_TXT);
while token_stream.advance() {
word_count += 1;
}
assert_eq!(word_count, 30_731);
})
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -61,7 +61,7 @@ fn main() -> tantivy::Result<()> {
let query_ords: HashSet<u64> = facets
.iter()
.filter_map(|key| facet_dict.term_ord(key.encoded_str()).unwrap())
.filter_map(|key| facet_dict.term_ord(key.encoded_str()))
.collect();
let mut facet_ords_buffer: Vec<u64> = Vec::with_capacity(20);

View File

@@ -274,7 +274,7 @@ impl Collector for FacetCollector {
let mut collapse_facet_it = self.facets.iter().peekable();
collapse_facet_ords.push(0);
{
let mut facet_streamer = facet_reader.facet_dict().range().into_stream()?;
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
if facet_streamer.advance() {
'outer: loop {
// at the begining of this loop, facet_streamer
@@ -368,12 +368,9 @@ impl SegmentCollector for FacetSegmentCollector {
}
let mut facet = vec![];
let facet_ord = self.collapse_facet_ords[collapsed_facet_ord];
// TODO handle errors.
if facet_dict.ord_to_term(facet_ord as u64, &mut facet).is_ok() {
if let Ok(facet) = Facet::from_encoded(facet) {
facet_counts.insert(facet, count);
}
}
facet_dict.ord_to_term(facet_ord as u64, &mut facet);
// TODO
facet_counts.insert(Facet::from_encoded(facet).unwrap(), count);
}
FacetCounts { facet_counts }
}

View File

@@ -9,10 +9,8 @@
// ---
// Importing tantivy...
use std::marker::PhantomData;
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::{FastFieldReader, FastValue};
use crate::fastfield::FastFieldReader;
use crate::schema::Field;
use crate::{Score, SegmentReader, TantivyError};
@@ -43,104 +41,73 @@ use crate::{Score, SegmentReader, TantivyError};
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary").unwrap();
/// let no_filter_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
/// let no_filter_collector = FilterCollector::new(price, &|value| true, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
///
/// assert_eq!(top_docs.len(), 1);
/// assert_eq!(top_docs.len(), 2);
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
/// assert_eq!(top_docs[1].1, DocAddress(0, 3));
///
/// let filter_all_collector: FilterCollector<_, _, u64> = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
/// let filter_all_collector = FilterCollector::new(price, &|value| false, TopDocs::with_limit(2));
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_top_docs.len(), 0);
/// ```
pub struct FilterCollector<TCollector, TPredicate, TPredicateValue: FastValue>
where
TPredicate: 'static,
{
pub struct FilterCollector<TCollector> {
field: Field,
collector: TCollector,
predicate: &'static TPredicate,
t_predicate_value: PhantomData<TPredicateValue>,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
}
impl<TCollector, TPredicate, TPredicateValue: FastValue>
FilterCollector<TCollector, TPredicate, TPredicateValue>
impl<TCollector> FilterCollector<TCollector>
where
TCollector: Collector + Send + Sync,
TPredicate: Fn(TPredicateValue) -> bool + Send + Sync,
{
/// Create a new FilterCollector.
pub fn new(
field: Field,
predicate: &'static TPredicate,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
collector: TCollector,
) -> FilterCollector<TCollector, TPredicate, TPredicateValue> {
) -> FilterCollector<TCollector> {
FilterCollector {
field,
predicate,
collector,
t_predicate_value: PhantomData,
}
}
}
impl<TCollector, TPredicate, TPredicateValue: FastValue> Collector
for FilterCollector<TCollector, TPredicate, TPredicateValue>
impl<TCollector> Collector for FilterCollector<TCollector>
where
TCollector: Collector + Send + Sync,
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
TPredicateValue: 'static + FastValue,
{
// That's the type of our result.
// Our standard deviation will be a float.
type Fruit = TCollector::Fruit;
type Child = FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>;
type Child = FilterSegmentCollector<TCollector::Child>;
fn for_segment(
&self,
segment_local_id: u32,
segment_reader: &SegmentReader,
) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>> {
let schema = segment_reader.schema();
let field_entry = schema.get_field_entry(self.field);
if !field_entry.is_fast() {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is not a fast field.",
field_entry.name()
)));
}
let requested_type = TPredicateValue::to_type();
let field_schema_type = field_entry.field_type().value_type();
if requested_type != field_schema_type {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is of type {:?}!={:?}",
field_entry.name(),
requested_type,
field_schema_type
)));
}
) -> crate::Result<FilterSegmentCollector<TCollector::Child>> {
let fast_field_reader = segment_reader
.fast_fields()
.typed_fast_field_reader(self.field)
.u64(self.field)
.ok_or_else(|| {
let field_name = segment_reader.schema().get_field_name(self.field);
TantivyError::SchemaError(format!(
"{:?} is not declared as a fast field in the schema.",
self.field
"Field {:?} is not a u64 fast field.",
field_name
))
})?;
let segment_collector = self
.collector
.for_segment(segment_local_id, segment_reader)?;
Ok(FilterSegmentCollector {
fast_field_reader,
segment_collector,
segment_collector: segment_collector,
predicate: self.predicate,
t_predicate_value: PhantomData,
})
}
@@ -156,23 +123,15 @@ where
}
}
pub struct FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
where
TPredicate: 'static,
TPredicateValue: 'static + FastValue,
{
fast_field_reader: FastFieldReader<TPredicateValue>,
pub struct FilterSegmentCollector<TSegmentCollector> {
fast_field_reader: FastFieldReader<u64>,
segment_collector: TSegmentCollector,
predicate: &'static TPredicate,
t_predicate_value: PhantomData<TPredicateValue>,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
}
impl<TSegmentCollector, TPredicate, TPredicateValue> SegmentCollector
for FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
impl<TSegmentCollector> SegmentCollector for FilterSegmentCollector<TSegmentCollector>
where
TSegmentCollector: SegmentCollector,
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
TPredicateValue: 'static + FastValue,
{
type Fruit = TSegmentCollector::Fruit;
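
A condensed, self-contained sketch of the `FilterCollector` usage described by this file's doc example and by the test further down in this comparison; the schema, field names, thresholds, and the `date_filter` shape are taken from that test, with the document set trimmed to two rows and the assertions adjusted to match:

```rust
use std::str::FromStr;
use tantivy::collector::{FilterCollector, TopDocs};
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, FAST, TEXT};
use tantivy::{doc, DateTime, Index};

fn main() -> tantivy::Result<()> {
    // Schema and documents mirror the test, trimmed to the two "diary" rows.
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let price = schema_builder.add_u64_field("price", FAST);
    let date = schema_builder.add_date_field("date", FAST);
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer_with_num_threads(1, 10_000_000)?;
    writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64,
        date => DateTime::from_str("2020-04-09T00:00:00+00:00").unwrap()));
    writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64,
        date => DateTime::from_str("2018-04-09T00:00:00+00:00").unwrap()));
    writer.commit()?;

    let reader = index.reader()?;
    let searcher = reader.searcher();
    let query = QueryParser::for_index(&index, vec![title])
        .parse_query("diary")
        .unwrap();

    // Predicate over a u64 fast field: only the 29_240 document passes.
    let expensive_only =
        FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
    assert_eq!(searcher.search(&query, &expensive_only)?.len(), 1);

    // Typed predicate over a DateTime fast field, as in the test's date_filter.
    fn recent(value: DateTime) -> bool {
        (value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
    }
    let recent_only = FilterCollector::new(date, &recent, TopDocs::with_limit(5));
    assert_eq!(searcher.search(&query, &recent_only)?.len(), 1);

    Ok(())
}
```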

View File

@@ -8,13 +8,6 @@ use crate::DocId;
use crate::Score;
use crate::SegmentLocalId;
use crate::collector::{FilterCollector, TopDocs};
use crate::query::QueryParser;
use crate::schema::{Schema, FAST, TEXT};
use crate::DateTime;
use crate::{doc, Index};
use std::str::FromStr;
pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
compute_score: true,
};
@@ -23,54 +16,6 @@ pub const TEST_COLLECTOR_WITHOUT_SCORE: TestCollector = TestCollector {
compute_score: true,
};
#[test]
pub fn test_filter_collector() {
let mut schema_builder = Schema::builder();
let title = schema_builder.add_text_field("title", TEXT);
let price = schema_builder.add_u64_field("price", FAST);
let date = schema_builder.add_date_field("date", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64, date => DateTime::from_str("1898-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64, date => DateTime::from_str("2020-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of Anne Frank", price => 18_240u64, date => DateTime::from_str("2019-04-20T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64, date => DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64, date => DateTime::from_str("2018-04-09T00:00:00+00:00").unwrap()));
assert!(index_writer.commit().is_ok());
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![title]);
let query = query_parser.parse_query("diary").unwrap();
let filter_some_collector = FilterCollector::new(
price,
&|value: u64| value > 20_120u64,
TopDocs::with_limit(2),
);
let top_docs = searcher.search(&query, &filter_some_collector).unwrap();
assert_eq!(top_docs.len(), 1);
assert_eq!(top_docs[0].1, DocAddress(0, 1));
let filter_all_collector: FilterCollector<_, _, u64> =
FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
assert_eq!(filtered_top_docs.len(), 0);
fn date_filter(value: DateTime) -> bool {
(value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
}
let filter_dates_collector = FilterCollector::new(date, &date_filter, TopDocs::with_limit(5));
let filtered_date_docs = searcher.search(&query, &filter_dates_collector).unwrap();
assert_eq!(filtered_date_docs.len(), 2);
}
/// Stores all of the doc ids.
/// This collector is only used for tests.
/// It is unusable in pr

View File

@@ -728,7 +728,7 @@ mod tests {
}
#[test]
fn test_top_collector_not_at_capacity_without_offset() {
fn test_top_collector_not_at_capacity() {
let index = make_index();
let field = index.schema().get_field("text").unwrap();
let query_parser = QueryParser::for_index(&index, vec![field]);

View File

@@ -20,10 +20,9 @@ impl<W: Write> CountingWriter<W> {
self.written_bytes
}
/// Returns the underlying write object.
/// Note that this method does not trigger any flushing.
pub fn finish(self) -> W {
self.underlying
pub fn finish(mut self) -> io::Result<(W, u64)> {
self.flush()?;
Ok((self.underlying, self.written_bytes))
}
}
@@ -47,6 +46,7 @@ impl<W: Write> Write for CountingWriter<W> {
impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
self.flush()?;
self.underlying.terminate_ref(token)
}
}
@@ -63,9 +63,8 @@ mod test {
let mut counting_writer = CountingWriter::wrap(buffer);
let bytes = (0u8..10u8).collect::<Vec<u8>>();
counting_writer.write_all(&bytes).unwrap();
let len = counting_writer.written_bytes();
let buffer_restituted: Vec<u8> = counting_writer.finish();
let (w, len): (Vec<u8>, u64) = counting_writer.finish().unwrap();
assert_eq!(len, 10u64);
assert_eq!(buffer_restituted.len(), 10);
assert_eq!(w.len(), 10);
}
}

View File

@@ -115,16 +115,11 @@ pub fn u64_to_i64(val: u64) -> i64 {
/// For simplicity, tantivy internally handles `f64` as `u64`.
/// The mapping is defined by this function.
///
/// Maps `f64` to `u64` in a monotonic manner, so that bytes lexical order is preserved.
/// Maps `f64` to `u64` so that lexical order is preserved.
///
/// This is more suited than simply casting (`val as u64`)
/// which would truncate the result
///
/// # Reference
///
/// Daniel Lemire's [blog post](https://lemire.me/blog/2020/12/14/converting-floating-point-numbers-to-integers-while-preserving-order/)
/// explains the mapping in a clear manner.
///
/// # See also
/// The [reverse mapping is `u64_to_f64`](./fn.u64_to_f64.html).
#[inline(always)]
@@ -153,7 +148,6 @@ pub(crate) mod test {
pub use super::minmax;
pub use super::serialize::test::fixed_size_test;
use super::{compute_num_bits, f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
use proptest::prelude::*;
use std::f64;
fn test_i64_converter_helper(val: i64) {
@@ -164,15 +158,6 @@ pub(crate) mod test {
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
}
proptest! {
#[test]
fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) {
let left_u64 = f64_to_u64(left);
let right_u64 = f64_to_u64(right);
assert_eq!(left_u64 < right_u64, left < right);
}
}
#[test]
fn test_i64_converter() {
assert_eq!(i64_to_u64(i64::min_value()), u64::min_value());
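
The implementation of `f64_to_u64` is not part of this hunk; as an assumption, here is a standalone sketch of the usual order-preserving transform that the doc comment and the removed monotonicity proptest describe (flip the sign bit for non-negative values, flip all bits for negative ones):

```rust
// Order-preserving f64 -> u64 sketch (an assumption, not necessarily tantivy's
// exact code): non-negative floats get their sign bit set, negative floats get
// all bits flipped, so unsigned integer order matches floating-point order.
fn f64_to_u64(val: f64) -> u64 {
    let bits = val.to_bits();
    if val.is_sign_positive() {
        bits ^ (1u64 << 63)
    } else {
        !bits
    }
}

fn u64_to_f64(val: u64) -> f64 {
    f64::from_bits(if val & (1u64 << 63) != 0 {
        val ^ (1u64 << 63)
    } else {
        !val
    })
}

fn main() {
    // Round-trips and preserves order, mirroring the removed monotonicity proptest.
    let samples = [-1024.5f64, -0.0, 0.0, 3.5, 1e300];
    for pair in samples.windows(2) {
        assert!(pair[0] == pair[1] || f64_to_u64(pair[0]) < f64_to_u64(pair[1]));
        assert_eq!(u64_to_f64(f64_to_u64(pair[0])), pair[0]);
    }
}
```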

View File

@@ -511,28 +511,28 @@ mod tests {
}
#[test]
fn test_index_manual_policy_mmap() -> crate::Result<()> {
fn test_index_manual_policy_mmap() {
let schema = throw_away_schema();
let field = schema.get_field("num_likes").unwrap();
let mut index = Index::create_from_tempdir(schema)?;
let mut writer = index.writer_for_tests()?;
writer.commit()?;
let mut index = Index::create_from_tempdir(schema).unwrap();
let mut writer = index.writer_for_tests().unwrap();
writer.commit().unwrap();
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
.try_into()
.unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
writer.commit()?;
writer.commit().unwrap();
assert!(receiver.recv().is_ok());
assert_eq!(reader.searcher().num_docs(), 0);
reader.reload()?;
reader.reload().unwrap();
assert_eq!(reader.searcher().num_docs(), 1);
Ok(())
}
#[test]

View File

@@ -66,7 +66,7 @@ impl InvertedIndexReader {
}
/// Returns the term info associated with the term.
pub fn get_term_info(&self, term: &Term) -> io::Result<Option<TermInfo>> {
pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
self.termdict.get(term.value_bytes())
}
@@ -106,9 +106,10 @@ impl InvertedIndexReader {
term: &Term,
option: IndexRecordOption,
) -> io::Result<Option<BlockSegmentPostings>> {
self.get_term_info(term)?
Ok(self
.get_term_info(term)
.map(move |term_info| self.read_block_postings_from_terminfo(&term_info, option))
.transpose()
.transpose()?)
}
/// Returns a block postings given a `term_info`.
@@ -180,7 +181,7 @@ impl InvertedIndexReader {
term: &Term,
option: IndexRecordOption,
) -> io::Result<Option<SegmentPostings>> {
self.get_term_info(term)?
self.get_term_info(term)
.map(move |term_info| self.read_postings_from_terminfo(&term_info, option))
.transpose()
}
@@ -190,7 +191,7 @@ impl InvertedIndexReader {
term: &Term,
option: IndexRecordOption,
) -> io::Result<Option<SegmentPostings>> {
self.get_term_info(term)?
self.get_term_info(term)
.map(|term_info| self.read_postings_from_terminfo(&term_info, option))
.transpose()
}
@@ -198,7 +199,7 @@ impl InvertedIndexReader {
/// Returns the number of documents containing the term.
pub fn doc_freq(&self, term: &Term) -> io::Result<u32> {
Ok(self
.get_term_info(term)?
.get_term_info(term)
.map(|term_info| term_info.doc_freq)
.unwrap_or(0u32))
}

View File

@@ -12,7 +12,7 @@ pub use self::executor::Executor;
pub use self::index::Index;
pub use self::index_meta::{IndexMeta, SegmentMeta, SegmentMetaInventory};
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::searcher::Searcher;
pub use self::searcher::{FieldSearcher, Searcher};
pub use self::segment::Segment;
pub use self::segment::SerializableSegment;
pub use self::segment_component::SegmentComponent;

View File

@@ -1,16 +1,17 @@
use crate::collector::Collector;
use crate::core::Executor;
use crate::core::InvertedIndexReader;
use crate::core::SegmentReader;
use crate::query::Query;
use crate::schema::Document;
use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::{Field, Term};
use crate::space_usage::SearcherSpaceUsage;
use crate::store::StoreReader;
use crate::termdict::TermMerger;
use crate::DocAddress;
use crate::Index;
use std::sync::Arc;
use std::{fmt, io};
/// Holds a list of `SegmentReader`s ready for search.
@@ -147,6 +148,16 @@ impl Searcher {
collector.merge_fruits(fruits)
}
/// Return the field searcher associated to a `Field`.
pub fn field(&self, field: Field) -> crate::Result<FieldSearcher> {
let inv_index_readers: Vec<Arc<InvertedIndexReader>> = self
.segment_readers
.iter()
.map(|segment_reader| segment_reader.inverted_index(field))
.collect::<crate::Result<Vec<_>>>()?;
Ok(FieldSearcher::new(inv_index_readers))
}
/// Summarize total space usage of this searcher.
pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
let mut space_usage = SearcherSpaceUsage::new();
@@ -157,6 +168,32 @@ impl Searcher {
}
}
/// **Experimental API** `FieldSearcher` only gives access to a stream over the terms of a field.
pub struct FieldSearcher {
inv_index_readers: Vec<Arc<InvertedIndexReader>>,
}
impl FieldSearcher {
fn new(inv_index_readers: Vec<Arc<InvertedIndexReader>>) -> FieldSearcher {
FieldSearcher { inv_index_readers }
}
/// Returns a Stream over all of the sorted unique terms of
/// for the given field.
///
/// This method does not take into account which documents are deleted, so
/// in presence of deletes some terms may not actually exist in any document
/// anymore.
pub fn terms(&self) -> TermMerger {
let term_streamers: Vec<_> = self
.inv_index_readers
.iter()
.map(|inverted_index| inverted_index.terms().stream())
.collect();
TermMerger::new(term_streamers)
}
}
impl fmt::Debug for Searcher {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let segment_ids = self
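
A hypothetical usage sketch for the experimental `FieldSearcher` added above; it assumes the `TermMerger` returned by `terms()` exposes the same `advance()`/`key()` streaming interface as the per-segment term streamers:

```rust
use tantivy::schema::Field;
use tantivy::Searcher;

/// Lists every unique term indexed for `field`, merged across all segments.
fn dump_terms(searcher: &Searcher, field: Field) -> tantivy::Result<()> {
    let field_searcher = searcher.field(field)?;
    let mut terms = field_searcher.terms();
    while terms.advance() {
        // `key()` yields the raw term bytes; for a text field they are valid UTF-8.
        if let Ok(text) = std::str::from_utf8(terms.key()) {
            println!("{}", text);
        }
    }
    // Per the doc comment above, deletes are not taken into account, so some of
    // the listed terms may no longer occur in any live document.
    Ok(())
}
```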

View File

@@ -1,8 +1,8 @@
use crate::directory::directory_lock::Lock;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::WatchCallback;
use crate::directory::WatchHandle;
use crate::directory::{FileHandle, WatchCallback};
use crate::directory::{FileSlice, WritePtr};
use std::fmt;
use std::io;
@@ -108,13 +108,10 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
/// should be your default choice.
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
/// should be used mostly for tests.
///
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// Opens a file and returns a boxed `FileHandle`.
/// Opens a virtual file for read.
///
/// Users of `Directory` should typically call `Directory::open_read(...)`,
/// while `Directory` implementor should implement `get_file_handle()`.
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError>;
/// Once a virtual file is open, its data may not
/// change.
///
@@ -122,10 +119,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// have no effect on the returned `FileSlice` object.
///
/// You should only use this to read files create with [Directory::open_write].
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
let file_handle = self.get_file_handle(path)?;
Ok(FileSlice::new(file_handle))
}
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError>;
/// Removes a file
///

View File

@@ -58,8 +58,7 @@ pub enum OpenWriteError {
}
impl OpenWriteError {
/// Wraps an io error.
pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
Self::IOError { io_error, filepath }
}
}
@@ -144,8 +143,7 @@ pub enum OpenReadError {
}
impl OpenReadError {
/// Wraps an io error.
pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
Self::IOError { io_error, filepath }
}
}

View File

@@ -2,11 +2,10 @@ use stable_deref_trait::StableDeref;
use crate::common::HasLen;
use crate::directory::OwnedBytes;
use std::sync::{Arc, Weak};
use std::sync::Arc;
use std::{io, ops::Deref};
pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type BoxedData = Box<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
/// Objects that represents files sections in tantivy.
///
@@ -41,7 +40,7 @@ where
B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
{
fn from(bytes: B) -> FileSlice {
FileSlice::new(Box::new(OwnedBytes::new(bytes)))
FileSlice::new(OwnedBytes::new(bytes))
}
}
@@ -51,25 +50,22 @@ where
///
#[derive(Clone)]
pub struct FileSlice {
data: Arc<dyn FileHandle>,
data: Arc<Box<dyn FileHandle>>,
start: usize,
stop: usize,
}
impl FileSlice {
/// Wraps a FileHandle.
pub fn new(file_handle: Box<dyn FileHandle>) -> Self {
let num_bytes = file_handle.len();
FileSlice::new_with_num_bytes(file_handle, num_bytes)
}
/// Wraps a FileHandle.
#[doc(hidden)]
pub fn new_with_num_bytes(file_handle: Box<dyn FileHandle>, num_bytes: usize) -> Self {
pub fn new<D>(data: D) -> Self
where
D: FileHandle,
{
let len = data.len();
FileSlice {
data: Arc::from(file_handle),
data: Arc::new(Box::new(data)),
start: 0,
stop: num_bytes,
stop: len,
}
}
@@ -150,12 +146,6 @@ impl FileSlice {
}
}
impl FileHandle for FileSlice {
fn read_bytes(&self, from: usize, to: usize) -> io::Result<OwnedBytes> {
self.read_bytes_slice(from, to)
}
}
impl HasLen for FileSlice {
fn len(&self) -> usize {
self.stop - self.start
@@ -170,7 +160,7 @@ mod tests {
#[test]
fn test_file_slice() -> io::Result<()> {
let file_slice = FileSlice::new(Box::new(b"abcdef".as_ref()));
let file_slice = FileSlice::new(b"abcdef".as_ref());
assert_eq!(file_slice.len(), 6);
assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
@@ -214,7 +204,7 @@ mod tests {
#[test]
fn test_slice_simple_read() -> io::Result<()> {
let slice = FileSlice::new(Box::new(&b"abcdef"[..]));
let slice = FileSlice::new(&b"abcdef"[..]);
assert_eq!(slice.len(), 6);
assert_eq!(slice.read_bytes()?.as_ref(), b"abcdef");
assert_eq!(slice.slice(1, 4).read_bytes()?.as_ref(), b"bcd");
@@ -223,7 +213,7 @@ mod tests {
#[test]
fn test_slice_read_slice() -> io::Result<()> {
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
let slice_deref = FileSlice::new(&b"abcdef"[..]);
assert_eq!(slice_deref.read_bytes_slice(1, 4)?.as_ref(), b"bcd");
Ok(())
}
@@ -231,14 +221,14 @@ mod tests {
#[test]
#[should_panic(expected = "assertion failed: from <= to")]
fn test_slice_read_slice_invalid_range() {
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
let slice_deref = FileSlice::new(&b"abcdef"[..]);
assert_eq!(slice_deref.read_bytes_slice(1, 0).unwrap().as_ref(), b"bcd");
}
#[test]
#[should_panic(expected = "`to` exceeds the fileslice length")]
fn test_slice_read_slice_invalid_range_exceeds() {
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
let slice_deref = FileSlice::new(&b"abcdef"[..]);
assert_eq!(
slice_deref.read_bytes_slice(0, 10).unwrap().as_ref(),
b"bcd"

View File

@@ -3,7 +3,7 @@ use crc32fast::Hasher;
use std::fs;
use std::io;
use std::io::BufRead;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
@@ -13,15 +13,15 @@ pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 }
// Watches a file and executes registered callbacks when the file is modified.
pub struct FileWatcher {
path: Arc<Path>,
path: Arc<PathBuf>,
callbacks: Arc<WatchCallbackList>,
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
}
impl FileWatcher {
pub fn new(path: &Path) -> FileWatcher {
pub fn new(path: &PathBuf) -> FileWatcher {
FileWatcher {
path: Arc::from(path),
path: Arc::new(path.clone()),
callbacks: Default::default(),
state: Default::default(),
}
@@ -63,7 +63,7 @@ impl FileWatcher {
handle
}
fn compute_checksum(path: &Path) -> Result<u32, io::Error> {
fn compute_checksum(path: &PathBuf) -> Result<u32, io::Error> {
let reader = match fs::File::open(path) {
Ok(f) => io::BufReader::new(f),
Err(e) => {
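
A stripped-down, std-only sketch of the polling strategy this `FileWatcher` implements (per the CHANGELOG entry: poll the meta file every 500ms in a background thread). The real implementation uses a crc32 checksum, a `WatchCallbackList`, and a tristate lifecycle flag; here a std hasher, a single closure, and an `AtomicBool` stand in for them:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{fs, thread};

// Hash the watched file on a fixed interval and fire the callback whenever the
// hash changes. Callers stop the loop by setting the returned flag to true.
fn watch_file<F: Fn() + Send + 'static>(path: PathBuf, callback: F) -> Arc<AtomicBool> {
    let stop = Arc::new(AtomicBool::new(false));
    let stop_handle = Arc::clone(&stop);
    thread::spawn(move || {
        let checksum = |p: &PathBuf| -> Option<u64> {
            let bytes = fs::read(p).ok()?;
            let mut hasher = DefaultHasher::new();
            hasher.write(&bytes);
            Some(hasher.finish())
        };
        let mut last = checksum(&path);
        while !stop.load(Ordering::Relaxed) {
            thread::sleep(Duration::from_millis(500)); // the 500ms POLLING_INTERVAL
            let current = checksum(&path);
            if current != last {
                last = current;
                callback();
            }
        }
    });
    stop_handle
}
```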

View File

@@ -115,18 +115,6 @@ impl Footer {
}
Ok(())
}
VersionedFooter::V3 {
crc32: _crc,
store_compression,
} => {
if &library_version.store_compression != store_compression {
return Err(Incompatibility::CompressionMismatch {
library_compression_format: library_version.store_compression.to_string(),
index_compression_format: store_compression.to_string(),
});
}
Ok(())
}
VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch {
library_version: library_version.clone(),
index_version: self.version.clone(),
@@ -148,31 +136,24 @@ pub enum VersionedFooter {
crc32: CrcHashU32,
store_compression: String,
},
// Block wand max termfred on 1 byte
V3 {
crc32: CrcHashU32,
store_compression: String,
},
}
impl BinarySerializable for VersionedFooter {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
let mut buf = Vec::new();
match self {
VersionedFooter::V3 {
VersionedFooter::V2 {
crc32,
store_compression: compression,
} => {
// Serializes a valid `VersionedFooter` or panics if the version is unknown
// [ version | crc_hash | compression_mode ]
// [ 0..4 | 4..8 | variable ]
BinarySerializable::serialize(&3u32, &mut buf)?;
BinarySerializable::serialize(&2u32, &mut buf)?;
BinarySerializable::serialize(crc32, &mut buf)?;
BinarySerializable::serialize(compression, &mut buf)?;
}
VersionedFooter::V2 { .. }
| VersionedFooter::V1 { .. }
| VersionedFooter::UnknownVersion => {
VersionedFooter::V1 { .. } | VersionedFooter::UnknownVersion => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot serialize an unknown versioned footer ",
@@ -201,7 +182,7 @@ impl BinarySerializable for VersionedFooter {
reader.read_exact(&mut buf[..])?;
let mut cursor = &buf[..];
let version = u32::deserialize(&mut cursor)?;
if version > 3 {
if version != 1 && version != 2 {
return Ok(VersionedFooter::UnknownVersion);
}
let crc32 = u32::deserialize(&mut cursor)?;
@@ -211,14 +192,9 @@ impl BinarySerializable for VersionedFooter {
crc32,
store_compression,
}
} else if version == 2 {
VersionedFooter::V2 {
crc32,
store_compression,
}
} else {
assert_eq!(version, 3);
VersionedFooter::V3 {
assert_eq!(version, 2);
VersionedFooter::V2 {
crc32,
store_compression,
}
@@ -229,7 +205,6 @@ impl BinarySerializable for VersionedFooter {
impl VersionedFooter {
pub fn crc(&self) -> Option<CrcHashU32> {
match self {
VersionedFooter::V3 { crc32, .. } => Some(*crc32),
VersionedFooter::V2 { crc32, .. } => Some(*crc32),
VersionedFooter::V1 { crc32, .. } => Some(*crc32),
VersionedFooter::UnknownVersion { .. } => None,
@@ -268,7 +243,7 @@ impl<W: TerminatingWrite> Write for FooterProxy<W> {
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
let crc32 = self.hasher.take().unwrap().finalize();
let footer = Footer::new(VersionedFooter::V3 {
let footer = Footer::new(VersionedFooter::V2 {
crc32,
store_compression: crate::store::COMPRESSION.to_string(),
});
@@ -303,7 +278,7 @@ mod tests {
let footer = Footer::deserialize(&mut &vec[..]).unwrap();
assert!(matches!(
footer.versioned_footer,
VersionedFooter::V3 { store_compression, .. }
VersionedFooter::V2 { store_compression, .. }
if store_compression == crate::store::COMPRESSION
));
assert_eq!(&footer.version, crate::version());
@@ -313,7 +288,7 @@ mod tests {
fn test_serialize_deserialize_footer() {
let mut buffer = Vec::new();
let crc32 = 123456u32;
let footer: Footer = Footer::new(VersionedFooter::V3 {
let footer: Footer = Footer::new(VersionedFooter::V2 {
crc32,
store_compression: "lz4".to_string(),
});
@@ -325,7 +300,7 @@ mod tests {
#[test]
fn footer_length() {
let crc32 = 1111111u32;
let versioned_footer = VersionedFooter::V3 {
let versioned_footer = VersionedFooter::V2 {
crc32,
store_compression: "lz4".to_string(),
};
@@ -346,7 +321,7 @@ mod tests {
// versionned footer length
12 | 128,
// index format version
3,
2,
0,
0,
0,
@@ -365,7 +340,7 @@ mod tests {
let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap();
assert!(cursor.is_empty());
let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32;
let expected_versioned_footer: VersionedFooter = VersionedFooter::V3 {
let expected_versioned_footer: VersionedFooter = VersionedFooter::V2 {
crc32: expected_crc,
store_compression: "lz4".to_string(),
};
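
A minimal sketch of the versioned-footer payload layout documented inline above (`[ version | crc_hash | compression_mode ]`, little-endian integers); a single length byte stands in for tantivy's vint encoding of the string length, which suffices for the short strings used here:

```rust
// Payload layout from the comment above: [ version | crc32 | compression ],
// integers little-endian. A plain length byte stands in for tantivy's vint
// encoding of the string length, which fits short strings like "lz4".
fn serialize_versioned_footer(version: u32, crc32: u32, compression: &str) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&version.to_le_bytes()); // bytes 0..4
    buf.extend_from_slice(&crc32.to_le_bytes());   // bytes 4..8
    buf.push(compression.len() as u8);             // simplified length prefix
    buf.extend_from_slice(compression.as_bytes());
    buf
}

fn main() {
    // "lz4" gives 4 + 4 + 1 + 3 = 12 bytes, matching the footer length noted in
    // the test bytes above.
    let bytes = serialize_versioned_footer(2, 1_111_111, "lz4");
    assert_eq!(bytes.len(), 12);
    assert_eq!(bytes[0..4], 2u32.to_le_bytes());
    assert_eq!(bytes[4..8], 1_111_111u32.to_le_bytes());
}
```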

View File

@@ -1,10 +1,10 @@
use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::DirectoryLock;
use crate::directory::GarbageCollectionResult;
use crate::directory::Lock;
use crate::directory::META_LOCK;
use crate::directory::{DirectoryLock, FileHandle};
use crate::directory::{FileSlice, WritePtr};
use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
@@ -274,11 +274,6 @@ impl ManagedDirectory {
}
impl Directory for ManagedDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Box::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
let file_slice = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(file_slice)

View File

@@ -2,13 +2,14 @@ use crate::core::META_FILEPATH;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::AntiCallToken;
use crate::directory::BoxedData;
use crate::directory::Directory;
use crate::directory::DirectoryLock;
use crate::directory::FileSlice;
use crate::directory::Lock;
use crate::directory::WatchCallback;
use crate::directory::WatchHandle;
use crate::directory::{AntiCallToken, FileHandle, OwnedBytes};
use crate::directory::{ArcBytes, WeakArcBytes};
use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt;
use memmap::Mmap;
@@ -24,6 +25,7 @@ use std::path::{Path, PathBuf};
use std::result;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
use std::{collections::HashMap, ops::Deref};
use tempfile::TempDir;
@@ -76,7 +78,7 @@ pub struct CacheInfo {
struct MmapCache {
counters: CacheCounters,
cache: HashMap<PathBuf, WeakArcBytes>,
cache: HashMap<PathBuf, Weak<BoxedData>>,
}
impl Default for MmapCache {
@@ -110,7 +112,7 @@ impl MmapCache {
}
// Returns None if the file exists but as a len of 0 (and hence is not mmappable).
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<ArcBytes>, OpenReadError> {
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<BoxedData>>, OpenReadError> {
if let Some(mmap_weak) = self.cache.get(full_path) {
if let Some(mmap_arc) = mmap_weak.upgrade() {
self.counters.hit += 1;
@@ -121,7 +123,7 @@ impl MmapCache {
self.counters.miss += 1;
let mmap_opt = open_mmap(full_path)?;
Ok(mmap_opt.map(|mmap| {
let mmap_arc: ArcBytes = Arc::new(mmap);
let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap));
let mmap_weak = Arc::downgrade(&mmap_arc);
self.cache.insert(full_path.to_owned(), mmap_weak);
mmap_arc
@@ -159,7 +161,7 @@ impl MmapDirectoryInner {
mmap_cache: Default::default(),
_temp_directory: temp_directory,
watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
root_path,
root_path: root_path,
}
}
@@ -314,7 +316,7 @@ impl TerminatingWrite for SafeFileWriter {
}
#[derive(Clone)]
struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync>);
struct MmapArc(Arc<Box<dyn Deref<Target = [u8]> + Send + Sync>>);
impl Deref for MmapArc {
type Target = [u8];
@@ -344,7 +346,7 @@ pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
}
impl Directory for MmapDirectory {
fn get_file_handle(&self, path: &Path) -> result::Result<Box<dyn FileHandle>, OpenReadError> {
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
@@ -357,16 +359,11 @@ impl Directory for MmapDirectory {
let io_err = make_io_err(msg);
OpenReadError::wrap_io_error(io_err, path.to_path_buf())
})?;
let owned_bytes = mmap_cache
.get_mmap(&full_path)?
.map(|mmap_arc| {
let mmap_arc_obj = MmapArc(mmap_arc);
OwnedBytes::new(mmap_arc_obj)
})
.unwrap_or_else(OwnedBytes::empty);
Ok(Box::new(owned_bytes))
if let Some(mmap_arc) = mmap_cache.get_mmap(&full_path)? {
Ok(FileSlice::from(MmapArc(mmap_arc)))
} else {
Ok(FileSlice::empty())
}
}
/// Any entry associated to the path in the mmap will be
@@ -449,8 +446,7 @@ impl Directory for MmapDirectory {
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let full_path = self.resolve_path(path);
atomic_write(&full_path, content)?;
self.sync_directory()
atomic_write(&full_path, content)
}
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {

View File

@@ -23,7 +23,7 @@ pub mod error;
pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes};
pub(crate) use self::file_slice::BoxedData;
pub use self::file_slice::{FileHandle, FileSlice};
pub use self::owned_bytes::OwnedBytes;
pub use self::ram_directory::RAMDirectory;

View File

@@ -1,6 +1,5 @@
use crate::directory::FileHandle;
use stable_deref_trait::StableDeref;
use std::convert::TryInto;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;
@@ -96,24 +95,6 @@ impl OwnedBytes {
pub fn advance(&mut self, advance_len: usize) {
self.data = &self.data[advance_len..]
}
/// Reads an `u8` from the `OwnedBytes` and advance by one byte.
pub fn read_u8(&mut self) -> u8 {
assert!(!self.is_empty());
let byte = self.as_slice()[0];
self.advance(1);
byte
}
/// Reads an `u64` encoded as little-endian from the `OwnedBytes` and advance by 8 bytes.
pub fn read_u64(&mut self) -> u64 {
assert!(self.len() > 7);
let octlet: [u8; 8] = self.as_slice()[..8].try_into().unwrap();
self.advance(8);
u64::from_le_bytes(octlet)
}
}
impl fmt::Debug for OwnedBytes {
@@ -249,22 +230,6 @@ mod tests {
Ok(())
}
#[test]
fn test_owned_bytes_read_u8() -> io::Result<()> {
let mut bytes = OwnedBytes::new(b"\xFF".as_ref());
assert_eq!(bytes.read_u8(), 255);
assert_eq!(bytes.len(), 0);
Ok(())
}
#[test]
fn test_owned_bytes_read_u64() -> io::Result<()> {
let mut bytes = OwnedBytes::new(b"\0\xFF\xFF\xFF\xFF\xFF\xFF\xFF".as_ref());
assert_eq!(bytes.read_u64(), u64::MAX - 255);
assert_eq!(bytes.len(), 0);
Ok(())
}
#[test]
fn test_owned_bytes_split() {
let bytes = OwnedBytes::new(b"abcdefghi".as_ref());

View File

@@ -12,8 +12,6 @@ use std::path::{Path, PathBuf};
use std::result;
use std::sync::{Arc, RwLock};
use super::FileHandle;
/// Writer associated with the `RAMDirectory`
///
/// The Writer just writes a buffer.
@@ -44,12 +42,12 @@ impl VecWriter {
impl Drop for VecWriter {
fn drop(&mut self) {
// if !self.is_flushed {
// panic!(
// "You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
// self.path
// )
// }
if !self.is_flushed {
panic!(
"You forgot to flush {:?} before its writter got Drop. Do not rely on drop.",
self.path
)
}
}
}
@@ -165,11 +163,6 @@ impl RAMDirectory {
}
impl Directory for RAMDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Box::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
self.fs.read().unwrap().open_read(path)
}

View File

@@ -6,12 +6,12 @@ use std::sync::Weak;
/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
pub struct WatchCallback(Arc<dyn Fn() + Sync + Send>);
pub struct WatchCallback(Arc<Box<dyn Fn() + Sync + Send>>);
impl WatchCallback {
/// Wraps a `Fn()` to create a WatchCallback.
pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
WatchCallback(Arc::new(op))
WatchCallback(Arc::new(Box::new(op)))
}
fn call(&self) {

View File

@@ -10,7 +10,7 @@ use std::borrow::BorrowMut;
pub const TERMINATED: DocId = std::i32::MAX as u32;
/// Represents an iterable set of sorted doc ids.
pub trait DocSet: Send {
pub trait DocSet {
/// Goes to the next element.
///
/// The DocId of the next element is returned.
@@ -129,14 +129,6 @@ impl<'a> DocSet for &'a mut dyn DocSet {
fn size_hint(&self) -> u32 {
(**self).size_hint()
}
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
(**self).count(delete_bitset)
}
fn count_including_deleted(&mut self) -> u32 {
(**self).count_including_deleted()
}
}
impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
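
A sketch of the driving loop implied by the `DocSet` contract above (`advance()` returns the next `DocId`, and `TERMINATED` marks exhaustion); it assumes the trait also exposes a `doc()` accessor for the current position and that a freshly built `DocSet` is already positioned on its first document:

```rust
use tantivy::{DocSet, TERMINATED};

// Counts every document in a DocSet, deleted or not, by walking it until
// TERMINATED (std::i32::MAX as u32) -- essentially count_including_deleted.
fn count_all<D: DocSet>(docset: &mut D) -> u32 {
    let mut count = 0u32;
    let mut doc = docset.doc(); // assumed: a fresh DocSet starts on its first doc
    while doc != TERMINATED {
        count += 1;
        doc = docset.advance();
    }
    count
}
```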

View File

@@ -1,5 +1,4 @@
use super::MultiValueIntFastFieldReader;
use crate::error::DataCorruption;
use crate::schema::Facet;
use crate::termdict::TermDictionary;
use crate::termdict::TermOrdinal;
@@ -63,13 +62,12 @@ impl FacetReader {
&mut self,
facet_ord: TermOrdinal,
output: &mut Facet,
) -> crate::Result<()> {
) -> Result<(), str::Utf8Error> {
let found_term = self
.term_dict
.ord_to_term(facet_ord as u64, &mut self.buffer)?;
.ord_to_term(facet_ord as u64, &mut self.buffer);
assert!(found_term, "Term ordinal {} no found.", facet_ord);
let facet_str = str::from_utf8(&self.buffer[..])
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
let facet_str = str::from_utf8(&self.buffer[..])?;
output.set_facet_str(facet_str);
Ok(())
}

View File

@@ -51,15 +51,6 @@ impl<Item: FastValue> FastFieldReader<Item> {
}
}
pub(crate) fn cast<TFastValue: FastValue>(self) -> FastFieldReader<TFastValue> {
FastFieldReader {
bit_unpacker: self.bit_unpacker,
min_value_u64: self.min_value_u64,
max_value_u64: self.max_value_u64,
_phantom: PhantomData,
}
}
/// Return the value associated to the given document.
///
/// This accessor should return as fast as possible.

View File

@@ -1,6 +1,6 @@
use crate::common::CompositeFile;
use crate::fastfield::BytesFastFieldReader;
use crate::fastfield::MultiValueIntFastFieldReader;
use crate::fastfield::{BytesFastFieldReader, FastValue};
use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader};
use crate::schema::{Cardinality, Field, FieldType, Schema};
use crate::space_usage::PerFieldSpaceUsage;
@@ -201,14 +201,6 @@ impl FastFieldReaders {
None
}
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(
&self,
field: Field,
) -> Option<FastFieldReader<TFastValue>> {
self.u64_lenient(field)
.map(|fast_field_reader| fast_field_reader.cast())
}
/// Returns the `i64` fast field reader reader associated to `field`.
///
/// If `field` is not a i64 fast field, this method returns `None`.

View File

@@ -61,38 +61,16 @@ impl FieldNormReaders {
/// precompute computationally expensive functions of the fieldnorm
/// in a very short array.
#[derive(Clone)]
pub struct FieldNormReader(ReaderImplEnum);
impl From<ReaderImplEnum> for FieldNormReader {
fn from(reader_enum: ReaderImplEnum) -> FieldNormReader {
FieldNormReader(reader_enum)
}
}
#[derive(Clone)]
enum ReaderImplEnum {
FromData(OwnedBytes),
Const {
num_docs: u32,
fieldnorm_id: u8,
fieldnorm: u32,
},
pub struct FieldNormReader {
data: OwnedBytes,
}
impl FieldNormReader {
/// Creates a `FieldNormReader` with a constant fieldnorm.
///
/// The fieldnorm will be subjected to compression as if it was coming
/// from an array-backed fieldnorm reader.
pub fn constant(num_docs: u32, fieldnorm: u32) -> FieldNormReader {
let fieldnorm_id = fieldnorm_to_id(fieldnorm);
let fieldnorm = id_to_fieldnorm(fieldnorm_id);
ReaderImplEnum::Const {
num_docs,
fieldnorm_id,
fieldnorm,
}
.into()
let field_norms_data = OwnedBytes::new(vec![fieldnorm_id; num_docs as usize]);
FieldNormReader::new(field_norms_data)
}
/// Opens a field norm reader given its file.
@@ -102,15 +80,12 @@ impl FieldNormReader {
}
fn new(data: OwnedBytes) -> Self {
ReaderImplEnum::FromData(data).into()
FieldNormReader { data }
}
/// Returns the number of documents in this segment.
pub fn num_docs(&self) -> u32 {
match &self.0 {
ReaderImplEnum::FromData(data) => data.len() as u32,
ReaderImplEnum::Const { num_docs, .. } => *num_docs,
}
self.data.len() as u32
}
/// Returns the `fieldnorm` associated to a doc id.
@@ -123,25 +98,14 @@ impl FieldNormReader {
/// The fieldnorm is effectively decoded from the
/// `fieldnorm_id` by doing a simple table lookup.
pub fn fieldnorm(&self, doc_id: DocId) -> u32 {
match &self.0 {
ReaderImplEnum::FromData(data) => {
let fieldnorm_id = data.as_slice()[doc_id as usize];
id_to_fieldnorm(fieldnorm_id)
}
ReaderImplEnum::Const { fieldnorm, .. } => *fieldnorm,
}
let fieldnorm_id = self.fieldnorm_id(doc_id);
id_to_fieldnorm(fieldnorm_id)
}
/// Returns the `fieldnorm_id` associated to a document.
#[inline(always)]
pub fn fieldnorm_id(&self, doc_id: DocId) -> u8 {
match &self.0 {
ReaderImplEnum::FromData(data) => {
let fieldnorm_id = data.as_slice()[doc_id as usize];
fieldnorm_id
}
ReaderImplEnum::Const { fieldnorm_id, .. } => *fieldnorm_id,
}
self.data.as_slice()[doc_id as usize]
}
/// Converts a `fieldnorm_id` into a fieldnorm.
@@ -165,7 +129,9 @@ impl FieldNormReader {
.map(FieldNormReader::fieldnorm_to_id)
.collect::<Vec<u8>>();
let field_norms_data = OwnedBytes::new(field_norms_id);
FieldNormReader::new(field_norms_data)
FieldNormReader {
data: field_norms_data,
}
}
}
@@ -184,20 +150,4 @@ mod tests {
assert_eq!(fieldnorm_reader.fieldnorm(3), 4);
assert_eq!(fieldnorm_reader.fieldnorm(4), 983_064);
}
#[test]
fn test_const_fieldnorm_reader_small_fieldnorm_id() {
let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 10u32);
assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
assert_eq!(fieldnorm_reader.fieldnorm(0u32), 10u32);
assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 10u8);
}
#[test]
fn test_const_fieldnorm_reader_large_fieldnorm_id() {
let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 300u32);
assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
assert_eq!(fieldnorm_reader.fieldnorm(0u32), 280u32);
assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 72u8);
}
}

View File

@@ -1,93 +1,45 @@
use crate::Index;
use crate::Searcher;
use crate::{doc, schema::*};
use rand::thread_rng;
use rand::Rng;
use std::collections::HashSet;
fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
use crate::schema::*;
use crate::Index;
use crate::Searcher;
use rand::Rng;
fn check_index_content(searcher: &Searcher, vals: &HashSet<u64>) {
assert!(searcher.segment_readers().len() < 20);
assert_eq!(searcher.num_docs() as usize, vals.len());
for segment_reader in searcher.segment_readers() {
let store_reader = segment_reader.get_store_reader()?;
for doc_id in 0..segment_reader.max_doc() {
let _doc = store_reader.get(doc_id)?;
}
}
Ok(())
}
#[test]
#[ignore]
fn test_functional_store() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED | STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let reader = index.reader()?;
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
let mut doc_set: Vec<u64> = Vec::new();
let mut doc_id = 0u64;
for iteration in 0..500 {
dbg!(iteration);
let num_docs: usize = rng.gen_range(0..4);
if doc_set.len() >= 1 {
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
}
for _ in 0..num_docs {
doc_set.push(doc_id);
index_writer.add_document(doc!(id_field=>doc_id));
doc_id += 1;
}
index_writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
check_index_content(&searcher, &doc_set)?;
}
Ok(())
}
#[test]
#[ignore]
fn test_functional_indexing() -> crate::Result<()> {
fn test_indexing() {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED);
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
let schema = schema_builder.build();
let index = Index::create_from_tempdir(schema)?;
let reader = index.reader()?;
let index = Index::create_from_tempdir(schema).unwrap();
let reader = index.reader().unwrap();
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 120_000_000)?;
let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap();
let mut committed_docs: HashSet<u64> = HashSet::new();
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
for _ in 0..200 {
let random_val = rng.gen_range(0..20);
let random_val = rng.gen_range(0, 20);
if random_val == 0 {
index_writer.commit()?;
index_writer.commit().expect("Commit failed");
committed_docs.extend(&uncommitted_docs);
uncommitted_docs.clear();
reader.reload()?;
reader.reload().unwrap();
let searcher = reader.searcher();
// check that everything is correct.
check_index_content(
&searcher,
&committed_docs.iter().cloned().collect::<Vec<u64>>(),
)?;
check_index_content(&searcher, &committed_docs);
} else {
if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
let doc_id_term = Term::from_field_u64(id_field, random_val);
@@ -103,5 +55,4 @@ fn test_functional_indexing() -> crate::Result<()> {
}
}
}
Ok(())
}

View File

@@ -53,7 +53,7 @@ impl DeleteQueue {
return block;
}
let block = Arc::new(Block {
operations: Arc::new([]),
operations: Arc::default(),
next: NextBlock::from(self.clone()),
});
wlock.last_block = Arc::downgrade(&block);
@@ -108,7 +108,7 @@ impl DeleteQueue {
let delete_operations = mem::replace(&mut self_wlock.writer, vec![]);
let new_block = Arc::new(Block {
operations: Arc::from(delete_operations.into_boxed_slice()),
operations: Arc::new(delete_operations.into_boxed_slice()),
next: NextBlock::from(self.clone()),
});
@@ -167,7 +167,7 @@ impl NextBlock {
}
struct Block {
operations: Arc<[DeleteOperation]>,
operations: Arc<Box<[DeleteOperation]>>,
next: NextBlock,
}

View File

@@ -449,7 +449,7 @@ impl IndexWriter {
}
/// Accessor to the merge policy.
pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
self.segment_updater.get_merge_policy()
}

View File

@@ -8,7 +8,7 @@ const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
const DEFAULT_MIN_MERGE_SIZE: usize = 8;
const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
/// `LogMergePolicy` tries to merge segments that have a similar number of
/// `LogMergePolicy` tries tries to merge segments that have a similar number of
/// documents.
#[derive(Debug, Clone)]
pub struct LogMergePolicy {

View File

@@ -503,6 +503,7 @@ impl IndexMerger {
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
let mut field_term_streams = Vec::new();
let mut max_term_ords: Vec<TermOrdinal> = Vec::new();
let field_readers: Vec<Arc<InvertedIndexReader>> = self
@@ -511,10 +512,9 @@ impl IndexMerger {
.map(|reader| reader.inverted_index(indexed_field))
.collect::<crate::Result<Vec<_>>>()?;
let mut field_term_streams = Vec::new();
for field_reader in &field_readers {
let terms = field_reader.terms();
field_term_streams.push(terms.stream()?);
field_term_streams.push(terms.stream());
max_term_ords.push(terms.num_terms() as u64);
}

View File

@@ -9,15 +9,6 @@ pub struct DeleteOperation {
pub term: Term,
}
impl Default for DeleteOperation {
fn default() -> Self {
DeleteOperation {
opstamp: 0u64,
term: Term::new(),
}
}
}
/// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)]
pub struct AddOperation {

View File

@@ -25,10 +25,9 @@ use futures::future::Future;
use futures::future::TryFutureExt;
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::io::{self, Write};
use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
@@ -155,7 +154,7 @@ pub(crate) struct InnerSegmentUpdater {
index: Index,
segment_manager: SegmentManager,
merge_policy: RwLock<Arc<dyn MergePolicy>>,
merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>,
killed: AtomicBool,
stamper: Stamper,
merge_operations: MergeOperationInventory,
@@ -194,19 +193,19 @@ impl SegmentUpdater {
merge_thread_pool,
index,
segment_manager,
merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
killed: AtomicBool::new(false),
stamper,
merge_operations: Default::default(),
})))
}
pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
self.merge_policy.read().unwrap().clone()
}
pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
let arc_merge_policy = Arc::from(merge_policy);
let arc_merge_policy = Arc::new(merge_policy);
*self.merge_policy.write().unwrap() = arc_merge_policy;
}
@@ -410,13 +409,6 @@ impl SegmentUpdater {
let _send_result = merging_future_send.send(segment_meta);
}
Err(e) => {
if let crate::TantivyError::IOError(ref io_err) = &e {
if io_err.kind() == io::ErrorKind::InvalidData {
println!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
error!(" SEGMENTS THAT CAUSE THE BUG {:?}", merge_operation.segment_ids());
process::exit(1);
}
}
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
@@ -431,9 +423,7 @@ impl SegmentUpdater {
});
Ok(merging_future_recv
.unwrap_or_else(|e| {
Err(crate::TantivyError::SystemError("Merge failed".to_string()))
}))
.unwrap_or_else(|_| Err(crate::TantivyError::SystemError("Merge failed".to_string()))))
}
async fn consider_merge_options(&self) {

View File

@@ -160,7 +160,7 @@ pub use self::docset::{DocSet, TERMINATED};
pub use crate::common::HasLen;
pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
pub use crate::core::{Executor, SegmentComponent};
pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{FieldSearcher, Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{InvertedIndexReader, SegmentReader};
pub use crate::directory::Directory;
pub use crate::indexer::operation::UserOperation;
@@ -174,7 +174,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
/// Index format version.
const INDEX_FORMAT_VERSION: u32 = 3;
const INDEX_FORMAT_VERSION: u32 = 2;
/// Structure version for the index.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -8,7 +8,7 @@ use std::io::{self, Write};
pub struct PositionSerializer<W: io::Write> {
bit_packer: BitPacker4x,
write_stream: CountingWriter<W>,
write_skip_index: W,
write_skiplist: W,
block: Vec<u32>,
buffer: Vec<u8>,
num_ints: u64,
@@ -16,11 +16,11 @@ pub struct PositionSerializer<W: io::Write> {
}
impl<W: io::Write> PositionSerializer<W> {
pub fn new(write_stream: W, write_skip_index: W) -> PositionSerializer<W> {
pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer<W> {
PositionSerializer {
bit_packer: BitPacker4x::new(),
write_stream: CountingWriter::wrap(write_stream),
write_skip_index,
write_skiplist,
block: Vec::with_capacity(128),
buffer: vec![0u8; 128 * 4],
num_ints: 0u64,
@@ -52,7 +52,7 @@ impl<W: io::Write> PositionSerializer<W> {
fn flush_block(&mut self) -> io::Result<()> {
let num_bits = self.bit_packer.num_bits(&self.block[..]);
self.write_skip_index.write_all(&[num_bits])?;
self.write_skiplist.write_all(&[num_bits])?;
let written_len = self
.bit_packer
.compress(&self.block[..], &mut self.buffer, num_bits);
@@ -70,10 +70,10 @@ impl<W: io::Write> PositionSerializer<W> {
self.flush_block()?;
}
for &long_skip in &self.long_skips {
long_skip.serialize(&mut self.write_skip_index)?;
long_skip.serialize(&mut self.write_skiplist)?;
}
(self.long_skips.len() as u32).serialize(&mut self.write_skip_index)?;
self.write_skip_index.flush()?;
(self.long_skips.len() as u32).serialize(&mut self.write_skiplist)?;
self.write_skiplist.flush()?;
self.write_stream.flush()?;
Ok(())
}

View File

@@ -469,7 +469,7 @@ mod tests {
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field).unwrap();
let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term).unwrap().unwrap();
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
.unwrap()
@@ -513,7 +513,7 @@ mod tests {
{
let term = Term::from_field_u64(int_field, 0u64);
let inverted_index = segment_reader.inverted_index(int_field)?;
let term_info = inverted_index.get_term_info(&term)?.unwrap();
let term_info = inverted_index.get_term_info(&term).unwrap();
block_segments = inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?;
}
@@ -521,7 +521,7 @@ mod tests {
{
let term = Term::from_field_u64(int_field, 1u64);
let inverted_index = segment_reader.inverted_index(int_field)?;
let term_info = inverted_index.get_term_info(&term)?.unwrap();
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?;
}
assert_eq!(block_segments.docs(), &[1, 3, 5]);

View File

@@ -54,7 +54,7 @@ pub mod tests {
use crate::DocId;
use crate::HasLen;
use crate::Score;
use std::{iter, mem};
use std::iter;
#[test]
pub fn test_position_write() -> crate::Result<()> {
@@ -71,7 +71,6 @@ pub mod tests {
field_serializer.write_doc(doc_id, 4, &delta_positions)?;
}
field_serializer.close_term()?;
mem::drop(field_serializer);
posting_serializer.close()?;
let read = segment.open_read(SegmentComponent::POSITIONS)?;
assert!(read.len() <= 140);
@@ -180,7 +179,7 @@ pub mod tests {
let inverted_index = segment_reader.inverted_index(text_field)?;
assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
assert_eq!(&bytes, b"hello");
}
{
@@ -192,7 +191,7 @@ pub mod tests {
let inverted_index = segment_reader.inverted_index(text_field)?;
assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
assert_eq!(&bytes[..], ok_token_text.as_bytes());
}
Ok(())

View File

@@ -1,46 +1,32 @@
use std::convert::TryInto;
use crate::common::{read_u32_vint_no_advance, serialize_vint_u32, BinarySerializable};
use crate::directory::OwnedBytes;
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::query::BM25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED};
#[inline(always)]
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
max_tf.min(u8::MAX as u32) as u8
}
#[inline(always)]
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
if max_tf_code == u8::MAX {
u32::MAX
} else {
max_tf_code as u32
}
}
#[inline(always)]
fn read_u32(data: &[u8]) -> u32 {
u32::from_le_bytes(data[..4].try_into().unwrap())
}
#[inline(always)]
fn write_u32(val: u32, buf: &mut Vec<u8>) {
buf.extend_from_slice(&val.to_le_bytes());
}
pub struct SkipSerializer {
buffer: Vec<u8>,
prev_doc: DocId,
}
impl SkipSerializer {
pub fn new() -> SkipSerializer {
SkipSerializer { buffer: Vec::new() }
SkipSerializer {
buffer: Vec::new(),
prev_doc: 0u32,
}
}
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
write_u32(last_doc, &mut self.buffer);
assert!(
last_doc > self.prev_doc,
"write_doc(...) called with non-increasing doc ids. \
Did you forget to call clear()?"
);
let delta_doc = last_doc - self.prev_doc;
self.prev_doc = last_doc;
delta_doc.serialize(&mut self.buffer).unwrap();
self.buffer.push(doc_num_bits);
}
@@ -49,13 +35,16 @@ impl SkipSerializer {
}
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
write_u32(tf_sum, &mut self.buffer);
tf_sum
.serialize(&mut self.buffer)
.expect("Should never fail");
}
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
let block_wand_tf = encode_block_wand_max_tf(term_freq);
self.buffer
.extend_from_slice(&[fieldnorm_id, block_wand_tf]);
self.buffer.push(fieldnorm_id);
let mut buf = [0u8; 8];
let bytes = serialize_vint_u32(term_freq, &mut buf);
self.buffer.extend_from_slice(bytes);
}
pub fn data(&self) -> &[u8] {
@@ -63,6 +52,7 @@ impl SkipSerializer {
}
pub fn clear(&mut self) {
self.prev_doc = 0u32;
self.buffer.clear();
}
}
@@ -169,13 +159,18 @@ impl SkipReader {
}
fn read_block_info(&mut self) {
let bytes = self.owned_read.as_slice();
let advance_len: usize;
self.last_doc_in_block = read_u32(bytes);
let doc_num_bits = bytes[4];
let doc_delta = {
let bytes = self.owned_read.as_slice();
let mut buf = [0; 4];
buf.copy_from_slice(&bytes[..4]);
u32::from_le_bytes(buf)
};
self.last_doc_in_block += doc_delta as DocId;
let doc_num_bits = self.owned_read.as_slice()[4];
match self.skip_info {
IndexRecordOption::Basic => {
advance_len = 5;
self.owned_read.advance(5);
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits: 0,
@@ -185,10 +180,11 @@ impl SkipReader {
};
}
IndexRecordOption::WithFreqs => {
let bytes = self.owned_read.as_slice();
let tf_num_bits = bytes[5];
let block_wand_fieldnorm_id = bytes[6];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
advance_len = 8;
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[7..]);
self.owned_read.advance(7 + num_bytes);
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
@@ -198,11 +194,16 @@ impl SkipReader {
};
}
IndexRecordOption::WithFreqsAndPositions => {
let bytes = self.owned_read.as_slice();
let tf_num_bits = bytes[5];
let tf_sum = read_u32(&bytes[6..10]);
let tf_sum = {
let mut buf = [0; 4];
buf.copy_from_slice(&bytes[6..10]);
u32::from_le_bytes(buf)
};
let block_wand_fieldnorm_id = bytes[10];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
advance_len = 12;
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[11..]);
self.owned_read.advance(11 + num_bytes);
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
@@ -212,7 +213,6 @@ impl SkipReader {
};
}
}
self.owned_read.advance(advance_len);
}
pub fn block_info(&self) -> BlockInfo {
@@ -274,24 +274,6 @@ mod tests {
use crate::directory::OwnedBytes;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
#[test]
fn test_encode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
}
for &tf in &[255, 256, 1_000_000, u32::MAX] {
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
}
}
#[test]
fn test_decode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
}
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
}
#[test]
fn test_skip_with_freq() {
let buf = {

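In the fixed-width variant shown above, a WithFreqs skip entry is exactly 8 bytes: a little-endian u32 doc value, then doc_num_bits, tf_num_bits, fieldnorm_id, and one saturated block-wand max-tf byte, hence advance_len = 8. A minimal standalone sketch of that layout; pack_with_freqs_entry is an illustrative helper, not part of the crate:

// Illustrative only: packs one skip entry the way the WithFreqs branch of
// SkipReader::read_block_info above expects it (advance_len = 8).
fn pack_with_freqs_entry(
    doc: u32,
    doc_num_bits: u8,
    tf_num_bits: u8,
    fieldnorm_id: u8,
    max_tf: u32,
) -> Vec<u8> {
    let mut buf = Vec::with_capacity(8);
    buf.extend_from_slice(&doc.to_le_bytes()); // bytes 0..4: doc value
    buf.push(doc_num_bits);                    // byte 4
    buf.push(tf_num_bits);                     // byte 5
    buf.push(fieldnorm_id);                    // byte 6
    buf.push(max_tf.min(u8::MAX as u32) as u8); // byte 7: saturated block-wand max tf
    buf
}

fn main() {
    let entry = pack_with_freqs_entry(128, 7, 3, 12, 300);
    assert_eq!(entry.len(), 8);
    assert_eq!(entry[7], 255); // 300 saturates to the one-byte sentinel, decoded back as u32::MAX
}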
View File

@@ -7,7 +7,6 @@ use crate::schema::{Field, IndexRecordOption};
use crate::termdict::{TermDictionary, TermStreamer};
use crate::TantivyError;
use crate::{DocId, Score};
use std::io;
use std::sync::Arc;
use tantivy_fst::Automaton;
@@ -20,7 +19,6 @@ pub struct AutomatonWeight<A> {
impl<A> AutomatonWeight<A>
where
A: Automaton + Send + Sync + 'static,
A::State: Clone,
{
/// Creates a new AutomatonWeight
pub fn new<IntoArcA: Into<Arc<A>>>(field: Field, automaton: IntoArcA) -> AutomatonWeight<A> {
@@ -30,10 +28,7 @@ where
}
}
fn automaton_stream<'a>(
&'a self,
term_dict: &'a TermDictionary,
) -> io::Result<TermStreamer<'a, &'a A>> {
fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> {
let automaton: &A = &*self.automaton;
let term_stream_builder = term_dict.search(automaton);
term_stream_builder.into_stream()
@@ -43,14 +38,13 @@ where
impl<A> Weight for AutomatonWeight<A>
where
A: Automaton + Send + Sync + 'static,
A::State: Clone,
{
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let max_doc = reader.max_doc();
let mut doc_bitset = BitSet::with_max_value(max_doc);
let inverted_index = reader.inverted_index(self.field)?;
let term_dict = inverted_index.terms();
let mut term_stream = self.automaton_stream(term_dict)?;
let mut term_stream = self.automaton_stream(term_dict);
while term_stream.advance() {
let term_info = term_stream.value();
let mut block_segment_postings = inverted_index
@@ -104,7 +98,6 @@ mod tests {
index
}
#[derive(Clone, Copy)]
enum State {
Start,
NotMatching,

View File

@@ -106,7 +106,7 @@ impl BM25Weight {
BM25Weight::new(idf_explain, avg_fieldnorm)
}
pub(crate) fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
let weight = idf_explain.value() * (1.0 + K1);
BM25Weight {
idf_explain,

View File

@@ -11,7 +11,6 @@ use crate::schema::{Field, IndexRecordOption, Term};
use crate::termdict::{TermDictionary, TermStreamer};
use crate::{DocId, Score};
use std::collections::Bound;
use std::io;
use std::ops::Range;
fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
@@ -275,7 +274,7 @@ pub struct RangeWeight {
}
impl RangeWeight {
fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> io::Result<TermStreamer<'a>> {
fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> TermStreamer<'a> {
use std::collections::Bound::*;
let mut term_stream_builder = term_dict.range();
term_stream_builder = match self.left_bound {
@@ -299,7 +298,7 @@ impl Weight for RangeWeight {
let inverted_index = reader.inverted_index(self.field)?;
let term_dict = inverted_index.terms();
let mut term_range = self.term_range(term_dict)?;
let mut term_range = self.term_range(term_dict);
while term_range.advance() {
let term_info = term_range.value();
let mut block_segment_postings = inverted_index

View File

@@ -12,7 +12,7 @@ use std::marker::PhantomData;
/// This is useful for queries like `+somethingrequired somethingoptional`.
///
/// Note that `somethingoptional` has no impact on the `DocSet`.
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner: ScoreCombiner> {
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner> {
req_scorer: TReqScorer,
opt_scorer: TOptScorer,
score_cache: Option<Score>,
@@ -23,7 +23,6 @@ impl<TReqScorer, TOptScorer, TScoreCombiner>
RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner>
where
TOptScorer: DocSet,
TScoreCombiner: ScoreCombiner,
{
/// Creates a new `RequiredOptionalScorer`.
pub fn new(
@@ -44,7 +43,6 @@ impl<TReqScorer, TOptScorer, TScoreCombiner> DocSet
where
TReqScorer: DocSet,
TOptScorer: DocSet,
TScoreCombiner: ScoreCombiner,
{
fn advance(&mut self) -> DocId {
self.score_cache = None;

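As the doc comment above notes, the optional scorer never changes which documents match; it only adds to the score when it lands on the same document as the required scorer. A self-contained sketch of that contract over plain sorted (doc, score) lists; score_required_optional is a hypothetical helper, not the crate's scorer:

// Hypothetical illustration of the required/optional contract: the required
// list alone decides which docs match, the optional list only contributes
// to the score when it contains the same doc.
fn score_required_optional(required: &[(u32, f32)], optional: &[(u32, f32)]) -> Vec<(u32, f32)> {
    let mut out = Vec::new();
    let mut opt_idx = 0;
    for &(doc, required_score) in required {
        // Advance the optional cursor up to the current required doc.
        while opt_idx < optional.len() && optional[opt_idx].0 < doc {
            opt_idx += 1;
        }
        let mut score = required_score;
        if opt_idx < optional.len() && optional[opt_idx].0 == doc {
            score += optional[opt_idx].1;
        }
        out.push((doc, score));
    }
    out
}

fn main() {
    let required: Vec<(u32, f32)> = vec![(1, 1.0), (5, 1.0), (9, 1.0)];
    let optional: Vec<(u32, f32)> = vec![(5, 0.5), (7, 0.5)];
    // Doc 7 is absent from the result: it only matches the optional clause.
    let expected: Vec<(u32, f32)> = vec![(1, 1.0), (5, 1.5), (9, 1.0)];
    assert_eq!(score_required_optional(&required, &optional), expected);
}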
View File

@@ -3,7 +3,7 @@ use crate::Score;
/// The `ScoreCombiner` trait defines how to compute
/// an overall score given a list of scores.
pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
pub trait ScoreCombiner: Default + Clone + Copy + 'static {
/// Aggregates the score combiner with the given scorer.
///
/// The `ScoreCombiner` may decide to call `.scorer.score()`

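The trait above abstracts over how the scores of the matching clauses are folded into a single score; a summing combiner and a do-nothing combiner (for unscored queries) are the typical instances. A standalone sketch of the idea, using a hypothetical simplified trait rather than the crate's exact signatures:

// Hypothetical, simplified version of the combiner idea: fold clause scores into one value.
trait Combiner: Default {
    fn update(&mut self, clause_score: f32);
    fn score(&self) -> f32;
}

#[derive(Default)]
struct SumCombiner {
    total: f32,
}

impl Combiner for SumCombiner {
    fn update(&mut self, clause_score: f32) {
        self.total += clause_score;
    }
    fn score(&self) -> f32 {
        self.total
    }
}

#[derive(Default)]
struct DoNothingCombiner;

impl Combiner for DoNothingCombiner {
    fn update(&mut self, _clause_score: f32) {}
    fn score(&self) -> f32 {
        1.0
    }
}

fn combined<C: Combiner>(clause_scores: &[f32]) -> f32 {
    let mut combiner = C::default();
    for &s in clause_scores {
        combiner.update(s);
    }
    combiner.score()
}

fn main() {
    assert_eq!(combined::<SumCombiner>(&[0.5, 1.5]), 2.0);
    assert_eq!(combined::<DoNothingCombiner>(&[0.5, 1.5]), 1.0);
}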
View File

@@ -1,7 +1,7 @@
use super::term_weight::TermWeight;
use crate::query::bm25::BM25Weight;
use crate::query::Query;
use crate::query::Weight;
use crate::query::{Explanation, Query};
use crate::schema::IndexRecordOption;
use crate::Searcher;
use crate::Term;
@@ -100,13 +100,7 @@ impl TermQuery {
field_entry.name()
)));
}
let bm25_weight;
if scoring_enabled {
bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
} else {
bm25_weight =
BM25Weight::new(Explanation::new("<no score>".to_string(), 1.0f32), 1.0f32);
}
let bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
let index_record_option = if scoring_enabled {
self.index_record_option
} else {

View File

@@ -302,7 +302,7 @@ mod tests {
let mut rng = rand::thread_rng();
writer.set_merge_policy(Box::new(NoMergePolicy));
for _ in 0..3_000 {
let term_freq = rng.gen_range(1..10000);
let term_freq = rng.gen_range(1, 10000);
let words: Vec<&str> = std::iter::repeat("bbbb").take(term_freq).collect();
let text = words.join(" ");
writer.add_document(doc!(text_field=>text));

View File

@@ -45,7 +45,7 @@ impl Weight for TermWeight {
} else {
let field = self.term.field();
let inv_index = reader.inverted_index(field)?;
let term_info = inv_index.get_term_info(&self.term)?;
let term_info = inv_index.get_term_info(&self.term);
Ok(term_info.map(|term_info| term_info.doc_freq).unwrap_or(0))
}
}

View File

@@ -233,7 +233,6 @@ mod tests {
assert_eq!(Facet::root(), Facet::from("/"));
assert_eq!(format!("{}", Facet::root()), "/");
assert!(Facet::root().is_root());
assert_eq!(Facet::root().encoded_str(), "");
}
#[test]

View File

@@ -1,5 +1,5 @@
use crate::schema::Value;
use serde::{Deserialize, Serialize};
use serde::Serialize;
use std::collections::BTreeMap;
/// Internal representation of a document used for JSON
@@ -8,5 +8,5 @@ use std::collections::BTreeMap;
/// A `NamedFieldDocument` is a simple representation of a document
/// as a `BTreeMap<String, Vec<Value>>`.
///
#[derive(Debug, Deserialize, Serialize)]
#[derive(Serialize)]
pub struct NamedFieldDocument(pub BTreeMap<String, Vec<Value>>);

View File

@@ -3,7 +3,7 @@ use std::io::{self, Read, Write};
/// Name of the compression scheme used in the doc store.
///
/// This name is appended to the version string of tantivy.
pub const COMPRESSION: &str = "lz4";
pub const COMPRESSION: &'static str = "lz4";
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear();

View File

@@ -43,9 +43,6 @@ impl CheckpointBlock {
/// Adds another checkpoint to the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
if let Some(prev_checkpoint) = self.checkpoints.last() {
assert!(checkpoint.follows(prev_checkpoint));
}
self.checkpoints.push(checkpoint);
}

View File

@@ -26,12 +26,6 @@ pub struct Checkpoint {
pub end_offset: u64,
}
impl Checkpoint {
pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
(self.start_doc == other.end_doc) && (self.start_offset == other.end_offset)
}
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
@@ -45,16 +39,13 @@ impl fmt::Debug for Checkpoint {
#[cfg(test)]
mod tests {
use std::{io, iter};
use std::io;
use futures::executor::block_on;
use proptest::strategy::{BoxedStrategy, Strategy};
use crate::directory::OwnedBytes;
use crate::indexer::NoMergePolicy;
use crate::schema::{SchemaBuilder, STORED, STRING};
use crate::store::index::Checkpoint;
use crate::{DocAddress, DocId, Index, Term};
use crate::DocId;
use super::{SkipIndex, SkipIndexBuilder};
@@ -63,7 +54,7 @@ mod tests {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Ok(())
@@ -81,7 +72,7 @@ mod tests {
};
skip_index_builder.insert(checkpoint);
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
assert_eq!(skip_cursor.next(), None);
@@ -95,7 +86,7 @@ mod tests {
Checkpoint {
start_doc: 0,
end_doc: 3,
start_offset: 0,
start_offset: 4,
end_offset: 9,
},
Checkpoint {
@@ -130,7 +121,7 @@ mod tests {
}
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
assert_eq!(
&skip_index.checkpoints().collect::<Vec<_>>()[..],
&checkpoints[..]
@@ -142,40 +133,6 @@ mod tests {
(doc as u64) * (doc as u64)
}
#[test]
fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::default();
let text = schema_builder.add_text_field("text", STORED | STRING);
let body = schema_builder.add_text_field("body", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz")
.take(1_000)
.collect();
for _ in 0..20 {
index_writer.add_document(doc!(body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.add_document(doc!(text=>"testb"));
for _ in 0..10 {
index_writer.add_document(doc!(text=>"testd", body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.delete_term(Term::from_field_text(text, "testb"));
index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
block_on(index_writer.merge(&segment_ids))?;
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 30);
for i in 0..searcher.num_docs() as u32 {
let _doc = searcher.doc(DocAddress(0u32, i))?;
}
Ok(())
}
#[test]
fn test_skip_index_long() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
@@ -193,28 +150,26 @@ mod tests {
}
skip_index_builder.write(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
.checkpoints()
.collect();
assert_eq!(&resulting_checkpoints, &checkpoints);
Ok(())
}
fn integrate_delta(vals: Vec<u64>) -> Vec<u64> {
let mut output = Vec::with_capacity(vals.len() + 1);
output.push(0u64);
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
let mut prev = 0u64;
for val in vals {
let new_val = val + prev;
for val in vals.iter_mut() {
let new_val = *val + prev;
prev = new_val;
output.push(new_val);
*val = new_val;
}
output
vals
}
// Generates a sequence of n valid checkpoints, with n < max_len.
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
(0..max_len)
(1..max_len)
.prop_flat_map(move |len: usize| {
(
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),
@@ -266,7 +221,7 @@ mod tests {
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
test_skip_index_aux(skip_index, &checkpoints[..]);

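The follows() method above encodes the invariant that consecutive checkpoints tile both the doc-id range and the byte offsets, with no gap and no overlap. A small standalone illustration of that contiguity rule, reusing the Checkpoint fields from the hunks above with made-up values:

// Same fields as the Checkpoint struct in the hunks above; the values are made up.
struct Checkpoint {
    start_doc: u32,
    end_doc: u32,
    start_offset: u64,
    end_offset: u64,
}

impl Checkpoint {
    // Contiguity: the next block starts exactly where the previous one ended,
    // both in doc ids and in byte offsets.
    fn follows(&self, other: &Checkpoint) -> bool {
        self.start_doc == other.end_doc && self.start_offset == other.end_offset
    }
}

fn main() {
    let a = Checkpoint { start_doc: 0, end_doc: 3, start_offset: 0, end_offset: 9 };
    let b = Checkpoint { start_doc: 3, end_doc: 7, start_offset: 9, end_offset: 25 };
    let c = Checkpoint { start_doc: 8, end_doc: 10, start_offset: 25, end_offset: 40 };
    assert!(b.follows(&a));
    assert!(!c.follows(&b)); // gap in doc ids (7 -> 8), so this sequence would be rejected
}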
View File

@@ -19,7 +19,7 @@ impl<'a> Iterator for LayerCursor<'a> {
return None;
}
let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
if block_mut.deserialize(remaining_mut).is_err() {
if let Err(_) = block_mut.deserialize(remaining_mut) {
return None;
}
self.cursor = 0;
@@ -50,7 +50,8 @@ impl Layer {
fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
self.cursor_at_offset(offset)
.find(|checkpoint| checkpoint.end_doc > target)
.filter(|checkpoint| checkpoint.end_doc > target)
.next()
}
}
@@ -59,46 +60,6 @@ pub struct SkipIndex {
}
impl SkipIndex {
pub fn open(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
let layer = Layer {
data: data.slice(start_offset as usize, end_offset as usize),
};
layers.push(layer);
start_offset = end_offset;
}
SkipIndex { layers }
}
pub fn is_valid(&self) -> bool {
let checkpoints: Vec<Checkpoint> = self.checkpoints().collect();
let mut prev_checkpoint = Checkpoint {
start_doc: 0u32,
end_doc: 0u32,
start_offset: 0u64,
end_offset: 0u64,
};
for checkpoint in checkpoints {
if !checkpoint.follows(&prev_checkpoint) {
return false;
}
prev_checkpoint = checkpoint;
}
true
}
pub(crate) fn from_bytes(data: &[u8]) -> SkipIndex {
let data = OwnedBytes::new(data.to_owned());
SkipIndex::open(data)
}
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()
@@ -130,3 +91,22 @@ impl SkipIndex {
Some(cur_checkpoint)
}
}
impl From<OwnedBytes> for SkipIndex {
fn from(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
layers.push(Layer {
data: data.slice(start_offset as usize, end_offset as usize),
});
start_offset = end_offset;
}
SkipIndex { layers }
}
}

View File

@@ -1,6 +1,6 @@
use crate::common::{BinarySerializable, VInt};
use crate::store::index::block::CheckpointBlock;
use crate::store::index::{Checkpoint, SkipIndex, CHECKPOINT_PERIOD};
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use std::io;
use std::io::Write;
@@ -28,20 +28,18 @@ impl LayerBuilder {
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
if let Some((start_doc, end_doc)) = self.block.doc_interval() {
self.block.doc_interval().map(|(start_doc, end_doc)| {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Some(Checkpoint {
Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
})
} else {
None
}
}
})
}
fn push(&mut self, checkpoint: Checkpoint) {
@@ -50,7 +48,7 @@ impl LayerBuilder {
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD;
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
if emit_skip_info {
self.flush_block()
} else {
@@ -87,8 +85,7 @@ impl SkipIndexBuilder {
}
}
pub fn write<W: Write>(mut self, real_output: &mut W) -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
@@ -109,14 +106,10 @@ impl SkipIndexBuilder {
layer_offset += layer_buffer.len() as u64;
layer_sizes.push(VInt(layer_offset));
}
layer_sizes.serialize(&mut output)?;
layer_sizes.serialize(output)?;
for layer_buffer in layer_buffers {
output.write_all(&layer_buffer[..])?;
}
if !SkipIndex::from_bytes(&output).is_valid() {
return Err(io::Error::new(io::ErrorKind::InvalidData, "about to write invalid skip index"));
}
real_output.write_all(&output)?;
Ok(())
}
}

View File

@@ -35,7 +35,7 @@ impl StoreReader {
let (data_file, offset_index_file) = split_file(store_file)?;
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::open(index_data);
let skip_index = SkipIndex::from(index_data);
Ok(StoreReader {
data: data_file,
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),

View File

@@ -72,7 +72,6 @@ impl StoreWriter {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
}
assert_eq!(self.first_doc_in_block, self.doc);
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
@@ -87,17 +86,12 @@ impl StoreWriter {
checkpoint.end_doc += doc_shift;
checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.register_checkpoint(checkpoint);
self.offset_index_writer.insert(checkpoint);
self.doc = checkpoint.end_doc;
}
Ok(())
}
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint);
self.first_doc_in_block = checkpoint.end_doc;
self.doc = checkpoint.end_doc;
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
@@ -106,13 +100,14 @@ impl StoreWriter {
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes();
let end_doc = self.doc;
self.register_checkpoint(Checkpoint {
self.offset_index_writer.insert(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.current_block.clear();
self.first_doc_in_block = self.doc;
Ok(())
}

View File

@@ -1,27 +0,0 @@
/*!
The term dictionary's main role is to associate the sorted [`Term`s](../struct.Term.html) with
a [`TermInfo`](../postings/struct.TermInfo.html) struct that contains some meta-information
about the term.
Internally, the term dictionary relies on the `fst` crate to store
a sorted mapping that associates each term with its rank in lexicographical order.
For instance, in a dictionary containing the sorted terms "abba", "bjork", "blur" and "donovan",
the `TermOrdinal`s are respectively `0`, `1`, `2`, and `3`.
For `u64`-terms, tantivy explicitly uses a `BigEndian` representation to ensure that the
lexicographical order matches the natural order of integers.
`i64`-terms are transformed to `u64` using a continuous mapping `val ⟶ val - i64::min_value()`
and then treated as a `u64`.
`f64`-terms are transformed to `u64` using a mapping that preserves order, and are then treated
as `u64`.
A second data structure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/
mod streamer;
mod term_info_store;
mod termdict;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};

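The module documentation above describes an order-preserving mapping val -> val - i64::min_value() from i64 to u64. A minimal sketch of that mapping and of the property it guarantees; i64_to_u64_shift is an illustrative name and is not necessarily byte-for-byte the crate's i64_to_u64:

// Shift the i64 range [i64::MIN, i64::MAX] onto [0, u64::MAX] so that the natural
// order of the signed values matches the order of the unsigned ones (and therefore
// the lexicographical order of their big-endian bytes).
fn i64_to_u64_shift(val: i64) -> u64 {
    (val as u64).wrapping_sub(i64::MIN as u64)
}

fn main() {
    let sorted = [i64::MIN, -1, 0, 1, i64::MAX];
    let mapped: Vec<u64> = sorted.iter().map(|&v| i64_to_u64_shift(v)).collect();
    // Order is preserved...
    assert!(mapped.windows(2).all(|w| w[0] < w[1]));
    // ...and so is the lexicographical order of the big-endian encodings.
    let encoded: Vec<[u8; 8]> = mapped.iter().map(|v| v.to_be_bytes()).collect();
    assert!(encoded.windows(2).all(|w| w[0] < w[1]));
}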
View File

@@ -20,37 +20,438 @@ as `u64`.
A second data structure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/
use tantivy_fst::automaton::AlwaysMatch;
mod fst_termdict;
use fst_termdict as termdict;
mod merger;
#[cfg(test)]
mod tests;
/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;
/// The term dictionary contains all of the terms in
/// a tantivy index in a sorted manner.
pub type TermDictionary = self::termdict::TermDictionary;
mod merger;
mod streamer;
mod term_info_store;
mod termdict;
/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;
pub use self::merger::TermMerger;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yielded is actually a pair with
/// - the term
/// - a slice with the ordinals of the segments containing
/// the term.
pub type TermMerger<'a> = self::merger::TermMerger<'a>;
#[cfg(test)]
mod tests {
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use crate::core::Index;
use crate::directory::{Directory, FileSlice, RAMDirectory};
use crate::postings::TermInfo;
use crate::schema::{Schema, TEXT};
use std::path::PathBuf;
use std::str;
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;
const BLOCK_SIZE: usize = 1_500;
fn make_term_info(term_ord: u64) -> TermInfo {
let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
TermInfo {
doc_freq: term_ord as u32,
postings_start_offset: offset(term_ord),
postings_stop_offset: offset(term_ord + 1),
positions_idx: offset(term_ord) * 2u64,
}
}
#[test]
fn test_empty_term_dictionary() {
let empty = TermDictionary::empty();
assert!(empty.stream().next().is_none());
}
#[test]
fn test_term_ordinals() -> crate::Result<()> {
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?;
}
let term_file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(term_file)?;
for (term_ord, term) in COUNTRIES.iter().enumerate() {
assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64);
let mut bytes = vec![];
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes));
assert_eq!(bytes, term.as_bytes());
}
Ok(())
}
#[test]
fn test_term_dictionary_simple() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
term_dictionary_builder.finish()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
assert_eq!(term_dict.get("abc").unwrap().doc_freq, 34u32);
assert_eq!(term_dict.get("abcd").unwrap().doc_freq, 346u32);
let mut stream = term_dict.stream();
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k.as_ref(), "abc".as_bytes());
assert_eq!(v.doc_freq, 34u32);
}
assert_eq!(stream.key(), "abc".as_bytes());
assert_eq!(stream.value().doc_freq, 34u32);
}
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k, "abcd".as_bytes());
assert_eq!(v.doc_freq, 346u32);
}
assert_eq!(stream.key(), "abcd".as_bytes());
assert_eq!(stream.value().doc_freq, 346u32);
}
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_term_iterator() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text_field=>"a b d f"));
index_writer.commit()?;
index_writer.add_document(doc!(text_field=>"a b c d f"));
index_writer.commit()?;
index_writer.add_document(doc!(text_field => "e f"));
index_writer.commit()?;
}
let searcher = index.reader()?.searcher();
let field_searcher = searcher.field(text_field)?;
let mut term_it = field_searcher.terms();
let mut term_string = String::new();
while term_it.advance() {
//let term = Term::from_bytes(term_it.key());
term_string.push_str(str::from_utf8(term_it.key()).expect("test"));
}
assert_eq!(&*term_string, "abcdef");
Ok(())
}
#[test]
fn test_term_dictionary_stream() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish().unwrap()
};
let term_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
{
let mut streamer = term_dictionary.stream();
let mut i = 0;
while let Some((streamer_k, streamer_v)) = streamer.next() {
let &(ref key, ref v) = &ids[i];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v, &make_term_info(*v as u64));
i += 1;
}
}
let &(ref key, ref val) = &ids[2047];
assert_eq!(
term_dictionary.get(key.as_bytes()),
Some(make_term_info(*val as u64))
);
Ok(())
}
#[test]
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
// term requires more than 16bits
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(3))?;
term_dictionary_builder.finish()?
};
let term_dict_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
let mut kv_stream = term_dictionary.stream();
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(1));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(3));
assert!(!kv_stream.advance());
Ok(())
}
#[test]
fn test_stream_range() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish().unwrap()
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
{
for i in (0..20).chain(6000..8_000) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.ge(target_key.as_bytes())
.into_stream();
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j];
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
assert_eq!(streamer_v.doc_freq, *v);
assert_eq!(streamer_v, &make_term_info(*v as u64));
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.gt(target_key.as_bytes())
.into_stream();
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j + 1];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v.doc_freq, *v);
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
for j in 0..3 {
let &(ref fst_key, _) = &ids[i];
let &(ref last_key, _) = &ids[i + j];
let mut streamer = term_dictionary
.range()
.ge(fst_key.as_bytes())
.lt(last_key.as_bytes())
.into_stream();
for _ in 0..j {
assert!(streamer.next().is_some());
}
assert!(streamer.next().is_none());
}
}
}
Ok(())
}
#[test]
fn test_empty_string() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
term_dictionary_builder
.insert(&[], &make_term_info(1 as u64))
.unwrap();
term_dictionary_builder
.insert(&[1u8], &make_term_info(2 as u64))
.unwrap();
term_dictionary_builder.finish().unwrap()
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let mut stream = term_dictionary.stream();
assert!(stream.advance());
assert!(stream.key().is_empty());
assert!(stream.advance());
assert_eq!(stream.key(), &[1u8]);
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_stream_range_boundaries() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
for i in 0u8..10u8 {
let number_arr = [i; 1];
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
if backwards {
res.reverse();
}
res
};
{
let range = term_dictionary.range().backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().le([6u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().le([6u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream();
assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
{
let range = term_dictionary
.range()
.ge([0u8])
.lt([5u8])
.backward()
.into_stream();
assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
Ok(())
}
#[test]
fn test_automaton_search() -> crate::Result<()> {
use crate::query::DFAWrapper;
use levenshtein_automata::LevenshteinAutomatonBuilder;
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
// We can now build an entire dfa.
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
let mut range = term_dict.search(automaton).into_stream();
// get the first finding
assert!(range.advance());
assert_eq!("Spain".as_bytes(), range.key());
assert!(!range.advance());
Ok(())
}
}

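The TermMerger re-exported near the top of this module walks several sorted term streams in lockstep and yields each distinct term once, together with the ordinals of the segments containing it. A self-contained sketch of that k-way merge over plain sorted slices; merge_terms is illustrative only, not the crate's TermMerger:

// Merge several sorted, deduplicated term lists and report, for each distinct
// term, the ordinals of the lists (segments) it appears in.
fn merge_terms<'a>(segments: &[&[&'a str]]) -> Vec<(&'a str, Vec<usize>)> {
    let mut cursors = vec![0usize; segments.len()];
    let mut merged = Vec::new();
    loop {
        // Smallest term currently under any cursor.
        let current = segments
            .iter()
            .zip(&cursors)
            .filter_map(|(seg, &pos)| seg.get(pos).copied())
            .min();
        let term = match current {
            Some(term) => term,
            None => break,
        };
        // Collect every segment positioned on that term and advance its cursor.
        let mut ordinals = Vec::new();
        for (ord, (seg, pos)) in segments.iter().zip(cursors.iter_mut()).enumerate() {
            if seg.get(*pos).copied() == Some(term) {
                ordinals.push(ord);
                *pos += 1;
            }
        }
        merged.push((term, ordinals));
    }
    merged
}

fn main() {
    let seg0: &[&str] = &["abba", "blur"];
    let seg1: &[&str] = &["abba", "bjork", "donovan"];
    let expected: Vec<(&str, Vec<usize>)> = vec![
        ("abba", vec![0, 1]),
        ("bjork", vec![1]),
        ("blur", vec![0]),
        ("donovan", vec![1]),
    ];
    assert_eq!(merge_terms(&[seg0, seg1]), expected);
}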
View File

@@ -1,5 +1,3 @@
use std::io;
use super::TermDictionary;
use crate::postings::TermInfo;
use crate::termdict::TermOrdinal;
@@ -61,14 +59,14 @@ where
/// Creates the stream corresponding to the range
/// of terms defined using the `TermStreamerBuilder`.
pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
Ok(TermStreamer {
pub fn into_stream(self) -> TermStreamer<'a, A> {
TermStreamer {
fst_map: self.fst_map,
stream: self.stream_builder.into_stream(),
term_ord: 0u64,
current_key: Vec::with_capacity(100),
current_value: TermInfo::default(),
})
}
}
}

View File

@@ -80,6 +80,7 @@ where
.serialize(&mut counting_writer)?;
let footer_size = counting_writer.written_bytes();
(footer_size as u64).serialize(&mut counting_writer)?;
counting_writer.flush()?;
}
Ok(file)
}
@@ -138,8 +139,8 @@ impl TermDictionary {
}
/// Returns the ordinal associated to a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
Ok(self.fst_index.get(key))
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> Option<TermOrdinal> {
self.fst_index.get(key)
}
/// Returns the term associated to a given term ordinal.
@@ -151,7 +152,7 @@ impl TermDictionary {
///
/// Regardless of whether the term is found or not,
/// the buffer may be modified.
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool {
bytes.clear();
let fst = self.fst_index.as_fst();
let mut node = fst.root();
@@ -166,10 +167,10 @@ impl TermDictionary {
let new_node_addr = transition.addr;
node = fst.node(new_node_addr);
} else {
return Ok(false);
return false;
}
}
Ok(true)
true
}
/// Returns the number of terms in the dictionary.
@@ -178,10 +179,9 @@ impl TermDictionary {
}
/// Looks up the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
Ok(self
.term_ord(key)?
.map(|term_ord| self.term_info_from_ord(term_ord)))
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<TermInfo> {
self.term_ord(key)
.map(|term_ord| self.term_info_from_ord(term_ord))
}
/// Returns a range builder, to stream all of the terms
@@ -191,7 +191,7 @@ impl TermDictionary {
}
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
pub fn stream(&self) -> TermStreamer<'_> {
self.range().into_stream()
}

View File

@@ -1,431 +0,0 @@
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use crate::directory::{Directory, FileSlice, RAMDirectory, TerminatingWrite};
use crate::postings::TermInfo;
use std::path::PathBuf;
use std::str;
const BLOCK_SIZE: usize = 1_500;
fn make_term_info(term_ord: u64) -> TermInfo {
let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
TermInfo {
doc_freq: term_ord as u32,
postings_start_offset: offset(term_ord),
postings_stop_offset: offset(term_ord + 1),
positions_idx: offset(term_ord) * 2u64,
}
}
#[test]
fn test_empty_term_dictionary() {
let empty = TermDictionary::empty();
assert!(empty.stream().unwrap().next().is_none());
}
#[test]
fn test_term_ordinals() -> crate::Result<()> {
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?.terminate()?;
}
let term_file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(term_file)?;
for (term_ord, term) in COUNTRIES.iter().enumerate() {
assert_eq!(term_dict.term_ord(term)?, Some(term_ord as u64));
let mut bytes = vec![];
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes)?);
assert_eq!(bytes, term.as_bytes());
}
Ok(())
}
#[test]
fn test_term_dictionary_simple() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
term_dictionary_builder.finish()?.terminate()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
assert_eq!(term_dict.get("abc")?.unwrap().doc_freq, 34u32);
assert_eq!(term_dict.get("abcd")?.unwrap().doc_freq, 346u32);
let mut stream = term_dict.stream()?;
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k.as_ref(), "abc".as_bytes());
assert_eq!(v.doc_freq, 34u32);
}
assert_eq!(stream.key(), "abc".as_bytes());
assert_eq!(stream.value().doc_freq, 34u32);
}
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k, "abcd".as_bytes());
assert_eq!(v.doc_freq, 346u32);
}
assert_eq!(stream.key(), "abcd".as_bytes());
assert_eq!(stream.value().doc_freq, 346u32);
}
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_term_dictionary_stream() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish()?
};
let term_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
{
let mut streamer = term_dictionary.stream()?;
let mut i = 0;
while let Some((streamer_k, streamer_v)) = streamer.next() {
let &(ref key, ref v) = &ids[i];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v, &make_term_info(*v as u64));
i += 1;
}
}
let &(ref key, ref val) = &ids[2047];
assert_eq!(
term_dictionary.get(key.as_bytes())?,
Some(make_term_info(*val as u64))
);
Ok(())
}
#[test]
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
// term requires more than 16bits
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(3))?;
term_dictionary_builder.finish()?
};
let term_dict_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
let mut kv_stream = term_dictionary.stream()?;
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(1));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(3));
assert!(!kv_stream.advance());
Ok(())
}
#[test]
fn test_stream_range() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
{
for i in (0..20).chain(6000..8_000) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.ge(target_key.as_bytes())
.into_stream()?;
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j];
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
assert_eq!(streamer_v.doc_freq, *v);
assert_eq!(streamer_v, &make_term_info(*v as u64));
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.gt(target_key.as_bytes())
.into_stream()?;
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j + 1];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v.doc_freq, *v);
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
for j in 0..3 {
let &(ref fst_key, _) = &ids[i];
let &(ref last_key, _) = &ids[i + j];
let mut streamer = term_dictionary
.range()
.ge(fst_key.as_bytes())
.lt(last_key.as_bytes())
.into_stream()?;
for _ in 0..j {
assert!(streamer.next().is_some());
}
assert!(streamer.next().is_none());
}
}
}
Ok(())
}
#[test]
fn test_empty_string() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
term_dictionary_builder
.insert(&[], &make_term_info(1 as u64))
.unwrap();
term_dictionary_builder
.insert(&[1u8], &make_term_info(2 as u64))
.unwrap();
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let mut stream = term_dictionary.stream()?;
assert!(stream.advance());
assert!(stream.key().is_empty());
assert!(stream.advance());
assert_eq!(stream.key(), &[1u8]);
assert!(!stream.advance());
Ok(())
}
fn stream_range_test_dict() -> crate::Result<TermDictionary> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
for i in 0u8..10u8 {
let number_arr = [i; 1];
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
TermDictionary::open(file)
}
#[test]
fn test_stream_range_boundaries_forward() -> crate::Result<()> {
let term_dictionary = stream_range_test_dict()?;
let value_list = |mut streamer: TermStreamer<'_>| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
res
};
{
let range = term_dictionary.range().ge([2u8]).into_stream()?;
assert_eq!(
value_list(range),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).into_stream()?;
assert_eq!(
value_list(range),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).into_stream()?;
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]);
}
{
let range = term_dictionary.range().le([6u8]).into_stream()?;
assert_eq!(
value_list(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
Ok(())
}
#[test]
fn test_stream_range_boundaries_backward() -> crate::Result<()> {
let term_dictionary = stream_range_test_dict()?;
let value_list_backward = |mut streamer: TermStreamer<'_>| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
res.reverse();
res
};
{
let range = term_dictionary.range().backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary
.range()
.ge([0u8])
.lt([5u8])
.backward()
.into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32]
);
}
Ok(())
}
#[test]
fn test_ord_to_term() -> crate::Result<()> {
let termdict = stream_range_test_dict()?;
let mut bytes = vec![];
for b in 0u8..10u8 {
termdict.ord_to_term(b as u64, &mut bytes)?;
assert_eq!(&bytes, &[b]);
}
Ok(())
}
#[test]
fn test_stream_term_ord() -> crate::Result<()> {
let termdict = stream_range_test_dict()?;
let mut stream = termdict.stream()?;
for b in 0u8..10u8 {
assert!(stream.advance());
assert_eq!(stream.term_ord(), b as u64);
assert_eq!(stream.key(), &[b]);
}
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_automaton_search() -> crate::Result<()> {
use crate::query::DFAWrapper;
use levenshtein_automata::LevenshteinAutomatonBuilder;
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?.terminate()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
// We can now build an entire dfa.
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
let mut range = term_dict.search(automaton).into_stream()?;
// get the first finding
assert!(range.advance());
assert_eq!("Spain".as_bytes(), range.key());
assert!(!range.advance());
Ok(())
}