Compare commits

...

90 Commits

Author SHA1 Message Date
Paul Masurel
08f7706973 test store 2021-01-09 10:27:03 +09:00
Paul Masurel
bf6e6e8a7c Merge pull request #972 from tantivy-search/issue/969
Issue/969
2021-01-07 22:49:31 +09:00
Paul Masurel
203b0256a3 Minor renaming 2021-01-07 22:47:57 +09:00
Paul Masurel
caf2a38b7e Closes #969.
The segment stacking optimization is not updating "first_doc_in_block".
2021-01-07 22:43:56 +09:00
Paul Masurel
96f24b078e Added failing unit test. 2021-01-07 22:43:28 +09:00
Paul Masurel
332b50a4eb Merge pull request #970 from tantivy-search/functional-test-store
Added a functional long running test to test store merging.
2021-01-07 14:27:08 +09:00
Paul Masurel
8ca0954b3b Added a functional long running test to test store merging. 2021-01-07 14:07:15 +09:00
Paul Masurel
36343e2de8 Merge pull request #968 from tantivy-search/add-bench-analyzer
added a simple bench for the default analyzer
2021-01-06 21:33:39 +09:00
Paul Masurel
2f14a892ca added a simple bench for the default analyzer 2021-01-06 19:11:26 +09:00
Paul Masurel
9c3cabce40 Updated version of the rand crate. 2021-01-06 18:09:00 +09:00
Paul Masurel
f8d71c2b10 Merge pull request #964 from mosuka/deserializable
Make NamedFieldDocument deserializable
2021-01-06 17:43:53 +09:00
Paul Masurel
394dfb24f1 Merge pull request #965 from lewisdiamond/patch-1
Fix spelling
2021-01-06 13:38:31 +09:00
Lewis Diamond
b0549a229d Fix spelling 2021-01-05 22:34:56 -05:00
Minoru Osuka
670b6eaff6 Make NamedFieldDocument deserializable 2020-12-21 16:51:31 +09:00
Paul Masurel
a4f33d3823 Added comment to f64 conversion to u64.
- Added proptest
- Added comment to Lemire blog post.
2020-12-15 13:40:31 +09:00
Paul Masurel
c7841e3da5 Merge pull request #953 from barrotsteindev/filter-collector-tpredicatevalue
Generic filter collector
2020-12-14 10:35:46 +09:00
barrotsteindev
e7b4a12bba cargo fmt 2020-12-10 14:10:55 +02:00
barrotsteindev
0aaa929d6e Merge branch 'main' into filter-collector-tpredicatevalue 2020-12-10 11:27:19 +02:00
barrotsteindev
1112797c18 added a line to CHANGELOG.md 2020-12-10 11:25:08 +02:00
barrotsteindev
920481e1c1 change unit test 2020-12-10 11:24:53 +02:00
Paul Masurel
55f7b84966 Merge pull request #952 from tantivy-search/bm25-on-onebyte
Encode blockwand on a single byte.
2020-12-10 18:09:31 +09:00
Paul Masurel
09ab4df1fe Encode blockwand on a single byte. 2020-12-10 18:08:52 +09:00
barrotsteindev
0c2cf81b37 cargo fmt 2020-12-10 11:08:35 +02:00
barrotsteindev
d864430bda final edits 2020-12-10 11:08:15 +02:00
Paul Masurel
de60540e06 fixing compilation 2020-12-10 10:36:21 +02:00
Paul Masurel
c3e311e6b8 Removed 'static in compression_lz4. 2020-12-09 15:30:52 +09:00
barrotsteindev
ac704f2f22 WIP generic filter collector 2020-12-08 14:36:52 +02:00
Paul Masurel
be626083a0 Reorganized and added termdict unit tests. 2020-12-07 12:50:36 +09:00
Paul Masurel
b68fcca1e0 Minor changes
- Open{Write,Read}Error::wrap_io_error made public
- Arc<PathBuf> -> Arc<Path> in file_watcher.
2020-12-03 23:31:50 +09:00
Paul Masurel
af6dfa1856 Small refactoring 2020-12-03 14:27:05 +09:00
Paul Masurel
654c400a0b TermDictionary.finish does not flush 2020-12-03 13:36:25 +09:00
Paul Masurel
80a99539ce Several TermDict operations now return an io::Result 2020-12-03 13:13:11 +09:00
Paul Masurel
4b1c770e5e Simplified counting writer and removed flush 2020-12-03 11:24:39 +09:00
Paul Masurel
3491645e69 Moved the term merger 2020-12-03 10:24:04 +09:00
Paul Masurel
e72c8287f8 Merge pull request #951 from tantivy-search/fst-isolated
Fst isolated
2020-12-03 10:11:39 +09:00
Paul Masurel
b4b3bc7acd Cargo fmt 2020-12-03 10:08:38 +09:00
Paul Masurel
521c7b271b Isolated fst impl of termdictionary in a specific module. 2020-12-02 21:18:33 +09:00
Paul Masurel
acd888c999 Merge pull request #950 from tantivy-search/guilload--fix-clippy-warning
Fix clippy warning
2020-12-02 08:09:31 +09:00
Adrien Guillo
3ab1ba0b2f Fix clippy warning 2020-12-01 12:07:53 -08:00
Paul Masurel
b344c0ac05 Merge pull request #949 from tantivy-search/docset_is_send
DocSet is send
2020-12-01 19:12:51 +09:00
Paul Masurel
1741619c7f DocSet is send 2020-12-01 19:11:21 +09:00
Paul Masurel
067ba3dff0 Merge pull request #946 from tantivy-search/issue/test-bugfix-atomicwrite
Attempt to fix bug surfacing sometimes in test.
2020-12-01 15:29:51 +09:00
Paul Masurel
f79250f665 Fix perf regression in the benchmark for the Count collector.
In order to reduce IO, we introduced a way to instantiate a dummy
constant FieldnormReader, which worked by allocating a buffer with
as many bytes as there are docs in the segment.

This allocation is not negligible by any means.

This PR works by offering two implementations of the
FieldnormReader.
The const fieldnorm reader simply returns the same value all of the
time, while the array-based one behaves like the current implementation.
2020-12-01 08:51:32 +09:00
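To illustrate the idea described in this commit message, here is a minimal, self-contained sketch of the two flavours of fieldnorm reader (the type and method names below are illustrative assumptions, not the actual tantivy API):

enum FieldNormReaderSketch {
    // Every document shares the same fieldnorm byte; no per-doc buffer is allocated.
    Const { fieldnorm_id: u8 },
    // One byte per document, as in the pre-existing array-based implementation.
    Array(Vec<u8>),
}

impl FieldNormReaderSketch {
    fn fieldnorm_id(&self, doc: u32) -> u8 {
        match self {
            FieldNormReaderSketch::Const { fieldnorm_id } => *fieldnorm_id,
            FieldNormReaderSketch::Array(bytes) => bytes[doc as usize],
        }
    }
}

fn main() {
    // The const variant answers for any doc id without allocating a buffer
    // proportional to the number of documents in the segment.
    let reader = FieldNormReaderSketch::Const { fieldnorm_id: 0 };
    assert_eq!(reader.fieldnorm_id(1_000_000), 0);
}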
Paul Masurel
5a33b8d533 Merge pull request #942 from barrotsteindev/filter-collector
added initial implementation for filter_collector
2020-11-30 11:26:28 +09:00
Paul Masurel
d165655fb1 Added specialized implementation for count/count_including... in &mut DocSet 2020-11-30 11:24:13 +09:00
barrotsteindev
c805871b92 better test 2020-11-25 14:25:49 +02:00
barrotsteindev
f288e32634 rebase on origin/main 2020-11-25 14:08:43 +02:00
barrotsteindev
bc44543d8f added TPredicate generic param and updated tests 2020-11-25 14:08:24 +02:00
Paul Masurel
db514208a7 Removed the SegmentCollector type from the Generics of the
FilterCollector
2020-11-25 14:08:24 +02:00
barrotsteindev
b6ff29e020 simplified FilterCollector#for_segment 2020-11-25 14:08:24 +02:00
barrotsteindev
7c94dfdc15 fmt 2020-11-25 14:08:24 +02:00
barrotsteindev
8782c0eada updated docs 2020-11-25 14:08:24 +02:00
barrotsteindev
fea0ba1042 removed unnecessary static lifetimes 2020-11-25 14:08:24 +02:00
barrotsteindev
027555c75f added initial implementation for filter_collector 2020-11-25 14:08:24 +02:00
Paul Masurel
b478ed747a Attempt to fix bug surfacing sometimes in test.
Recently, `test_index_manual_policy_mmap` has been failing on Windows.

The idea addressed by this patch is that the current implementation of
atomic writes forgets to sync the parent directory.
This was done correctly when we were relying on the atomicwrites crate.

*crossing fingers*
2020-11-25 18:00:05 +09:00
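For reference, the parent-directory sync the message refers to is the usual last step of a durable atomic write on POSIX systems. A sketch of the general recipe (not tantivy's actual implementation) could look like this:

use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

// Sketch: write to a temporary file, sync it, atomically rename it over the
// target, then fsync the parent directory so the rename itself is durable.
fn atomic_write_sketch(path: &Path, data: &[u8]) -> io::Result<()> {
    let parent = path.parent().expect("target path must have a parent directory");
    let tmp_path = parent.join(".tmp-atomic-write");
    {
        let mut tmp = File::create(&tmp_path)?;
        tmp.write_all(data)?;
        tmp.sync_all()?; // flush the file contents to disk
    }
    fs::rename(&tmp_path, path)?; // atomic replacement of the target file
    File::open(parent)?.sync_all()?; // sync the directory entry (POSIX-specific)
    Ok(())
}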
Paul Masurel
e9aa27dace Avoid computing the BM25 weight if scoring is disabled 2020-11-25 14:35:49 +09:00
Paul Masurel
c079133f3a Merge pull request #945 from tantivy-search/guilload--replace-arc-box-with-arc
Replace some `Arc<Box<dyn...` with `Arc<dyn...`
2020-11-25 13:57:22 +09:00
Paul Masurel
30c5f7c5f0 Applied CR comments 2020-11-25 13:56:05 +09:00
Adrien Guillo
6f26871c0f Replace some Arc<Box<dyn... with Arc<dyn... 2020-11-24 19:54:53 -08:00
Paul Masurel
f93cc5b5e3 Merge pull request #944 from tantivy-search/no-file-len-problem
No filelen problem.
2020-11-25 11:54:44 +09:00
Paul Masurel
5a25c8dfd3 No filelen problem. 2020-11-25 11:51:58 +09:00
Paul Masurel
f5c079159d Merge pull request #943 from tantivy-search/guilload--ownedbytes-helper-methods
Add helper methods for reading u8 and u64 to `OwnedBytes`
2020-11-25 09:04:40 +09:00
Adrien Guillo
1cfdce3437 Add helper methods for reading u8 and u64 to OwnedBytes 2020-11-23 10:45:46 -08:00
Paul Masurel
e9e6d141e9 Merge pull request #941 from tantivy-search/revert-940-guilload--move-list-files-to-index
Revert "Move `SegmentUpdater::list_files()` to `Index`"
2020-11-20 13:54:05 +09:00
Paul Masurel
8d0e049261 Revert "Move SegmentUpdater::list_files() to Index" 2020-11-20 13:53:50 +09:00
Paul Masurel
0335c7353d Merge pull request #940 from tantivy-search/guilload--move-list-files-to-index
Move `SegmentUpdater::list_files()` to `Index`
2020-11-18 11:08:20 +09:00
Adrien Guillo
267e920a80 Move SegmentUpdater::list_files() to Index
... and make the method public
2020-11-17 17:54:18 -08:00
Paul Masurel
d8a3a47e3e Refactoring of the skip index.
Merge pull request #927 from tantivy-search/compact-store-index
    
The skip index now identifies both the start and the end offset of blocks. Checkpoints are compressed in blocks, reaching better compression.
2020-11-17 16:13:45 +09:00
Paul Masurel
7f0e61b173 Refactoring of the skip index.
The skip index now identifies both the start and the end offset
of blocks. Checkpoints are compressed in blocks, reaching better
compression.
2020-11-17 16:05:11 +09:00
Paul Masurel
ce4c50446b Merge pull request #937 from tantivy-search/guilload--cache-store-reader-blocks
Cache store reader blocks in an LRU fashion
2020-11-17 13:45:10 +09:00
Adrien Guillo
9ab25d2575 Cache store reader blocks in an LRU fashion 2020-11-16 19:09:10 -08:00
Paul Masurel
6d4b982417 Marked blockwand test as ignored.
- Using impl trait for iterating `matching_segments` in the termdict
merger
2020-11-16 13:44:14 +09:00
Paul Masurel
650eca271f Merge pull request #932 from tantivy-search/fix-unit-test-file-watcher
Fixing unit test.
2020-11-13 11:47:15 +09:00
Paul Masurel
8ee55aef6d Fixing unit test. 2020-11-13 09:01:45 +09:00
Paul Masurel
40d41c7dcb Merge pull request #929 from tantivy-search/api-public-term-merger
Make field TermMerger API public
2020-11-12 14:11:53 +09:00
Paul Masurel
c780a889a7 Merge pull request #931 from tantivy-search/issue/930
Closes #930 Minor bug.
2020-11-12 13:22:34 +09:00
Paul Masurel
eef348004e Closes #930 Minor bug.
The watch callback could still be called if the last watch handle was dropped
shortly before meta.json was updated.
2020-11-11 15:51:23 +09:00
Paul Masurel
e784bbc40f Update src/core/searcher.rs
Co-authored-by: Adrien Guillo <adrien.guillo@gmail.com>
2020-11-11 12:37:52 +09:00
Paul Masurel
b8118d439f Make field TermMerger API public 2020-11-11 11:59:09 +09:00
Paul Masurel
a49e59053c Making block wand test more robust 2020-11-10 18:01:38 +09:00
Paul Masurel
41bb2bd58b Merge pull request #926 from tantivy-search/guilload--directory-exists
Modified `Directory::exists` API to return `Result<bool, OpenReadError>`
2020-11-10 17:59:45 +09:00
Adrien Guillo
7fd6054145 Modified Directory::exists API to return Result<bool, OpenReadError> 2020-11-09 18:00:14 -08:00
Paul Masurel
6abf4e97b5 Merge pull request #925 from tantivy-search/postings-end-offset
Adding post stop offset to TermInfo.
2020-11-09 15:58:04 +09:00
Paul Masurel
d23aee76c9 Avoid loading fieldnorms when not necessary 2020-11-09 15:50:16 +09:00
Adrien Guillo
58a1595792 Updated CHANGELOG 2020-11-06 11:13:44 -08:00
Paul Masurel
726d32eac5 Merge pull request #924 from tantivy-search/guilload--implement-poll-watcher
Implement FileWatcher
2020-11-06 22:41:26 +09:00
Paul Masurel
b5f3dcdc8b TermInfo contains the end_offset of the postings.
We slice the ReadOnlySource tightly.
2020-11-06 15:18:51 +09:00
Adrien Guillo
2875deb4b1 Implement FileWatcher 2020-11-05 20:08:15 -08:00
Paul Masurel
b2dfacdc70 Fixed bench and removed unnecessary public symbols. 2020-11-05 16:12:03 +09:00
Paul Masurel
36a0520a48 Added failing proptest and fixed it. 2020-11-05 15:40:00 +09:00
82 changed files with 6552 additions and 1461 deletions

View File

@@ -7,6 +7,12 @@ Tantivy 0.14.0
 - Added support for Brotli compression in the DocStore. (@ppodolsky)
 - Added helper for building intersections and unions in BooleanQuery (@guilload)
 - Bugfix in `Query::explain`
+- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
+- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)
+- Simplified the encoding of the skip reader struct. BlockWAND max tf is now encoded over a single byte. (@pmasurel)
+- `FilterCollector` now supports all Fast Field value types (@barrotsteindev)
+This version breaks compatibility and requires users to reindex everything.
 Tantivy 0.13.2
 ===================

View File

@@ -30,7 +30,6 @@ serde_json = "1"
 num_cpus = "1"
 fs2={version="0.4", optional=true}
 levenshtein_automata = "0.2"
-notify = {version="4", optional=true}
 uuid = { version = "0.8", features = ["v4", "serde"] }
 crossbeam = "0.8"
 futures = {version = "0.3", features=["thread-pool"] }
@@ -48,15 +47,18 @@ murmurhash32 = "0.2"
 chrono = "0.4"
 smallvec = "1"
 rayon = "1"
+env_logger = "0.8"
+lru = "0.6"
 [target.'cfg(windows)'.dependencies]
 winapi = "0.3"
 [dev-dependencies]
-rand = "0.7"
+rand = "0.8"
 maplit = "1"
 matches = "0.1.8"
 proptest = "0.10"
+criterion = "0.3"
 [dev-dependencies.fail]
 version = "0.4"
@@ -73,7 +75,7 @@ overflow-checks = true
 [features]
 default = ["mmap"]
-mmap = ["fs2", "tempfile", "memmap", "notify"]
+mmap = ["fs2", "tempfile", "memmap"]
 brotli-compression = ["brotli"]
 lz4-compression = ["lz4"]
 failpoints = ["fail/failpoints"]
@@ -97,3 +99,7 @@ travis-ci = { repository = "tantivy-search/tantivy" }
 name = "failpoints"
 path = "tests/failpoints/mod.rs"
 required-features = ["fail/failpoints"]
+[[bench]]
+name = "analyzer"
+harness = false

3774
benches/alice.txt Normal file

File diff suppressed because it is too large

22
benches/analyzer.rs Normal file
View File

@@ -0,0 +1,22 @@
use criterion::{criterion_group, criterion_main, Criterion};
use tantivy::tokenizer::TokenizerManager;
const ALICE_TXT: &'static str = include_str!("alice.txt");
pub fn criterion_benchmark(c: &mut Criterion) {
let tokenizer_manager = TokenizerManager::default();
let tokenizer = tokenizer_manager.get("default").unwrap();
c.bench_function("default-tokenize-alice", |b| {
b.iter(|| {
let mut word_count = 0;
let mut token_stream = tokenizer.token_stream(ALICE_TXT);
while token_stream.advance() {
word_count += 1;
}
assert_eq!(word_count, 30_731);
})
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -61,7 +61,7 @@ fn main() -> tantivy::Result<()> {
 let query_ords: HashSet<u64> = facets
 .iter()
-.filter_map(|key| facet_dict.term_ord(key.encoded_str()))
+.filter_map(|key| facet_dict.term_ord(key.encoded_str()).unwrap())
 .collect();
 let mut facet_ords_buffer: Vec<u64> = Vec::with_capacity(20);

View File

@@ -274,7 +274,7 @@ impl Collector for FacetCollector {
 let mut collapse_facet_it = self.facets.iter().peekable();
 collapse_facet_ords.push(0);
 {
-let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
+let mut facet_streamer = facet_reader.facet_dict().range().into_stream()?;
 if facet_streamer.advance() {
 'outer: loop {
 // at the begining of this loop, facet_streamer
@@ -368,9 +368,12 @@ impl SegmentCollector for FacetSegmentCollector {
 }
 let mut facet = vec![];
 let facet_ord = self.collapse_facet_ords[collapsed_facet_ord];
-facet_dict.ord_to_term(facet_ord as u64, &mut facet);
-// TODO
-facet_counts.insert(Facet::from_encoded(facet).unwrap(), count);
+// TODO handle errors.
+if facet_dict.ord_to_term(facet_ord as u64, &mut facet).is_ok() {
+if let Ok(facet) = Facet::from_encoded(facet) {
+facet_counts.insert(facet, count);
+}
+}
 }
 FacetCounts { facet_counts }
 }

View File

@@ -0,0 +1,189 @@
// # FilterCollector
//
// This module implements a collector wrapper that filters documents
// using a predicate over a fast field before passing them on to an
// inner collector. See the doc-comment on `FilterCollector` below.
use std::marker::PhantomData;
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::{FastFieldReader, FastValue};
use crate::schema::Field;
use crate::{Score, SegmentReader, TantivyError};
/// The `FilterCollector` collector filters docs using a u64 fast field value and a predicate.
/// Only the documents for which the predicate returned "true" will be passed on to the next collector.
///
/// ```rust
/// use tantivy::collector::{TopDocs, FilterCollector};
/// use tantivy::query::QueryParser;
/// use tantivy::schema::{Schema, TEXT, INDEXED, FAST};
/// use tantivy::{doc, DocAddress, Index};
///
/// let mut schema_builder = Schema::builder();
/// let title = schema_builder.add_text_field("title", TEXT);
/// let price = schema_builder.add_u64_field("price", INDEXED | FAST);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
/// index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64));
/// index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64));
/// index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64));
/// index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64));
/// assert!(index_writer.commit().is_ok());
///
/// let reader = index.reader().unwrap();
/// let searcher = reader.searcher();
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary").unwrap();
/// let no_filter_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
///
/// assert_eq!(top_docs.len(), 1);
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
///
/// let filter_all_collector: FilterCollector<_, _, u64> = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_top_docs.len(), 0);
/// ```
pub struct FilterCollector<TCollector, TPredicate, TPredicateValue: FastValue>
where
TPredicate: 'static,
{
field: Field,
collector: TCollector,
predicate: &'static TPredicate,
t_predicate_value: PhantomData<TPredicateValue>,
}
impl<TCollector, TPredicate, TPredicateValue: FastValue>
FilterCollector<TCollector, TPredicate, TPredicateValue>
where
TCollector: Collector + Send + Sync,
TPredicate: Fn(TPredicateValue) -> bool + Send + Sync,
{
/// Create a new FilterCollector.
pub fn new(
field: Field,
predicate: &'static TPredicate,
collector: TCollector,
) -> FilterCollector<TCollector, TPredicate, TPredicateValue> {
FilterCollector {
field,
predicate,
collector,
t_predicate_value: PhantomData,
}
}
}
impl<TCollector, TPredicate, TPredicateValue: FastValue> Collector
for FilterCollector<TCollector, TPredicate, TPredicateValue>
where
TCollector: Collector + Send + Sync,
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
TPredicateValue: 'static + FastValue,
{
// The fruit (i.e. the collected result) is simply the inner collector's fruit.
type Fruit = TCollector::Fruit;
type Child = FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>;
fn for_segment(
&self,
segment_local_id: u32,
segment_reader: &SegmentReader,
) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>> {
let schema = segment_reader.schema();
let field_entry = schema.get_field_entry(self.field);
if !field_entry.is_fast() {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is not a fast field.",
field_entry.name()
)));
}
let requested_type = TPredicateValue::to_type();
let field_schema_type = field_entry.field_type().value_type();
if requested_type != field_schema_type {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is of type {:?}!={:?}",
field_entry.name(),
requested_type,
field_schema_type
)));
}
let fast_field_reader = segment_reader
.fast_fields()
.typed_fast_field_reader(self.field)
.ok_or_else(|| {
TantivyError::SchemaError(format!(
"{:?} is not declared as a fast field in the schema.",
self.field
))
})?;
let segment_collector = self
.collector
.for_segment(segment_local_id, segment_reader)?;
Ok(FilterSegmentCollector {
fast_field_reader,
segment_collector,
predicate: self.predicate,
t_predicate_value: PhantomData,
})
}
fn requires_scoring(&self) -> bool {
self.collector.requires_scoring()
}
fn merge_fruits(
&self,
segment_fruits: Vec<<TCollector::Child as SegmentCollector>::Fruit>,
) -> crate::Result<TCollector::Fruit> {
self.collector.merge_fruits(segment_fruits)
}
}
pub struct FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
where
TPredicate: 'static,
TPredicateValue: 'static + FastValue,
{
fast_field_reader: FastFieldReader<TPredicateValue>,
segment_collector: TSegmentCollector,
predicate: &'static TPredicate,
t_predicate_value: PhantomData<TPredicateValue>,
}
impl<TSegmentCollector, TPredicate, TPredicateValue> SegmentCollector
for FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
where
TSegmentCollector: SegmentCollector,
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
TPredicateValue: 'static + FastValue,
{
type Fruit = TSegmentCollector::Fruit;
fn collect(&mut self, doc: u32, score: Score) {
let value = self.fast_field_reader.get(doc);
if (self.predicate)(value) {
self.segment_collector.collect(doc, score)
}
}
fn harvest(self) -> <TSegmentCollector as SegmentCollector>::Fruit {
self.segment_collector.harvest()
}
}

View File

@@ -114,6 +114,9 @@ use crate::query::Weight;
 mod docset_collector;
 pub use self::docset_collector::DocSetCollector;
+mod filter_collector_wrapper;
+pub use self::filter_collector_wrapper::FilterCollector;
 /// `Fruit` is the type for the result of our collection.
 /// e.g. `usize` for the `Count` collector.
 pub trait Fruit: Send + downcast_rs::Downcast {}

View File

@@ -8,6 +8,13 @@ use crate::DocId;
 use crate::Score;
 use crate::SegmentLocalId;
+use crate::collector::{FilterCollector, TopDocs};
+use crate::query::QueryParser;
+use crate::schema::{Schema, FAST, TEXT};
+use crate::DateTime;
+use crate::{doc, Index};
+use std::str::FromStr;
 pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
 compute_score: true,
 };
@@ -16,6 +23,54 @@ pub const TEST_COLLECTOR_WITHOUT_SCORE: TestCollector = TestCollector {
 compute_score: true,
 };
+#[test]
+pub fn test_filter_collector() {
+let mut schema_builder = Schema::builder();
+let title = schema_builder.add_text_field("title", TEXT);
+let price = schema_builder.add_u64_field("price", FAST);
+let date = schema_builder.add_date_field("date", FAST);
+let schema = schema_builder.build();
+let index = Index::create_in_ram(schema);
+let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
+index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64, date => DateTime::from_str("1898-04-09T00:00:00+00:00").unwrap()));
+index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64, date => DateTime::from_str("2020-04-09T00:00:00+00:00").unwrap()));
+index_writer.add_document(doc!(title => "The Diary of Anne Frank", price => 18_240u64, date => DateTime::from_str("2019-04-20T00:00:00+00:00").unwrap()));
+index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64, date => DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()));
+index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64, date => DateTime::from_str("2018-04-09T00:00:00+00:00").unwrap()));
+assert!(index_writer.commit().is_ok());
+let reader = index.reader().unwrap();
+let searcher = reader.searcher();
+let query_parser = QueryParser::for_index(&index, vec![title]);
+let query = query_parser.parse_query("diary").unwrap();
+let filter_some_collector = FilterCollector::new(
+price,
+&|value: u64| value > 20_120u64,
+TopDocs::with_limit(2),
+);
+let top_docs = searcher.search(&query, &filter_some_collector).unwrap();
+assert_eq!(top_docs.len(), 1);
+assert_eq!(top_docs[0].1, DocAddress(0, 1));
+let filter_all_collector: FilterCollector<_, _, u64> =
+FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
+let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
+assert_eq!(filtered_top_docs.len(), 0);
+fn date_filter(value: DateTime) -> bool {
+(value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
+}
+let filter_dates_collector = FilterCollector::new(date, &date_filter, TopDocs::with_limit(5));
+let filtered_date_docs = searcher.search(&query, &filter_dates_collector).unwrap();
+assert_eq!(filtered_date_docs.len(), 2);
+}
 /// Stores all of the doc ids.
 /// This collector is only used for tests.
 /// It is unusable in pr

View File

@@ -728,7 +728,7 @@ mod tests {
 }
 #[test]
-fn test_top_collector_not_at_capacity() {
+fn test_top_collector_not_at_capacity_without_offset() {
 let index = make_index();
 let field = index.schema().get_field("text").unwrap();
 let query_parser = QueryParser::for_index(&index, vec![field]);

View File

@@ -20,9 +20,10 @@ impl<W: Write> CountingWriter<W> {
 self.written_bytes
 }
-pub fn finish(mut self) -> io::Result<(W, u64)> {
-self.flush()?;
-Ok((self.underlying, self.written_bytes))
+/// Returns the underlying write object.
+/// Note that this method does not trigger any flushing.
+pub fn finish(self) -> W {
+self.underlying
 }
 }
@@ -46,7 +47,6 @@ impl<W: Write> Write for CountingWriter<W> {
 impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
 fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
-self.flush()?;
 self.underlying.terminate_ref(token)
 }
 }
@@ -63,8 +63,9 @@ mod test {
 let mut counting_writer = CountingWriter::wrap(buffer);
 let bytes = (0u8..10u8).collect::<Vec<u8>>();
 counting_writer.write_all(&bytes).unwrap();
-let (w, len): (Vec<u8>, u64) = counting_writer.finish().unwrap();
+let len = counting_writer.written_bytes();
+let buffer_restituted: Vec<u8> = counting_writer.finish();
 assert_eq!(len, 10u64);
-assert_eq!(w.len(), 10);
+assert_eq!(buffer_restituted.len(), 10);
 }
 }

View File

@@ -66,10 +66,6 @@ pub(crate) fn compute_num_bits(n: u64) -> u8 {
 }
 }
-pub(crate) fn is_power_of_2(n: usize) -> bool {
-(n > 0) && (n & (n - 1) == 0)
-}
 /// Has length trait
 pub trait HasLen {
 /// Return length
@@ -119,11 +115,16 @@ pub fn u64_to_i64(val: u64) -> i64 {
 /// For simplicity, tantivy internally handles `f64` as `u64`.
 /// The mapping is defined by this function.
 ///
-/// Maps `f64` to `u64` so that lexical order is preserved.
+/// Maps `f64` to `u64` in a monotonic manner, so that bytes lexical order is preserved.
 ///
 /// This is more suited than simply casting (`val as u64`)
 /// which would truncate the result
 ///
+/// # Reference
+///
+/// Daniel Lemire's [blog post](https://lemire.me/blog/2020/12/14/converting-floating-point-numbers-to-integers-while-preserving-order/)
+/// explains the mapping in a clear manner.
+///
 /// # See also
 /// The [reverse mapping is `u64_to_f64`](./fn.u64_to_f64.html).
 #[inline(always)]
@@ -152,6 +153,7 @@ pub(crate) mod test {
 pub use super::minmax;
 pub use super::serialize::test::fixed_size_test;
 use super::{compute_num_bits, f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
+use proptest::prelude::*;
 use std::f64;
 fn test_i64_converter_helper(val: i64) {
@@ -162,6 +164,15 @@ pub(crate) mod test {
 assert_eq!(u64_to_f64(f64_to_u64(val)), val);
 }
+proptest! {
+#[test]
+fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) {
+let left_u64 = f64_to_u64(left);
+let right_u64 = f64_to_u64(right);
+assert_eq!(left_u64 < right_u64, left < right);
+}
+}
 #[test]
 fn test_i64_converter() {
 assert_eq!(i64_to_u64(i64::min_value()), u64::min_value());
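The monotonic f64-to-u64 mapping documented in this hunk is the standard order-preserving trick covered in Lemire's post: keep the raw bits, flip the sign bit for positive values, and invert every bit for negative ones. A self-contained sketch of that technique (not necessarily the exact code in the crate):

const HIGHEST_BIT: u64 = 1 << 63;

// Maps f64 to u64 so that the order of (non-NaN) floats is preserved by u64 order.
fn f64_to_u64(val: f64) -> u64 {
    let bits = val.to_bits();
    if val.is_sign_positive() {
        bits ^ HIGHEST_BIT // positives land in the upper half, in increasing order
    } else {
        !bits // negatives land in the lower half, with their order restored
    }
}

// Reverse mapping.
fn u64_to_f64(val: u64) -> f64 {
    f64::from_bits(if val & HIGHEST_BIT != 0 {
        val ^ HIGHEST_BIT
    } else {
        !val
    })
}

fn main() {
    assert!(f64_to_u64(-1.5) < f64_to_u64(-0.5));
    assert!(f64_to_u64(-0.5) < f64_to_u64(0.5));
    assert_eq!(u64_to_f64(f64_to_u64(42.0)), 42.0);
}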

View File

@@ -5,6 +5,7 @@ use crate::core::SegmentId;
 use crate::core::SegmentMeta;
 use crate::core::SegmentMetaInventory;
 use crate::core::META_FILEPATH;
+use crate::directory::error::OpenReadError;
 use crate::directory::ManagedDirectory;
 #[cfg(feature = "mmap")]
 use crate::directory::MmapDirectory;
@@ -34,12 +35,18 @@ fn load_metas(
 inventory: &SegmentMetaInventory,
 ) -> crate::Result<IndexMeta> {
 let meta_data = directory.atomic_read(&META_FILEPATH)?;
-let meta_string = String::from_utf8_lossy(&meta_data);
+let meta_string = String::from_utf8(meta_data)
+.map_err(|utf8_err| {
+DataCorruption::new(
+META_FILEPATH.to_path_buf(),
+format!("Meta file is not valid utf-8. {:?}", utf8_err)
+)
+})?;
 IndexMeta::deserialize(&meta_string, &inventory)
 .map_err(|e| {
 DataCorruption::new(
 META_FILEPATH.to_path_buf(),
-format!("Meta file cannot be deserialized. {:?}.", e),
+format!("Meta file cannot be deserialized. {:?}. content = {}", e, meta_string),
 )
 })
 .map_err(From::from)
@@ -59,7 +66,7 @@ impl Index {
 /// Examines the directory to see if it contains an index.
 ///
 /// Effectively, it only checks for the presence of the `meta.json` file.
-pub fn exists<Dir: Directory>(dir: &Dir) -> bool {
+pub fn exists<Dir: Directory>(dir: &Dir) -> Result<bool, OpenReadError> {
 dir.exists(&META_FILEPATH)
 }
@@ -106,7 +113,7 @@ impl Index {
 schema: Schema,
 ) -> crate::Result<Index> {
 let mmap_directory = MmapDirectory::open(directory_path)?;
-if Index::exists(&mmap_directory) {
+if Index::exists(&mmap_directory)? {
 return Err(TantivyError::IndexAlreadyExists);
 }
 Index::create(mmap_directory, schema)
@@ -114,7 +121,7 @@ impl Index {
 /// Opens or creates a new index in the provided directory
 pub fn open_or_create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
-if !Index::exists(&dir) {
+if !Index::exists(&dir)? {
 return Index::create(dir, schema);
 }
 let index = Index::open(dir)?;
@@ -399,7 +406,7 @@ impl fmt::Debug for Index {
 #[cfg(test)]
 mod tests {
-use crate::directory::RAMDirectory;
+use crate::directory::{RAMDirectory, WatchCallback};
 use crate::schema::Field;
 use crate::schema::{Schema, INDEXED, TEXT};
 use crate::IndexReader;
@@ -423,24 +430,24 @@ mod tests {
 #[test]
 fn test_index_exists() {
 let directory = RAMDirectory::create();
-assert!(!Index::exists(&directory));
+assert!(!Index::exists(&directory).unwrap());
 assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
-assert!(Index::exists(&directory));
+assert!(Index::exists(&directory).unwrap());
 }
 #[test]
 fn open_or_create_should_create() {
 let directory = RAMDirectory::create();
-assert!(!Index::exists(&directory));
+assert!(!Index::exists(&directory).unwrap());
 assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
-assert!(Index::exists(&directory));
+assert!(Index::exists(&directory).unwrap());
 }
 #[test]
 fn open_or_create_should_open() {
 let directory = RAMDirectory::create();
 assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
-assert!(Index::exists(&directory));
+assert!(Index::exists(&directory).unwrap());
 assert!(Index::open_or_create(directory, throw_away_schema()).is_ok());
 }
@@ -448,7 +455,7 @@
 fn create_should_wipeoff_existing() {
 let directory = RAMDirectory::create();
 assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
-assert!(Index::exists(&directory));
+assert!(Index::exists(&directory).unwrap());
 assert!(Index::create(directory.clone(), Schema::builder().build()).is_ok());
 }
@@ -456,7 +463,7 @@
 fn open_or_create_exists_but_schema_does_not_match() {
 let directory = RAMDirectory::create();
 assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
-assert!(Index::exists(&directory));
+assert!(Index::exists(&directory).unwrap());
 assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
 let err = Index::open_or_create(directory, Schema::builder().build());
 assert_eq!(
@@ -510,28 +517,28 @@
 }
 #[test]
-fn test_index_manual_policy_mmap() {
+fn test_index_manual_policy_mmap() -> crate::Result<()> {
 let schema = throw_away_schema();
 let field = schema.get_field("num_likes").unwrap();
-let mut index = Index::create_from_tempdir(schema).unwrap();
-let mut writer = index.writer_for_tests().unwrap();
-writer.commit().unwrap();
+let mut index = Index::create_from_tempdir(schema)?;
+let mut writer = index.writer_for_tests()?;
+writer.commit()?;
 let reader = index
 .reader_builder()
 .reload_policy(ReloadPolicy::Manual)
-.try_into()
-.unwrap();
+.try_into()?;
 assert_eq!(reader.searcher().num_docs(), 0);
 writer.add_document(doc!(field=>1u64));
 let (sender, receiver) = crossbeam::channel::unbounded();
-let _handle = index.directory_mut().watch(Box::new(move || {
+let _handle = index.directory_mut().watch(WatchCallback::new(move || {
 let _ = sender.send(());
 }));
-writer.commit().unwrap();
+writer.commit()?;
 assert!(receiver.recv().is_ok());
 assert_eq!(reader.searcher().num_docs(), 0);
-reader.reload().unwrap();
+reader.reload()?;
 assert_eq!(reader.searcher().num_docs(), 1);
+Ok(())
 }
 #[test]
@@ -554,7 +561,9 @@
 fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) {
 let mut reader_index = reader.index();
 let (sender, receiver) = crossbeam::channel::unbounded();
-let _watch_handle = reader_index.directory_mut().watch(Box::new(move || {
+let _watch_handle = reader_index
+.directory_mut()
+.watch(WatchCallback::new(move || {
 let _ = sender.send(());
 }));
 let mut writer = index.writer_for_tests().unwrap();
@@ -595,7 +604,7 @@
 writer.add_document(doc!(field => i));
 }
 let (sender, receiver) = crossbeam::channel::unbounded();
-let _handle = directory.watch(Box::new(move || {
+let _handle = directory.watch(WatchCallback::new(move || {
 let _ = sender.send(());
 }));
 writer.commit().unwrap();

View File

@@ -66,7 +66,7 @@ impl InvertedIndexReader {
 }
 /// Returns the term info associated with the term.
-pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
+pub fn get_term_info(&self, term: &Term) -> io::Result<Option<TermInfo>> {
 self.termdict.get(term.value_bytes())
 }
@@ -90,9 +90,9 @@
 term_info: &TermInfo,
 block_postings: &mut BlockSegmentPostings,
 ) -> io::Result<()> {
-let postings_slice = self
-.postings_file_slice
-.slice_from(term_info.postings_offset as usize);
+let start_offset = term_info.postings_start_offset as usize;
+let stop_offset = term_info.postings_stop_offset as usize;
+let postings_slice = self.postings_file_slice.slice(start_offset, stop_offset);
 block_postings.reset(term_info.doc_freq, postings_slice.read_bytes()?);
 Ok(())
 }
@@ -106,10 +106,9 @@
 term: &Term,
 option: IndexRecordOption,
 ) -> io::Result<Option<BlockSegmentPostings>> {
-Ok(self
-.get_term_info(term)
+self.get_term_info(term)?
 .map(move |term_info| self.read_block_postings_from_terminfo(&term_info, option))
-.transpose()?)
+.transpose()
 }
 /// Returns a block postings given a `term_info`.
@@ -121,8 +120,10 @@
 term_info: &TermInfo,
 requested_option: IndexRecordOption,
 ) -> io::Result<BlockSegmentPostings> {
-let offset = term_info.postings_offset as usize;
-let postings_data = self.postings_file_slice.slice_from(offset);
+let postings_data = self.postings_file_slice.slice(
+term_info.postings_start_offset as usize,
+term_info.postings_stop_offset as usize,
+);
 BlockSegmentPostings::open(
 term_info.doc_freq,
 postings_data,
@@ -179,7 +180,7 @@
 term: &Term,
 option: IndexRecordOption,
 ) -> io::Result<Option<SegmentPostings>> {
-self.get_term_info(term)
+self.get_term_info(term)?
 .map(move |term_info| self.read_postings_from_terminfo(&term_info, option))
 .transpose()
 }
@@ -189,7 +190,7 @@
 term: &Term,
 option: IndexRecordOption,
 ) -> io::Result<Option<SegmentPostings>> {
-self.get_term_info(term)
+self.get_term_info(term)?
 .map(|term_info| self.read_postings_from_terminfo(&term_info, option))
 .transpose()
 }
@@ -197,7 +198,7 @@
 /// Returns the number of documents containing the term.
 pub fn doc_freq(&self, term: &Term) -> io::Result<u32> {
 Ok(self
-.get_term_info(term)
+.get_term_info(term)?
 .map(|term_info| term_info.doc_freq)
 .unwrap_or(0u32))
 }
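Judging from the calls in this hunk, TermInfo now carries explicit start and stop offsets for the postings, so the postings file can be sliced tightly rather than from a single start offset. A hypothetical sketch of that shape (field types and the exact struct layout are assumptions, not the actual tantivy definition):

struct TermInfoSketch {
    doc_freq: u32,
    postings_start_offset: u64,
    postings_stop_offset: u64,
}

impl TermInfoSketch {
    // Byte range of this term's postings within the postings file.
    fn postings_range(&self) -> std::ops::Range<usize> {
        self.postings_start_offset as usize..self.postings_stop_offset as usize
    }
}

fn main() {
    let info = TermInfoSketch { doc_freq: 12, postings_start_offset: 128, postings_stop_offset: 192 };
    assert_eq!(info.postings_range(), 128..192);
}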

View File

@@ -1,17 +1,16 @@
 use crate::collector::Collector;
 use crate::core::Executor;
-use crate::core::InvertedIndexReader;
 use crate::core::SegmentReader;
 use crate::query::Query;
 use crate::schema::Document;
 use crate::schema::Schema;
-use crate::schema::{Field, Term};
+use crate::schema::Term;
 use crate::space_usage::SearcherSpaceUsage;
 use crate::store::StoreReader;
-use crate::termdict::TermMerger;
 use crate::DocAddress;
 use crate::Index;
-use std::sync::Arc;
 use std::{fmt, io};
 /// Holds a list of `SegmentReader`s ready for search.
@@ -148,16 +147,6 @@ impl Searcher {
 collector.merge_fruits(fruits)
 }
-/// Return the field searcher associated to a `Field`.
-pub fn field(&self, field: Field) -> crate::Result<FieldSearcher> {
-let inv_index_readers: Vec<Arc<InvertedIndexReader>> = self
-.segment_readers
-.iter()
-.map(|segment_reader| segment_reader.inverted_index(field))
-.collect::<crate::Result<Vec<_>>>()?;
-Ok(FieldSearcher::new(inv_index_readers))
-}
 /// Summarize total space usage of this searcher.
 pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
 let mut space_usage = SearcherSpaceUsage::new();
@@ -168,27 +157,6 @@
 }
 }
-pub struct FieldSearcher {
-inv_index_readers: Vec<Arc<InvertedIndexReader>>,
-}
-impl FieldSearcher {
-fn new(inv_index_readers: Vec<Arc<InvertedIndexReader>>) -> FieldSearcher {
-FieldSearcher { inv_index_readers }
-}
-/// Returns a Stream over all of the sorted unique terms of
-/// for the given field.
-pub fn terms(&self) -> TermMerger<'_> {
-let term_streamers: Vec<_> = self
-.inv_index_readers
-.iter()
-.map(|inverted_index| inverted_index.terms().stream())
-.collect();
-TermMerger::new(term_streamers)
-}
-}
 impl fmt::Debug for Searcher {
 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 let segment_ids = self

View File

@@ -1,8 +1,8 @@
 use crate::directory::directory_lock::Lock;
 use crate::directory::error::LockError;
 use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
-use crate::directory::WatchCallback;
 use crate::directory::WatchHandle;
+use crate::directory::{FileHandle, WatchCallback};
 use crate::directory::{FileSlice, WritePtr};
 use std::fmt;
 use std::io;
@@ -108,10 +108,13 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
 /// should be your default choice.
 /// - The [`RAMDirectory`](struct.RAMDirectory.html), which
 /// should be used mostly for tests.
-///
 pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
-/// Opens a virtual file for read.
+/// Opens a file and returns a boxed `FileHandle`.
 ///
+/// Users of `Directory` should typically call `Directory::open_read(...)`,
+/// while `Directory` implementor should implement `get_file_handle()`.
+fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError>;
 /// Once a virtual file is open, its data may not
 /// change.
 ///
@@ -119,7 +122,10 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
 /// have no effect on the returned `FileSlice` object.
 ///
 /// You should only use this to read files create with [Directory::open_write].
-fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError>;
+fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
+let file_handle = self.get_file_handle(path)?;
+Ok(FileSlice::new(file_handle))
+}
 /// Removes a file
 ///
@@ -131,7 +137,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
 fn delete(&self, path: &Path) -> Result<(), DeleteError>;
 /// Returns true iff the file exists
-fn exists(&self, path: &Path) -> bool;
+fn exists(&self, path: &Path) -> Result<bool, OpenReadError>;
 /// Opens a writer for the *virtual file* associated with
 /// a Path.

View File

@@ -58,7 +58,8 @@ pub enum OpenWriteError {
 }
 impl OpenWriteError {
-pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
+/// Wraps an io error.
+pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
 Self::IOError { io_error, filepath }
 }
 }
@@ -143,7 +144,8 @@ pub enum OpenReadError {
 }
 impl OpenReadError {
-pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
+/// Wraps an io error.
+pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
 Self::IOError { io_error, filepath }
 }
 }

View File

@@ -2,10 +2,11 @@ use stable_deref_trait::StableDeref;
 use crate::common::HasLen;
 use crate::directory::OwnedBytes;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use std::{io, ops::Deref};
-pub type BoxedData = Box<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
+pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
+pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
 /// Objects that represents files sections in tantivy.
 ///
@@ -40,7 +41,7 @@
 B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
 {
 fn from(bytes: B) -> FileSlice {
-FileSlice::new(OwnedBytes::new(bytes))
+FileSlice::new(Box::new(OwnedBytes::new(bytes)))
 }
 }
@@ -50,22 +51,25 @@
 ///
 #[derive(Clone)]
 pub struct FileSlice {
-data: Arc<Box<dyn FileHandle>>,
+data: Arc<dyn FileHandle>,
 start: usize,
 stop: usize,
 }
 impl FileSlice {
 /// Wraps a FileHandle.
-pub fn new<D>(data: D) -> Self
-where
-D: FileHandle,
-{
-let len = data.len();
+pub fn new(file_handle: Box<dyn FileHandle>) -> Self {
+let num_bytes = file_handle.len();
+FileSlice::new_with_num_bytes(file_handle, num_bytes)
+}
+/// Wraps a FileHandle.
+#[doc(hidden)]
+pub fn new_with_num_bytes(file_handle: Box<dyn FileHandle>, num_bytes: usize) -> Self {
 FileSlice {
-data: Arc::new(Box::new(data)),
+data: Arc::from(file_handle),
 start: 0,
-stop: len,
+stop: num_bytes,
 }
 }
@@ -146,6 +150,12 @@
 }
 }
+impl FileHandle for FileSlice {
+fn read_bytes(&self, from: usize, to: usize) -> io::Result<OwnedBytes> {
+self.read_bytes_slice(from, to)
+}
+}
 impl HasLen for FileSlice {
 fn len(&self) -> usize {
 self.stop - self.start
@@ -160,7 +170,7 @@
 #[test]
 fn test_file_slice() -> io::Result<()> {
-let file_slice = FileSlice::new(b"abcdef".as_ref());
+let file_slice = FileSlice::new(Box::new(b"abcdef".as_ref()));
 assert_eq!(file_slice.len(), 6);
 assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
 assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
@@ -204,7 +214,7 @@
 #[test]
 fn test_slice_simple_read() -> io::Result<()> {
-let slice = FileSlice::new(&b"abcdef"[..]);
+let slice = FileSlice::new(Box::new(&b"abcdef"[..]));
 assert_eq!(slice.len(), 6);
 assert_eq!(slice.read_bytes()?.as_ref(), b"abcdef");
 assert_eq!(slice.slice(1, 4).read_bytes()?.as_ref(), b"bcd");
@@ -213,7 +223,7 @@
 #[test]
 fn test_slice_read_slice() -> io::Result<()> {
-let slice_deref = FileSlice::new(&b"abcdef"[..]);
+let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
 assert_eq!(slice_deref.read_bytes_slice(1, 4)?.as_ref(), b"bcd");
 Ok(())
 }
@@ -221,14 +231,14 @@
 #[test]
 #[should_panic(expected = "assertion failed: from <= to")]
 fn test_slice_read_slice_invalid_range() {
-let slice_deref = FileSlice::new(&b"abcdef"[..]);
+let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
 assert_eq!(slice_deref.read_bytes_slice(1, 0).unwrap().as_ref(), b"bcd");
 }
 #[test]
 #[should_panic(expected = "`to` exceeds the fileslice length")]
 fn test_slice_read_slice_invalid_range_exceeds() {
-let slice_deref = FileSlice::new(&b"abcdef"[..]);
+let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
 assert_eq!(
 slice_deref.read_bytes_slice(0, 10).unwrap().as_ref(),
 b"bcd"

View File

@@ -0,0 +1,178 @@
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
use crc32fast::Hasher;
use std::fs;
use std::io;
use std::io::BufRead;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });
// Watches a file and executes registered callbacks when the file is modified.
pub struct FileWatcher {
path: Arc<Path>,
callbacks: Arc<WatchCallbackList>,
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
}
impl FileWatcher {
pub fn new(path: &Path) -> FileWatcher {
FileWatcher {
path: Arc::from(path),
callbacks: Default::default(),
state: Default::default(),
}
}
pub fn spawn(&self) {
if self.state.compare_and_swap(0, 1, Ordering::SeqCst) > 0 {
return;
}
let path = self.path.clone();
let callbacks = self.callbacks.clone();
let state = self.state.clone();
thread::Builder::new()
.name("thread-tantivy-meta-file-watcher".to_string())
.spawn(move || {
let mut current_checksum = None;
while state.load(Ordering::SeqCst) == 1 {
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
// `None.unwrap_or_else(|| !checksum) != checksum` evaluates to `true`
if current_checksum.unwrap_or_else(|| !checksum) != checksum {
info!("Meta file {:?} was modified", path);
current_checksum = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
}
}
thread::sleep(POLLING_INTERVAL);
}
})
.expect("Failed to spawn meta file watcher thread");
}
pub fn watch(&self, callback: WatchCallback) -> WatchHandle {
let handle = self.callbacks.subscribe(callback);
self.spawn();
handle
}
fn compute_checksum(path: &Path) -> Result<u32, io::Error> {
let reader = match fs::File::open(path) {
Ok(f) => io::BufReader::new(f),
Err(e) => {
warn!("Failed to open meta file {:?}: {:?}", path, e);
return Err(e);
}
};
let mut hasher = Hasher::new();
for line in reader.lines() {
hasher.update(line?.as_bytes())
}
Ok(hasher.finalize())
}
}
impl Drop for FileWatcher {
fn drop(&mut self) {
self.state.store(2, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use std::mem;
use crate::directory::mmap_directory::atomic_write;
use super::*;
#[test]
fn test_file_watcher_drop_watcher() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let _handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
atomic_write(&tmp_file, b"foo")?;
assert!(rx.recv_timeout(timeout).is_err());
atomic_write(&tmp_file, b"bar")?;
assert_eq!(rx.recv_timeout(timeout), Ok(2));
mem::drop(watcher);
atomic_write(&tmp_file, b"qux")?;
thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 2);
assert_eq!(state.load(Ordering::SeqCst), 2);
Ok(())
}
#[test]
fn test_file_watcher_drop_handle() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
mem::drop(handle);
atomic_write(&tmp_file, b"qux")?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(state.load(Ordering::SeqCst), 1);
Ok(())
}
}
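For context, a minimal sketch, not part of this changeset, of how a Directory-like type can delegate its watch method to the FileWatcher above. The PollingDirectory name and field are illustrative only; the MmapDirectory diff further down applies the same pattern.

use crate::directory::file_watcher::FileWatcher;
use crate::directory::{WatchCallback, WatchHandle};
use std::path::Path;

struct PollingDirectory {
    // Hypothetical owner of a FileWatcher pointed at the directory's meta file.
    watcher: FileWatcher,
}

impl PollingDirectory {
    fn open(meta_path: &Path) -> PollingDirectory {
        PollingDirectory {
            watcher: FileWatcher::new(meta_path),
        }
    }

    // Registering a callback lazily spawns the polling thread; the returned
    // WatchHandle must be kept alive, or the callback is unsubscribed.
    fn watch(&self, callback: WatchCallback) -> WatchHandle {
        self.watcher.watch(callback)
    }
}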

View File

@@ -115,6 +115,18 @@ impl Footer {
} }
Ok(()) Ok(())
} }
VersionedFooter::V3 {
crc32: _crc,
store_compression,
} => {
if &library_version.store_compression != store_compression {
return Err(Incompatibility::CompressionMismatch {
library_compression_format: library_version.store_compression.to_string(),
index_compression_format: store_compression.to_string(),
});
}
Ok(())
}
VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch { VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch {
library_version: library_version.clone(), library_version: library_version.clone(),
index_version: self.version.clone(), index_version: self.version.clone(),
@@ -136,24 +148,31 @@ pub enum VersionedFooter {
crc32: CrcHashU32, crc32: CrcHashU32,
store_compression: String, store_compression: String,
}, },
// Block wand max term freq on 1 byte
V3 {
crc32: CrcHashU32,
store_compression: String,
},
} }
impl BinarySerializable for VersionedFooter { impl BinarySerializable for VersionedFooter {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> { fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
let mut buf = Vec::new(); let mut buf = Vec::new();
match self { match self {
VersionedFooter::V2 { VersionedFooter::V3 {
crc32, crc32,
store_compression: compression, store_compression: compression,
} => { } => {
// Serializes a valid `VersionedFooter` or panics if the version is unknown // Serializes a valid `VersionedFooter` or panics if the version is unknown
// [ version | crc_hash | compression_mode ] // [ version | crc_hash | compression_mode ]
// [ 0..4 | 4..8 | variable ] // [ 0..4 | 4..8 | variable ]
BinarySerializable::serialize(&2u32, &mut buf)?; BinarySerializable::serialize(&3u32, &mut buf)?;
BinarySerializable::serialize(crc32, &mut buf)?; BinarySerializable::serialize(crc32, &mut buf)?;
BinarySerializable::serialize(compression, &mut buf)?; BinarySerializable::serialize(compression, &mut buf)?;
} }
VersionedFooter::V1 { .. } | VersionedFooter::UnknownVersion => { VersionedFooter::V2 { .. }
| VersionedFooter::V1 { .. }
| VersionedFooter::UnknownVersion => {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::InvalidInput, io::ErrorKind::InvalidInput,
"Cannot serialize an unknown versioned footer ", "Cannot serialize an unknown versioned footer ",
@@ -182,7 +201,7 @@ impl BinarySerializable for VersionedFooter {
reader.read_exact(&mut buf[..])?; reader.read_exact(&mut buf[..])?;
let mut cursor = &buf[..]; let mut cursor = &buf[..];
let version = u32::deserialize(&mut cursor)?; let version = u32::deserialize(&mut cursor)?;
if version != 1 && version != 2 { if version > 3 {
return Ok(VersionedFooter::UnknownVersion); return Ok(VersionedFooter::UnknownVersion);
} }
let crc32 = u32::deserialize(&mut cursor)?; let crc32 = u32::deserialize(&mut cursor)?;
@@ -192,12 +211,17 @@ impl BinarySerializable for VersionedFooter {
crc32, crc32,
store_compression, store_compression,
} }
} else { } else if version == 2 {
assert_eq!(version, 2);
VersionedFooter::V2 { VersionedFooter::V2 {
crc32, crc32,
store_compression, store_compression,
} }
} else {
assert_eq!(version, 3);
VersionedFooter::V3 {
crc32,
store_compression,
}
}) })
} }
} }
@@ -205,6 +229,7 @@ impl BinarySerializable for VersionedFooter {
impl VersionedFooter { impl VersionedFooter {
pub fn crc(&self) -> Option<CrcHashU32> { pub fn crc(&self) -> Option<CrcHashU32> {
match self { match self {
VersionedFooter::V3 { crc32, .. } => Some(*crc32),
VersionedFooter::V2 { crc32, .. } => Some(*crc32), VersionedFooter::V2 { crc32, .. } => Some(*crc32),
VersionedFooter::V1 { crc32, .. } => Some(*crc32), VersionedFooter::V1 { crc32, .. } => Some(*crc32),
VersionedFooter::UnknownVersion { .. } => None, VersionedFooter::UnknownVersion { .. } => None,
@@ -243,7 +268,7 @@ impl<W: TerminatingWrite> Write for FooterProxy<W> {
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> { impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
let crc32 = self.hasher.take().unwrap().finalize(); let crc32 = self.hasher.take().unwrap().finalize();
let footer = Footer::new(VersionedFooter::V2 { let footer = Footer::new(VersionedFooter::V3 {
crc32, crc32,
store_compression: crate::store::COMPRESSION.to_string(), store_compression: crate::store::COMPRESSION.to_string(),
}); });
@@ -278,7 +303,7 @@ mod tests {
let footer = Footer::deserialize(&mut &vec[..]).unwrap(); let footer = Footer::deserialize(&mut &vec[..]).unwrap();
assert!(matches!( assert!(matches!(
footer.versioned_footer, footer.versioned_footer,
VersionedFooter::V2 { store_compression, .. } VersionedFooter::V3 { store_compression, .. }
if store_compression == crate::store::COMPRESSION if store_compression == crate::store::COMPRESSION
)); ));
assert_eq!(&footer.version, crate::version()); assert_eq!(&footer.version, crate::version());
@@ -288,7 +313,7 @@ mod tests {
fn test_serialize_deserialize_footer() { fn test_serialize_deserialize_footer() {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let crc32 = 123456u32; let crc32 = 123456u32;
let footer: Footer = Footer::new(VersionedFooter::V2 { let footer: Footer = Footer::new(VersionedFooter::V3 {
crc32, crc32,
store_compression: "lz4".to_string(), store_compression: "lz4".to_string(),
}); });
@@ -300,7 +325,7 @@ mod tests {
#[test] #[test]
fn footer_length() { fn footer_length() {
let crc32 = 1111111u32; let crc32 = 1111111u32;
let versioned_footer = VersionedFooter::V2 { let versioned_footer = VersionedFooter::V3 {
crc32, crc32,
store_compression: "lz4".to_string(), store_compression: "lz4".to_string(),
}; };
@@ -321,7 +346,7 @@ mod tests {
// versioned footer length // versioned footer length
12 | 128, 12 | 128,
// index format version // index format version
2, 3,
0, 0,
0, 0,
0, 0,
@@ -340,7 +365,7 @@ mod tests {
let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap(); let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap();
assert!(cursor.is_empty()); assert!(cursor.is_empty());
let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32; let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32;
let expected_versioned_footer: VersionedFooter = VersionedFooter::V2 { let expected_versioned_footer: VersionedFooter = VersionedFooter::V3 {
crc32: expected_crc, crc32: expected_crc,
store_compression: "lz4".to_string(), store_compression: "lz4".to_string(),
}; };
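To summarize the format change: the versioned footer payload keeps the [ version | crc_hash | compression_mode ] layout documented above, only the version constant is bumped from 2 to 3, and any version greater than 3 now deserializes to UnknownVersion. Below is a small stand-alone sketch of that version dispatch, assuming the little-endian u32 encoding visible in the test bytes above; the function name is illustrative and not part of the diff.

use std::convert::TryInto;

fn footer_version_label(payload: &[u8]) -> &'static str {
    // The first four bytes of the versioned footer hold the index format version.
    let version = u32::from_le_bytes(payload[0..4].try_into().unwrap());
    match version {
        1 => "V1",
        2 => "V2",
        3 => "V3",
        _ => "UnknownVersion", // formats newer than this library are rejected
    }
}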

View File

@@ -1,10 +1,10 @@
use crate::core::{MANAGED_FILEPATH, META_FILEPATH}; use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError}; use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
use crate::directory::footer::{Footer, FooterProxy}; use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::DirectoryLock;
use crate::directory::GarbageCollectionResult; use crate::directory::GarbageCollectionResult;
use crate::directory::Lock; use crate::directory::Lock;
use crate::directory::META_LOCK; use crate::directory::META_LOCK;
use crate::directory::{DirectoryLock, FileHandle};
use crate::directory::{FileSlice, WritePtr}; use crate::directory::{FileSlice, WritePtr};
use crate::directory::{WatchCallback, WatchHandle}; use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption; use crate::error::DataCorruption;
@@ -274,6 +274,11 @@ impl ManagedDirectory {
} }
impl Directory for ManagedDirectory { impl Directory for ManagedDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Box::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> { fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
let file_slice = self.directory.open_read(path)?; let file_slice = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(file_slice) let (footer, reader) = Footer::extract_footer(file_slice)
@@ -307,7 +312,7 @@ impl Directory for ManagedDirectory {
self.directory.delete(path) self.directory.delete(path)
} }
fn exists(&self, path: &Path) -> bool { fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
self.directory.exists(path) self.directory.exists(path)
} }
@@ -355,22 +360,22 @@ mod tests_mmap_specific {
managed_directory managed_directory
.atomic_write(test_path2, &[0u8, 1u8]) .atomic_write(test_path2, &[0u8, 1u8])
.unwrap(); .unwrap();
assert!(managed_directory.exists(test_path1)); assert!(managed_directory.exists(test_path1).unwrap());
assert!(managed_directory.exists(test_path2)); assert!(managed_directory.exists(test_path2).unwrap());
let living_files: HashSet<PathBuf> = [test_path1.to_owned()].iter().cloned().collect(); let living_files: HashSet<PathBuf> = [test_path1.to_owned()].iter().cloned().collect();
assert!(managed_directory.garbage_collect(|| living_files).is_ok()); assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(managed_directory.exists(test_path1)); assert!(managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2)); assert!(!managed_directory.exists(test_path2).unwrap());
} }
{ {
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap(); let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
assert!(managed_directory.exists(test_path1)); assert!(managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2)); assert!(!managed_directory.exists(test_path2).unwrap());
let living_files: HashSet<PathBuf> = HashSet::new(); let living_files: HashSet<PathBuf> = HashSet::new();
assert!(managed_directory.garbage_collect(|| living_files).is_ok()); assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1)); assert!(!managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2)); assert!(!managed_directory.exists(test_path2).unwrap());
} }
} }
@@ -387,7 +392,7 @@ mod tests_mmap_specific {
let mut write = managed_directory.open_write(test_path1).unwrap(); let mut write = managed_directory.open_write(test_path1).unwrap();
write.write_all(&[0u8, 1u8]).unwrap(); write.write_all(&[0u8, 1u8]).unwrap();
write.terminate().unwrap(); write.terminate().unwrap();
assert!(managed_directory.exists(test_path1)); assert!(managed_directory.exists(test_path1).unwrap());
let _mmap_read = managed_directory.open_read(test_path1).unwrap(); let _mmap_read = managed_directory.open_read(test_path1).unwrap();
assert!(managed_directory assert!(managed_directory
@@ -395,15 +400,15 @@ mod tests_mmap_specific {
.is_ok()); .is_ok());
if cfg!(target_os = "windows") { if cfg!(target_os = "windows") {
// On Windows, gc should try and fail to delete the file as it is mmapped. // On Windows, gc should try and fail to delete the file as it is mmapped.
assert!(managed_directory.exists(test_path1)); assert!(managed_directory.exists(test_path1).unwrap());
// unmap should happen here. // unmap should happen here.
drop(_mmap_read); drop(_mmap_read);
// The file should still be in the list of managed file and // The file should still be in the list of managed file and
// eventually be deleted once mmap is released. // eventually be deleted once mmap is released.
assert!(managed_directory.garbage_collect(|| living_files).is_ok()); assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1)); assert!(!managed_directory.exists(test_path1).unwrap());
} else { } else {
assert!(!managed_directory.exists(test_path1)); assert!(!managed_directory.exists(test_path1).unwrap());
} }
} }

View File

@@ -1,21 +1,17 @@
use crate::core::META_FILEPATH; use crate::core::META_FILEPATH;
use crate::directory::error::LockError; use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError}; use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::AntiCallToken; use crate::directory::file_watcher::FileWatcher;
use crate::directory::BoxedData;
use crate::directory::Directory; use crate::directory::Directory;
use crate::directory::DirectoryLock; use crate::directory::DirectoryLock;
use crate::directory::FileSlice;
use crate::directory::Lock; use crate::directory::Lock;
use crate::directory::WatchCallback; use crate::directory::WatchCallback;
use crate::directory::WatchCallbackList;
use crate::directory::WatchHandle; use crate::directory::WatchHandle;
use crate::directory::{AntiCallToken, FileHandle, OwnedBytes};
use crate::directory::{ArcBytes, WeakArcBytes};
use crate::directory::{TerminatingWrite, WritePtr}; use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt; use fs2::FileExt;
use memmap::Mmap; use memmap::Mmap;
use notify::RawEvent;
use notify::RecursiveMode;
use notify::Watcher;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use stable_deref_trait::StableDeref; use stable_deref_trait::StableDeref;
use std::convert::From; use std::convert::From;
@@ -26,12 +22,8 @@ use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write}; use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::result; use std::result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock; use std::sync::RwLock;
use std::sync::Weak;
use std::thread;
use std::{collections::HashMap, ops::Deref}; use std::{collections::HashMap, ops::Deref};
use tempfile::TempDir; use tempfile::TempDir;
@@ -84,7 +76,7 @@ pub struct CacheInfo {
struct MmapCache { struct MmapCache {
counters: CacheCounters, counters: CacheCounters,
cache: HashMap<PathBuf, Weak<BoxedData>>, cache: HashMap<PathBuf, WeakArcBytes>,
} }
impl Default for MmapCache { impl Default for MmapCache {
@@ -118,7 +110,7 @@ impl MmapCache {
} }
// Returns None if the file exists but has a len of 0 (and hence is not mmappable). // Returns None if the file exists but has a len of 0 (and hence is not mmappable).
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<BoxedData>>, OpenReadError> { fn get_mmap(&mut self, full_path: &Path) -> Result<Option<ArcBytes>, OpenReadError> {
if let Some(mmap_weak) = self.cache.get(full_path) { if let Some(mmap_weak) = self.cache.get(full_path) {
if let Some(mmap_arc) = mmap_weak.upgrade() { if let Some(mmap_arc) = mmap_weak.upgrade() {
self.counters.hit += 1; self.counters.hit += 1;
@@ -129,7 +121,7 @@ impl MmapCache {
self.counters.miss += 1; self.counters.miss += 1;
let mmap_opt = open_mmap(full_path)?; let mmap_opt = open_mmap(full_path)?;
Ok(mmap_opt.map(|mmap| { Ok(mmap_opt.map(|mmap| {
let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap)); let mmap_arc: ArcBytes = Arc::new(mmap);
let mmap_weak = Arc::downgrade(&mmap_arc); let mmap_weak = Arc::downgrade(&mmap_arc);
self.cache.insert(full_path.to_owned(), mmap_weak); self.cache.insert(full_path.to_owned(), mmap_weak);
mmap_arc mmap_arc
@@ -137,67 +129,6 @@ impl MmapCache {
} }
} }
struct WatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
}
impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let watcher = notify::raw_watcher(tx)
.and_then(|mut watcher| {
watcher.watch(path, RecursiveMode::Recursive)?;
Ok(watcher)
})
.map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new()
.name("meta-file-watch-thread".to_string())
.spawn(move || {
loop {
match watcher_recv.recv().map(|evt| evt.path) {
Ok(Some(changed_path)) => {
// ... Actually subject to false positive.
// We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH {
let _ = watcher_router_clone.broadcast();
}
}
}
Ok(None) => {
// not an event we are interested in.
}
Err(_e) => {
// the watch send channel was dropped
break;
}
}
}
})
.map_err(|io_error| OpenDirectoryError::IoError {
io_error,
directory_path: path.to_path_buf(),
})?;
Ok(WatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router,
})
}
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.watcher_router.subscribe(watch_callback)
}
}
/// Directory storing data in files, read via mmap. /// Directory storing data in files, read via mmap.
/// ///
/// The Mmap objects are cached to limit the /// The Mmap objects are cached to limit the
@@ -219,40 +150,21 @@ struct MmapDirectoryInner {
root_path: PathBuf, root_path: PathBuf,
mmap_cache: RwLock<MmapCache>, mmap_cache: RwLock<MmapCache>,
_temp_directory: Option<TempDir>, _temp_directory: Option<TempDir>,
watcher: RwLock<Option<WatcherWrapper>>, watcher: FileWatcher,
} }
impl MmapDirectoryInner { impl MmapDirectoryInner {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner { fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
MmapDirectoryInner { MmapDirectoryInner {
root_path,
mmap_cache: Default::default(), mmap_cache: Default::default(),
_temp_directory: temp_directory, _temp_directory: temp_directory,
watcher: RwLock::new(None), watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
root_path,
} }
} }
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> { fn watch(&self, callback: WatchCallback) -> crate::Result<WatchHandle> {
// a lot of juggling here, to ensure we don't do anything that panics Ok(self.watcher.watch(callback))
// while the rwlock is held. That way we ensure that the rwlock cannot
// be poisoned.
//
// The downside is that we might create a watch wrapper that is not useful.
let need_initialization = self.watcher.read().unwrap().is_none();
if need_initialization {
let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
let mut watch_wlock = self.watcher.write().unwrap();
// the watcher could have been initialized when we released the lock, and
// we do not want to lose the watched files that were set.
if watch_wlock.is_none() {
*watch_wlock = Some(watch_wrapper);
}
}
if let Some(watch_wrapper) = self.watcher.write().unwrap().as_mut() {
Ok(watch_wrapper.watch(watch_callback))
} else {
unreachable!("At this point, watch wrapper is supposed to be initialized");
}
} }
} }
@@ -402,7 +314,7 @@ impl TerminatingWrite for SafeFileWriter {
} }
#[derive(Clone)] #[derive(Clone)]
struct MmapArc(Arc<Box<dyn Deref<Target = [u8]> + Send + Sync>>); struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync>);
impl Deref for MmapArc { impl Deref for MmapArc {
type Target = [u8]; type Target = [u8];
@@ -413,8 +325,26 @@ impl Deref for MmapArc {
} }
unsafe impl StableDeref for MmapArc {} unsafe impl StableDeref for MmapArc {}
/// Writes a file in an atomic manner.
pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
// We create the temporary file in the same directory as the target file.
// Indeed, the canonical temp directory and the target file might sit on different
// filesystems, in which case the atomic write may not actually work.
let parent_path = path.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"Path {:?} does not have parent directory.",
)
})?;
let mut tempfile = tempfile::Builder::new().tempfile_in(&parent_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
tempfile.into_temp_path().persist(path)?;
Ok(())
}
impl Directory for MmapDirectory { impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> { fn get_file_handle(&self, path: &Path) -> result::Result<Box<dyn FileHandle>, OpenReadError> {
debug!("Open Read {:?}", path); debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path); let full_path = self.resolve_path(path);
@@ -427,11 +357,16 @@ impl Directory for MmapDirectory {
let io_err = make_io_err(msg); let io_err = make_io_err(msg);
OpenReadError::wrap_io_error(io_err, path.to_path_buf()) OpenReadError::wrap_io_error(io_err, path.to_path_buf())
})?; })?;
if let Some(mmap_arc) = mmap_cache.get_mmap(&full_path)? {
Ok(FileSlice::from(MmapArc(mmap_arc))) let owned_bytes = mmap_cache
} else { .get_mmap(&full_path)?
Ok(FileSlice::empty()) .map(|mmap_arc| {
} let mmap_arc_obj = MmapArc(mmap_arc);
OwnedBytes::new(mmap_arc_obj)
})
.unwrap_or_else(OwnedBytes::empty);
Ok(Box::new(owned_bytes))
} }
/// Any entry associated to the path in the mmap will be /// Any entry associated to the path in the mmap will be
@@ -456,9 +391,9 @@ impl Directory for MmapDirectory {
} }
} }
fn exists(&self, path: &Path) -> bool { fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
let full_path = self.resolve_path(path); let full_path = self.resolve_path(path);
full_path.exists() Ok(full_path.exists())
} }
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> { fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -513,12 +448,9 @@ impl Directory for MmapDirectory {
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> { fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path); debug!("Atomic Write {:?}", path);
let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
let full_path = self.resolve_path(path); let full_path = self.resolve_path(path);
tempfile.into_temp_path().persist(full_path)?; atomic_write(&full_path, content)?;
Ok(()) self.sync_directory()
} }
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> { fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
@@ -557,8 +489,6 @@ mod tests {
use crate::Index; use crate::Index;
use crate::ReloadPolicy; use crate::ReloadPolicy;
use crate::{common::HasLen, indexer::LogMergePolicy}; use crate::{common::HasLen, indexer::LogMergePolicy};
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test] #[test]
fn test_open_non_existent_path() { fn test_open_non_existent_path() {
@@ -647,27 +577,6 @@ mod tests {
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0); assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
} }
#[test]
fn test_watch_wrapper() {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_dirpath = tmp_dir.path().to_owned();
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
let tmp_file = tmp_dirpath.join(*META_FILEPATH);
let _handle = watch_wrapper.watch(Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
}));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle2 = watch_wrapper.watch(Box::new(move || {
let _ = sender.send(());
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
fs::write(&tmp_file, b"whateverwilldo").unwrap();
assert!(receiver.recv().is_ok());
assert!(counter.load(Ordering::SeqCst) >= 1);
}
#[test] #[test]
fn test_mmap_released() { fn test_mmap_released() {
let mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); let mmap_directory = MmapDirectory::create_from_tempdir().unwrap();

View File

@@ -10,6 +10,7 @@ mod mmap_directory;
mod directory; mod directory;
mod directory_lock; mod directory_lock;
mod file_slice; mod file_slice;
mod file_watcher;
mod footer; mod footer;
mod managed_directory; mod managed_directory;
mod owned_bytes; mod owned_bytes;
@@ -22,7 +23,7 @@ pub mod error;
pub use self::directory::DirectoryLock; pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone}; pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub(crate) use self::file_slice::BoxedData; pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes};
pub use self::file_slice::{FileHandle, FileSlice}; pub use self::file_slice::{FileHandle, FileSlice};
pub use self::owned_bytes::OwnedBytes; pub use self::owned_bytes::OwnedBytes;
pub use self::ram_directory::RAMDirectory; pub use self::ram_directory::RAMDirectory;

View File

@@ -1,5 +1,6 @@
use crate::directory::FileHandle; use crate::directory::FileHandle;
use stable_deref_trait::StableDeref; use stable_deref_trait::StableDeref;
use std::convert::TryInto;
use std::mem; use std::mem;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
@@ -95,6 +96,24 @@ impl OwnedBytes {
pub fn advance(&mut self, advance_len: usize) { pub fn advance(&mut self, advance_len: usize) {
self.data = &self.data[advance_len..] self.data = &self.data[advance_len..]
} }
/// Reads a `u8` from the `OwnedBytes` and advances by one byte.
pub fn read_u8(&mut self) -> u8 {
assert!(!self.is_empty());
let byte = self.as_slice()[0];
self.advance(1);
byte
}
/// Reads a `u64` encoded as little-endian from the `OwnedBytes` and advances by 8 bytes.
pub fn read_u64(&mut self) -> u64 {
assert!(self.len() > 7);
let octlet: [u8; 8] = self.as_slice()[..8].try_into().unwrap();
self.advance(8);
u64::from_le_bytes(octlet)
}
} }
impl fmt::Debug for OwnedBytes { impl fmt::Debug for OwnedBytes {
@@ -230,6 +249,22 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn test_owned_bytes_read_u8() -> io::Result<()> {
let mut bytes = OwnedBytes::new(b"\xFF".as_ref());
assert_eq!(bytes.read_u8(), 255);
assert_eq!(bytes.len(), 0);
Ok(())
}
#[test]
fn test_owned_bytes_read_u64() -> io::Result<()> {
let mut bytes = OwnedBytes::new(b"\0\xFF\xFF\xFF\xFF\xFF\xFF\xFF".as_ref());
assert_eq!(bytes.read_u64(), u64::MAX - 255);
assert_eq!(bytes.len(), 0);
Ok(())
}
#[test] #[test]
fn test_owned_bytes_split() { fn test_owned_bytes_split() {
let bytes = OwnedBytes::new(b"abcdefghi".as_ref()); let bytes = OwnedBytes::new(b"abcdefghi".as_ref());

View File

@@ -12,6 +12,8 @@ use std::path::{Path, PathBuf};
use std::result; use std::result;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use super::FileHandle;
/// Writer associated with the `RAMDirectory` /// Writer associated with the `RAMDirectory`
/// ///
/// The Writer just writes a buffer. /// The Writer just writes a buffer.
@@ -163,6 +165,11 @@ impl RAMDirectory {
} }
impl Directory for RAMDirectory { impl Directory for RAMDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Box::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> { fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
self.fs.read().unwrap().open_read(path) self.fs.read().unwrap().open_read(path)
} }
@@ -177,8 +184,15 @@ impl Directory for RAMDirectory {
self.fs.write().unwrap().delete(path) self.fs.write().unwrap().delete(path)
} }
fn exists(&self, path: &Path) -> bool { fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
self.fs.read().unwrap().exists(path) Ok(self
.fs
.read()
.map_err(|e| OpenReadError::IOError {
io_error: io::Error::new(io::ErrorKind::Other, e.to_string()),
filepath: path.to_path_buf(),
})?
.exists(path))
} }
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> { fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {

View File

@@ -130,7 +130,7 @@ fn ram_directory_panics_if_flush_forgotten() {
fn test_simple(directory: &dyn Directory) -> crate::Result<()> { fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
let test_path: &'static Path = Path::new("some_path_for_test"); let test_path: &'static Path = Path::new("some_path_for_test");
let mut write_file = directory.open_write(test_path)?; let mut write_file = directory.open_write(test_path)?;
assert!(directory.exists(test_path)); assert!(directory.exists(test_path).unwrap());
write_file.write_all(&[4])?; write_file.write_all(&[4])?;
write_file.write_all(&[3])?; write_file.write_all(&[3])?;
write_file.write_all(&[7, 3, 5])?; write_file.write_all(&[7, 3, 5])?;
@@ -139,14 +139,14 @@ fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
assert_eq!(read_file.as_slice(), &[4u8, 3u8, 7u8, 3u8, 5u8]); assert_eq!(read_file.as_slice(), &[4u8, 3u8, 7u8, 3u8, 5u8]);
mem::drop(read_file); mem::drop(read_file);
assert!(directory.delete(test_path).is_ok()); assert!(directory.delete(test_path).is_ok());
assert!(!directory.exists(test_path)); assert!(!directory.exists(test_path).unwrap());
Ok(()) Ok(())
} }
fn test_rewrite_forbidden(directory: &dyn Directory) -> crate::Result<()> { fn test_rewrite_forbidden(directory: &dyn Directory) -> crate::Result<()> {
let test_path: &'static Path = Path::new("some_path_for_test"); let test_path: &'static Path = Path::new("some_path_for_test");
directory.open_write(test_path)?; directory.open_write(test_path)?;
assert!(directory.exists(test_path)); assert!(directory.exists(test_path).unwrap());
assert!(directory.open_write(test_path).is_err()); assert!(directory.open_write(test_path).is_err());
assert!(directory.delete(test_path).is_ok()); assert!(directory.delete(test_path).is_ok());
Ok(()) Ok(())
@@ -157,7 +157,7 @@ fn test_write_create_the_file(directory: &dyn Directory) {
{ {
assert!(directory.open_read(test_path).is_err()); assert!(directory.open_read(test_path).is_err());
let _w = directory.open_write(test_path).unwrap(); let _w = directory.open_write(test_path).unwrap();
assert!(directory.exists(test_path)); assert!(directory.exists(test_path).unwrap());
assert!(directory.open_read(test_path).is_ok()); assert!(directory.open_read(test_path).is_ok());
assert!(directory.delete(test_path).is_ok()); assert!(directory.delete(test_path).is_ok());
} }
@@ -190,38 +190,33 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {
} }
fn test_watch(directory: &dyn Directory) { fn test_watch(directory: &dyn Directory) {
let num_progress: Arc<AtomicUsize> = Default::default();
let counter: Arc<AtomicUsize> = Default::default(); let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone(); let (tx, rx) = crossbeam::channel::unbounded();
let (sender, receiver) = crossbeam::channel::unbounded(); let timeout = Duration::from_millis(500);
let watch_callback = Box::new(move || {
counter_clone.fetch_add(1, SeqCst); let handle = directory
}); .watch(WatchCallback::new(move || {
// This callback is used to synchronize watching in our unit test. let val = counter.fetch_add(1, SeqCst);
// We bind it to a variable because the callback is removed when that tx.send(val + 1).unwrap();
// handle is dropped.
let watch_handle = directory.watch(watch_callback).unwrap();
let _progress_listener = directory
.watch(Box::new(move || {
let val = num_progress.fetch_add(1, SeqCst);
let _ = sender.send(val);
})) }))
.unwrap(); .unwrap();
for i in 0..10 {
assert!(i <= counter.load(SeqCst));
assert!(directory assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data_2") .atomic_write(Path::new("meta.json"), b"foo")
.is_ok()); .is_ok());
assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i)); assert_eq!(rx.recv_timeout(timeout), Ok(1));
assert!(i + 1 <= counter.load(SeqCst)); // notify can trigger more than once.
}
mem::drop(watch_handle);
assert!(directory assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data") .atomic_write(Path::new("meta.json"), b"bar")
.is_ok()); .is_ok());
assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok()); assert_eq!(rx.recv_timeout(timeout), Ok(2));
assert!(10 <= counter.load(SeqCst));
mem::drop(handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"qux")
.is_ok());
assert!(rx.recv_timeout(timeout).is_err());
} }
fn test_lock_non_blocking(directory: &dyn Directory) { fn test_lock_non_blocking(directory: &dyn Directory) {

View File

@@ -4,8 +4,20 @@ use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
use std::sync::Weak; use std::sync::Weak;
/// Type alias for callbacks registered when watching files of a `Directory`. /// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
pub type WatchCallback = Box<dyn Fn() + Sync + Send>; #[derive(Clone)]
pub struct WatchCallback(Arc<dyn Fn() + Sync + Send>);
impl WatchCallback {
/// Wraps a `Fn()` to create a WatchCallback.
pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
WatchCallback(Arc::new(op))
}
fn call(&self) {
self.0()
}
}
/// Helper struct to implement the watch method in `Directory` implementations. /// Helper struct to implement the watch method in `Directory` implementations.
/// ///
@@ -34,7 +46,7 @@ impl WatchHandle {
/// ///
/// This function is only useful when implementing a readonly directory. /// This function is only useful when implementing a readonly directory.
pub fn empty() -> WatchHandle { pub fn empty() -> WatchHandle {
WatchHandle::new(Arc::new(Box::new(|| {}))) WatchHandle::new(Arc::new(WatchCallback::new(|| {})))
} }
} }
@@ -47,13 +59,13 @@ impl WatchCallbackList {
WatchHandle::new(watch_callback_arc) WatchHandle::new(watch_callback_arc)
} }
fn list_callback(&self) -> Vec<Arc<WatchCallback>> { fn list_callback(&self) -> Vec<WatchCallback> {
let mut callbacks = vec![]; let mut callbacks: Vec<WatchCallback> = vec![];
let mut router_wlock = self.router.write().unwrap(); let mut router_wlock = self.router.write().unwrap();
let mut i = 0; let mut i = 0;
while i < router_wlock.len() { while i < router_wlock.len() {
if let Some(watch) = router_wlock[i].upgrade() { if let Some(watch) = router_wlock[i].upgrade() {
callbacks.push(watch); callbacks.push(watch.as_ref().clone());
i += 1; i += 1;
} else { } else {
router_wlock.swap_remove(i); router_wlock.swap_remove(i);
@@ -75,7 +87,7 @@ impl WatchCallbackList {
.name("watch-callbacks".to_string()) .name("watch-callbacks".to_string())
.spawn(move || { .spawn(move || {
for callback in callbacks { for callback in callbacks {
callback(); callback.call();
} }
let _ = sender.send(()); let _ = sender.send(());
}); });
@@ -91,7 +103,7 @@ impl WatchCallbackList {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::directory::WatchCallbackList; use crate::directory::{WatchCallback, WatchCallbackList};
use futures::executor::block_on; use futures::executor::block_on;
use std::mem; use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
@@ -102,7 +114,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default(); let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default(); let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone(); let counter_clone = counter.clone();
let inc_callback = Box::new(move || { let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst); counter_clone.fetch_add(1, Ordering::SeqCst);
}); });
block_on(watch_event_router.broadcast()); block_on(watch_event_router.broadcast());
@@ -130,7 +142,7 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default(); let counter: Arc<AtomicUsize> = Default::default();
let inc_callback = |inc: usize| { let inc_callback = |inc: usize| {
let counter_clone = counter.clone(); let counter_clone = counter.clone();
Box::new(move || { WatchCallback::new(move || {
counter_clone.fetch_add(inc, Ordering::SeqCst); counter_clone.fetch_add(inc, Ordering::SeqCst);
}) })
}; };
@@ -158,7 +170,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default(); let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default(); let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone(); let counter_clone = counter.clone();
let inc_callback = Box::new(move || { let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst); counter_clone.fetch_add(1, Ordering::SeqCst);
}); });
let handle_a = watch_event_router.subscribe(inc_callback); let handle_a = watch_event_router.subscribe(inc_callback);
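As a recap of the new API, here is a minimal usage sketch of WatchCallback together with WatchCallbackList, mirroring the subscribe/broadcast pattern exercised by the tests above; the function name and counter are illustrative only.

use crate::directory::{WatchCallback, WatchCallbackList};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn demo_watch_callbacks(list: &WatchCallbackList) {
    let counter: Arc<AtomicUsize> = Default::default();
    let counter_clone = counter.clone();
    // Keep the handle alive: dropping it unsubscribes the callback.
    let _handle = list.subscribe(WatchCallback::new(move || {
        counter_clone.fetch_add(1, Ordering::SeqCst);
    }));
    // broadcast() returns a future; blocking on it waits for the callbacks to run.
    futures::executor::block_on(list.broadcast());
    assert_eq!(counter.load(Ordering::SeqCst), 1);
}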

View File

@@ -10,7 +10,7 @@ use std::borrow::BorrowMut;
pub const TERMINATED: DocId = std::i32::MAX as u32; pub const TERMINATED: DocId = std::i32::MAX as u32;
/// Represents an iterable set of sorted doc ids. /// Represents an iterable set of sorted doc ids.
pub trait DocSet { pub trait DocSet: Send {
/// Goes to the next element. /// Goes to the next element.
/// ///
/// The DocId of the next element is returned. /// The DocId of the next element is returned.
@@ -129,6 +129,14 @@ impl<'a> DocSet for &'a mut dyn DocSet {
fn size_hint(&self) -> u32 { fn size_hint(&self) -> u32 {
(**self).size_hint() (**self).size_hint()
} }
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
(**self).count(delete_bitset)
}
fn count_including_deleted(&mut self) -> u32 {
(**self).count_including_deleted()
}
} }
impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> { impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {

View File

@@ -86,7 +86,7 @@ mod tests {
let term = Term::from_field_bytes(field, b"lucene".as_ref()); let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic); let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight = term_query.specialized_weight(&searcher, true)?; let term_weight = term_query.specialized_weight(&searcher, true)?;
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0f32)?; let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
assert_eq!(term_scorer.doc(), 0u32); assert_eq!(term_scorer.doc(), 0u32);
Ok(()) Ok(())
} }
@@ -98,10 +98,9 @@ mod tests {
let field = searcher.schema().get_field("string_bytes").unwrap(); let field = searcher.schema().get_field("string_bytes").unwrap();
let term = Term::from_field_bytes(field, b"lucene".as_ref()); let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic); let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight = term_query.specialized_weight(&searcher, false)?; let term_weight_err = term_query.specialized_weight(&searcher, false);
let term_scorer_err = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0f32);
assert!(matches!( assert!(matches!(
term_scorer_err, term_weight_err,
Err(crate::TantivyError::SchemaError(_)) Err(crate::TantivyError::SchemaError(_))
)); ));
Ok(()) Ok(())

View File

@@ -1,4 +1,5 @@
use super::MultiValueIntFastFieldReader; use super::MultiValueIntFastFieldReader;
use crate::error::DataCorruption;
use crate::schema::Facet; use crate::schema::Facet;
use crate::termdict::TermDictionary; use crate::termdict::TermDictionary;
use crate::termdict::TermOrdinal; use crate::termdict::TermOrdinal;
@@ -62,12 +63,13 @@ impl FacetReader {
&mut self, &mut self,
facet_ord: TermOrdinal, facet_ord: TermOrdinal,
output: &mut Facet, output: &mut Facet,
) -> Result<(), str::Utf8Error> { ) -> crate::Result<()> {
let found_term = self let found_term = self
.term_dict .term_dict
.ord_to_term(facet_ord as u64, &mut self.buffer); .ord_to_term(facet_ord as u64, &mut self.buffer)?;
assert!(found_term, "Term ordinal {} not found.", facet_ord); assert!(found_term, "Term ordinal {} not found.", facet_ord);
let facet_str = str::from_utf8(&self.buffer[..])?; let facet_str = str::from_utf8(&self.buffer[..])
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
output.set_facet_str(facet_str); output.set_facet_str(facet_str);
Ok(()) Ok(())
} }

View File

@@ -629,7 +629,7 @@ mod bench {
{ {
let fast_fields_composite = CompositeFile::open(&file).unwrap(); let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap(); let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data); let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
b.iter(|| { b.iter(|| {
let n = test::black_box(7000u32); let n = test::black_box(7000u32);
@@ -663,7 +663,7 @@ mod bench {
{ {
let fast_fields_composite = CompositeFile::open(&file).unwrap(); let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap(); let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data); let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
b.iter(|| { b.iter(|| {
let n = test::black_box(1000u32); let n = test::black_box(1000u32);

View File

@@ -51,6 +51,15 @@ impl<Item: FastValue> FastFieldReader<Item> {
} }
} }
pub(crate) fn cast<TFastValue: FastValue>(self) -> FastFieldReader<TFastValue> {
FastFieldReader {
bit_unpacker: self.bit_unpacker,
min_value_u64: self.min_value_u64,
max_value_u64: self.max_value_u64,
_phantom: PhantomData,
}
}
/// Return the value associated to the given document. /// Return the value associated to the given document.
/// ///
/// This accessor should return as fast as possible. /// This accessor should return as fast as possible.

View File

@@ -1,6 +1,6 @@
use crate::common::CompositeFile; use crate::common::CompositeFile;
use crate::fastfield::BytesFastFieldReader;
use crate::fastfield::MultiValueIntFastFieldReader; use crate::fastfield::MultiValueIntFastFieldReader;
use crate::fastfield::{BytesFastFieldReader, FastValue};
use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader}; use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader};
use crate::schema::{Cardinality, Field, FieldType, Schema}; use crate::schema::{Cardinality, Field, FieldType, Schema};
use crate::space_usage::PerFieldSpaceUsage; use crate::space_usage::PerFieldSpaceUsage;
@@ -201,6 +201,14 @@ impl FastFieldReaders {
None None
} }
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(
&self,
field: Field,
) -> Option<FastFieldReader<TFastValue>> {
self.u64_lenient(field)
.map(|fast_field_reader| fast_field_reader.cast())
}
/// Returns the `i64` fast field reader associated to `field`. /// Returns the `i64` fast field reader associated to `field`.
/// ///
/// If `field` is not a i64 fast field, this method returns `None`. /// If `field` is not a i64 fast field, this method returns `None`.

View File

@@ -61,20 +61,56 @@ impl FieldNormReaders {
/// precompute computationally expensive functions of the fieldnorm /// precompute computationally expensive functions of the fieldnorm
/// in a very short array. /// in a very short array.
#[derive(Clone)] #[derive(Clone)]
pub struct FieldNormReader { pub struct FieldNormReader(ReaderImplEnum);
data: OwnedBytes,
impl From<ReaderImplEnum> for FieldNormReader {
fn from(reader_enum: ReaderImplEnum) -> FieldNormReader {
FieldNormReader(reader_enum)
}
}
#[derive(Clone)]
enum ReaderImplEnum {
FromData(OwnedBytes),
Const {
num_docs: u32,
fieldnorm_id: u8,
fieldnorm: u32,
},
} }
impl FieldNormReader { impl FieldNormReader {
/// Creates a `FieldNormReader` with a constant fieldnorm.
///
/// The fieldnorm will be subjected to compression as if it was coming
/// from an array-backed fieldnorm reader.
pub fn constant(num_docs: u32, fieldnorm: u32) -> FieldNormReader {
let fieldnorm_id = fieldnorm_to_id(fieldnorm);
let fieldnorm = id_to_fieldnorm(fieldnorm_id);
ReaderImplEnum::Const {
num_docs,
fieldnorm_id,
fieldnorm,
}
.into()
}
/// Opens a field norm reader given its file. /// Opens a field norm reader given its file.
pub fn open(fieldnorm_file: FileSlice) -> crate::Result<Self> { pub fn open(fieldnorm_file: FileSlice) -> crate::Result<Self> {
let data = fieldnorm_file.read_bytes()?; let data = fieldnorm_file.read_bytes()?;
Ok(FieldNormReader { data }) Ok(FieldNormReader::new(data))
}
fn new(data: OwnedBytes) -> Self {
ReaderImplEnum::FromData(data).into()
} }
/// Returns the number of documents in this segment. /// Returns the number of documents in this segment.
pub fn num_docs(&self) -> u32 { pub fn num_docs(&self) -> u32 {
self.data.len() as u32 match &self.0 {
ReaderImplEnum::FromData(data) => data.len() as u32,
ReaderImplEnum::Const { num_docs, .. } => *num_docs,
}
} }
/// Returns the `fieldnorm` associated to a doc id. /// Returns the `fieldnorm` associated to a doc id.
@@ -87,14 +123,25 @@ impl FieldNormReader {
/// The fieldnorm is effectively decoded from the /// The fieldnorm is effectively decoded from the
/// `fieldnorm_id` by doing a simple table lookup. /// `fieldnorm_id` by doing a simple table lookup.
pub fn fieldnorm(&self, doc_id: DocId) -> u32 { pub fn fieldnorm(&self, doc_id: DocId) -> u32 {
let fieldnorm_id = self.fieldnorm_id(doc_id); match &self.0 {
ReaderImplEnum::FromData(data) => {
let fieldnorm_id = data.as_slice()[doc_id as usize];
id_to_fieldnorm(fieldnorm_id) id_to_fieldnorm(fieldnorm_id)
} }
ReaderImplEnum::Const { fieldnorm, .. } => *fieldnorm,
}
}
/// Returns the `fieldnorm_id` associated to a document. /// Returns the `fieldnorm_id` associated to a document.
#[inline(always)] #[inline(always)]
pub fn fieldnorm_id(&self, doc_id: DocId) -> u8 { pub fn fieldnorm_id(&self, doc_id: DocId) -> u8 {
self.data.as_slice()[doc_id as usize] match &self.0 {
ReaderImplEnum::FromData(data) => {
let fieldnorm_id = data.as_slice()[doc_id as usize];
fieldnorm_id
}
ReaderImplEnum::Const { fieldnorm_id, .. } => *fieldnorm_id,
}
} }
/// Converts a `fieldnorm_id` into a fieldnorm. /// Converts a `fieldnorm_id` into a fieldnorm.
@@ -118,9 +165,7 @@ impl FieldNormReader {
.map(FieldNormReader::fieldnorm_to_id) .map(FieldNormReader::fieldnorm_to_id)
.collect::<Vec<u8>>(); .collect::<Vec<u8>>();
let field_norms_data = OwnedBytes::new(field_norms_id); let field_norms_data = OwnedBytes::new(field_norms_id);
FieldNormReader { FieldNormReader::new(field_norms_data)
data: field_norms_data,
}
} }
} }
@@ -139,4 +184,20 @@ mod tests {
assert_eq!(fieldnorm_reader.fieldnorm(3), 4); assert_eq!(fieldnorm_reader.fieldnorm(3), 4);
assert_eq!(fieldnorm_reader.fieldnorm(4), 983_064); assert_eq!(fieldnorm_reader.fieldnorm(4), 983_064);
} }
#[test]
fn test_const_fieldnorm_reader_small_fieldnorm_id() {
let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 10u32);
assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
assert_eq!(fieldnorm_reader.fieldnorm(0u32), 10u32);
assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 10u8);
}
#[test]
fn test_const_fieldnorm_reader_large_fieldnorm_id() {
let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 300u32);
assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
assert_eq!(fieldnorm_reader.fieldnorm(0u32), 280u32);
assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 72u8);
}
} }

View File

@@ -1,45 +1,94 @@
use rand::thread_rng;
use std::collections::HashSet;
use crate::schema::*;
use crate::Index; use crate::Index;
use crate::Searcher; use crate::Searcher;
use crate::{doc, schema::*};
use rand::thread_rng;
use rand::Rng; use rand::Rng;
use std::collections::HashSet;
fn check_index_content(searcher: &Searcher, vals: &HashSet<u64>) { fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
assert!(searcher.segment_readers().len() < 20); assert!(searcher.segment_readers().len() < 20);
assert_eq!(searcher.num_docs() as usize, vals.len()); assert_eq!(searcher.num_docs() as usize, vals.len());
for segment_reader in searcher.segment_readers() {
let store_reader = segment_reader.get_store_reader()?;
for doc_id in 0..segment_reader.max_doc() {
let _doc = store_reader.get(doc_id)?;
}
}
Ok(())
} }
#[test] #[test]
#[ignore] #[ignore]
fn test_indexing() { fn test_functional_store() -> crate::Result<()> {
env_logger::init();
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED | STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let reader = index.reader()?;
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
let mut doc_set: Vec<u64> = Vec::new();
let mut doc_id = 0u64;
for iteration in 0.. {
let num_docs: usize = rng.gen_range(0..4);
if doc_set.len() >= 1 {
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
}
for _ in 0..num_docs {
doc_set.push(doc_id);
index_writer.add_document(doc!(id_field=>doc_id));
doc_id += 1;
}
index_writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
println!("#{} - {}", iteration, searcher.segment_readers().len());
check_index_content(&searcher, &doc_set)?;
}
Ok(())
}
#[test]
#[ignore]
fn test_functional_indexing() -> crate::Result<()> {
let mut schema_builder = Schema::builder(); let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED); let id_field = schema_builder.add_u64_field("id", INDEXED);
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED); let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
let schema = schema_builder.build(); let schema = schema_builder.build();
let index = Index::create_from_tempdir(schema).unwrap(); let index = Index::create_from_tempdir(schema)?;
let reader = index.reader().unwrap(); let reader = index.reader()?;
let mut rng = thread_rng(); let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap(); let mut index_writer = index.writer_with_num_threads(3, 120_000_000)?;
let mut committed_docs: HashSet<u64> = HashSet::new(); let mut committed_docs: HashSet<u64> = HashSet::new();
let mut uncommitted_docs: HashSet<u64> = HashSet::new(); let mut uncommitted_docs: HashSet<u64> = HashSet::new();
for _ in 0..200 { for _ in 0..200 {
let random_val = rng.gen_range(0, 20); let random_val = rng.gen_range(0..20);
if random_val == 0 { if random_val == 0 {
index_writer.commit().expect("Commit failed"); index_writer.commit()?;
committed_docs.extend(&uncommitted_docs); committed_docs.extend(&uncommitted_docs);
uncommitted_docs.clear(); uncommitted_docs.clear();
reader.reload().unwrap(); reader.reload()?;
let searcher = reader.searcher(); let searcher = reader.searcher();
// check that everything is correct. // check that everything is correct.
check_index_content(&searcher, &committed_docs); check_index_content(
&searcher,
&committed_docs.iter().cloned().collect::<Vec<u64>>(),
)?;
} else { } else {
if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) { if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
let doc_id_term = Term::from_field_u64(id_field, random_val); let doc_id_term = Term::from_field_u64(id_field, random_val);
@@ -55,4 +104,5 @@ fn test_indexing() {
} }
} }
} }
Ok(())
} }

View File

@@ -53,7 +53,7 @@ impl DeleteQueue {
return block; return block;
} }
let block = Arc::new(Block { let block = Arc::new(Block {
operations: Arc::default(), operations: Arc::new([]),
next: NextBlock::from(self.clone()), next: NextBlock::from(self.clone()),
}); });
wlock.last_block = Arc::downgrade(&block); wlock.last_block = Arc::downgrade(&block);
@@ -108,7 +108,7 @@ impl DeleteQueue {
let delete_operations = mem::replace(&mut self_wlock.writer, vec![]); let delete_operations = mem::replace(&mut self_wlock.writer, vec![]);
let new_block = Arc::new(Block { let new_block = Arc::new(Block {
operations: Arc::new(delete_operations.into_boxed_slice()), operations: Arc::from(delete_operations.into_boxed_slice()),
next: NextBlock::from(self.clone()), next: NextBlock::from(self.clone()),
}); });
@@ -167,7 +167,7 @@ impl NextBlock {
} }
struct Block { struct Block {
operations: Arc<Box<[DeleteOperation]>>, operations: Arc<[DeleteOperation]>,
next: NextBlock, next: NextBlock,
} }

View File

@@ -449,7 +449,7 @@ impl IndexWriter {
} }
/// Accessor to the merge policy. /// Accessor to the merge policy.
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> { pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
self.segment_updater.get_merge_policy() self.segment_updater.get_merge_policy()
} }

View File

@@ -8,7 +8,7 @@ const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
const DEFAULT_MIN_MERGE_SIZE: usize = 8; const DEFAULT_MIN_MERGE_SIZE: usize = 8;
const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000; const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
/// `LogMergePolicy` tries tries to merge segments that have a similar number of /// `LogMergePolicy` tries to merge segments that have a similar number of
/// documents. /// documents.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct LogMergePolicy { pub struct LogMergePolicy {

View File

@@ -503,7 +503,6 @@ impl IndexMerger {
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000); let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new(); let mut delta_computer = DeltaComputer::new();
let mut field_term_streams = Vec::new();
let mut max_term_ords: Vec<TermOrdinal> = Vec::new(); let mut max_term_ords: Vec<TermOrdinal> = Vec::new();
let field_readers: Vec<Arc<InvertedIndexReader>> = self let field_readers: Vec<Arc<InvertedIndexReader>> = self
@@ -512,9 +511,10 @@ impl IndexMerger {
.map(|reader| reader.inverted_index(indexed_field)) .map(|reader| reader.inverted_index(indexed_field))
.collect::<crate::Result<Vec<_>>>()?; .collect::<crate::Result<Vec<_>>>()?;
let mut field_term_streams = Vec::new();
for field_reader in &field_readers { for field_reader in &field_readers {
let terms = field_reader.terms(); let terms = field_reader.terms();
field_term_streams.push(terms.stream()); field_term_streams.push(terms.stream()?);
max_term_ords.push(terms.num_terms() as u64); max_term_ords.push(terms.num_terms() as u64);
} }

View File

@@ -9,6 +9,15 @@ pub struct DeleteOperation {
pub term: Term, pub term: Term,
} }
impl Default for DeleteOperation {
fn default() -> Self {
DeleteOperation {
opstamp: 0u64,
term: Term::new(),
}
}
}
/// Timestamped Add operation. /// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)] #[derive(Eq, PartialEq, Debug)]
pub struct AddOperation { pub struct AddOperation {

View File

@@ -154,7 +154,7 @@ pub(crate) struct InnerSegmentUpdater {
index: Index, index: Index,
segment_manager: SegmentManager, segment_manager: SegmentManager,
merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>, merge_policy: RwLock<Arc<dyn MergePolicy>>,
killed: AtomicBool, killed: AtomicBool,
stamper: Stamper, stamper: Stamper,
merge_operations: MergeOperationInventory, merge_operations: MergeOperationInventory,
@@ -193,19 +193,19 @@ impl SegmentUpdater {
merge_thread_pool, merge_thread_pool,
index, index,
segment_manager, segment_manager,
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))), merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
killed: AtomicBool::new(false), killed: AtomicBool::new(false),
stamper, stamper,
merge_operations: Default::default(), merge_operations: Default::default(),
}))) })))
} }
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> { pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
self.merge_policy.read().unwrap().clone() self.merge_policy.read().unwrap().clone()
} }
pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) { pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
let arc_merge_policy = Arc::new(merge_policy); let arc_merge_policy = Arc::from(merge_policy);
*self.merge_policy.write().unwrap() = arc_merge_policy; *self.merge_policy.write().unwrap() = arc_merge_policy;
} }
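The change from Arc<Box<dyn MergePolicy>> to Arc<dyn MergePolicy> drops one pointer indirection; the key detail is that the boxed policy is converted with Arc::from rather than wrapped with Arc::new. A minimal standalone sketch of that pattern (the MergePolicy trait here is a stand-in, not tantivy's):

use std::sync::Arc;

trait MergePolicy: Send + Sync {}

struct DefaultMergePolicy;
impl MergePolicy for DefaultMergePolicy {}

fn set_merge_policy(merge_policy: Box<dyn MergePolicy>) -> Arc<dyn MergePolicy> {
    // Arc::new(merge_policy) would produce Arc<Box<dyn MergePolicy>>;
    // Arc::from converts the boxed trait object directly into Arc<dyn MergePolicy>.
    Arc::from(merge_policy)
}

fn main() {
    let policy = set_merge_policy(Box::new(DefaultMergePolicy));
    let _shared: Arc<dyn MergePolicy> = Arc::clone(&policy); // cheap to clone and share
}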


@@ -174,7 +174,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Index format version. /// Index format version.
const INDEX_FORMAT_VERSION: u32 = 2; const INDEX_FORMAT_VERSION: u32 = 3;
/// Structure version for the index. /// Structure version for the index.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]


@@ -8,7 +8,7 @@ use std::io::{self, Write};
pub struct PositionSerializer<W: io::Write> { pub struct PositionSerializer<W: io::Write> {
bit_packer: BitPacker4x, bit_packer: BitPacker4x,
write_stream: CountingWriter<W>, write_stream: CountingWriter<W>,
write_skiplist: W, write_skip_index: W,
block: Vec<u32>, block: Vec<u32>,
buffer: Vec<u8>, buffer: Vec<u8>,
num_ints: u64, num_ints: u64,
@@ -16,11 +16,11 @@ pub struct PositionSerializer<W: io::Write> {
} }
impl<W: io::Write> PositionSerializer<W> { impl<W: io::Write> PositionSerializer<W> {
pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer<W> { pub fn new(write_stream: W, write_skip_index: W) -> PositionSerializer<W> {
PositionSerializer { PositionSerializer {
bit_packer: BitPacker4x::new(), bit_packer: BitPacker4x::new(),
write_stream: CountingWriter::wrap(write_stream), write_stream: CountingWriter::wrap(write_stream),
write_skiplist, write_skip_index,
block: Vec::with_capacity(128), block: Vec::with_capacity(128),
buffer: vec![0u8; 128 * 4], buffer: vec![0u8; 128 * 4],
num_ints: 0u64, num_ints: 0u64,
@@ -52,7 +52,7 @@ impl<W: io::Write> PositionSerializer<W> {
fn flush_block(&mut self) -> io::Result<()> { fn flush_block(&mut self) -> io::Result<()> {
let num_bits = self.bit_packer.num_bits(&self.block[..]); let num_bits = self.bit_packer.num_bits(&self.block[..]);
self.write_skiplist.write_all(&[num_bits])?; self.write_skip_index.write_all(&[num_bits])?;
let written_len = self let written_len = self
.bit_packer .bit_packer
.compress(&self.block[..], &mut self.buffer, num_bits); .compress(&self.block[..], &mut self.buffer, num_bits);
@@ -70,10 +70,10 @@ impl<W: io::Write> PositionSerializer<W> {
self.flush_block()?; self.flush_block()?;
} }
for &long_skip in &self.long_skips { for &long_skip in &self.long_skips {
long_skip.serialize(&mut self.write_skiplist)?; long_skip.serialize(&mut self.write_skip_index)?;
} }
(self.long_skips.len() as u32).serialize(&mut self.write_skiplist)?; (self.long_skips.len() as u32).serialize(&mut self.write_skip_index)?;
self.write_skiplist.flush()?; self.write_skip_index.flush()?;
self.write_stream.flush()?; self.write_stream.flush()?;
Ok(()) Ok(())
} }


@@ -469,7 +469,7 @@ mod tests {
let segment_reader = searcher.segment_reader(0); let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field).unwrap(); let inverted_index = segment_reader.inverted_index(int_field).unwrap();
let term = Term::from_field_u64(int_field, 0u64); let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term).unwrap(); let term_info = inverted_index.get_term_info(&term).unwrap().unwrap();
inverted_index inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic) .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
.unwrap() .unwrap()
@@ -513,7 +513,7 @@ mod tests {
{ {
let term = Term::from_field_u64(int_field, 0u64); let term = Term::from_field_u64(int_field, 0u64);
let inverted_index = segment_reader.inverted_index(int_field)?; let inverted_index = segment_reader.inverted_index(int_field)?;
let term_info = inverted_index.get_term_info(&term).unwrap(); let term_info = inverted_index.get_term_info(&term)?.unwrap();
block_segments = inverted_index block_segments = inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?; .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?;
} }
@@ -521,7 +521,7 @@ mod tests {
{ {
let term = Term::from_field_u64(int_field, 1u64); let term = Term::from_field_u64(int_field, 1u64);
let inverted_index = segment_reader.inverted_index(int_field)?; let inverted_index = segment_reader.inverted_index(int_field)?;
let term_info = inverted_index.get_term_info(&term).unwrap(); let term_info = inverted_index.get_term_info(&term)?.unwrap();
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?; inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?;
} }
assert_eq!(block_segments.docs(), &[1, 3, 5]); assert_eq!(block_segments.docs(), &[1, 3, 5]);
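These test updates follow the signature change of get_term_info, which now returns io::Result<Option<TermInfo>> instead of Option<TermInfo>: a failed dictionary read is reported as an error rather than being conflated with "term not present". A small sketch of the calling pattern, with placeholder types standing in for tantivy's:

use std::io;

struct TermInfo {
    doc_freq: u32,
}

// Stand-in for the new signature: io errors and a missing term are now distinct.
fn get_term_info(present: bool) -> io::Result<Option<TermInfo>> {
    Ok(if present { Some(TermInfo { doc_freq: 3 }) } else { None })
}

fn doc_freq(present: bool) -> io::Result<u32> {
    // `?` propagates io errors; map/unwrap_or handles the absent term,
    // mirroring the doc-freq lookup updated in term_weight.rs further down.
    Ok(get_term_info(present)?.map(|info| info.doc_freq).unwrap_or(0))
}

fn main() -> io::Result<()> {
    assert_eq!(doc_freq(true)?, 3);
    assert_eq!(doc_freq(false)?, 0);
    Ok(())
}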


@@ -15,18 +15,14 @@ mod stacker;
mod term_info; mod term_info;
pub(crate) use self::block_search::BlockSearcher; pub(crate) use self::block_search::BlockSearcher;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub use self::postings::Postings;
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub use self::term_info::TermInfo;
pub use self::block_segment_postings::BlockSegmentPostings; pub use self::block_segment_postings::BlockSegmentPostings;
pub use self::postings::Postings;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::segment_postings::SegmentPostings; pub use self::segment_postings::SegmentPostings;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub(crate) use self::stacker::compute_table_size; pub(crate) use self::stacker::compute_table_size;
pub use self::term_info::TermInfo;
pub(crate) type UnorderedTermId = u64; pub(crate) type UnorderedTermId = u64;
@@ -51,17 +47,14 @@ pub mod tests {
use crate::indexer::SegmentWriter; use crate::indexer::SegmentWriter;
use crate::merge_policy::NoMergePolicy; use crate::merge_policy::NoMergePolicy;
use crate::query::Scorer; use crate::query::Scorer;
use crate::schema::{Document, Schema, Term, INDEXED, STRING, TEXT};
use crate::schema::{Field, TextOptions}; use crate::schema::{Field, TextOptions};
use crate::schema::{IndexRecordOption, TextFieldIndexing}; use crate::schema::{IndexRecordOption, TextFieldIndexing};
use crate::schema::{Schema, Term, INDEXED, TEXT};
use crate::tokenizer::{SimpleTokenizer, MAX_TOKEN_LEN}; use crate::tokenizer::{SimpleTokenizer, MAX_TOKEN_LEN};
use crate::DocId; use crate::DocId;
use crate::HasLen; use crate::HasLen;
use crate::Score; use crate::Score;
use once_cell::sync::Lazy; use std::{iter, mem};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::iter;
#[test] #[test]
pub fn test_position_write() -> crate::Result<()> { pub fn test_position_write() -> crate::Result<()> {
@@ -78,6 +71,7 @@ pub mod tests {
field_serializer.write_doc(doc_id, 4, &delta_positions)?; field_serializer.write_doc(doc_id, 4, &delta_positions)?;
} }
field_serializer.close_term()?; field_serializer.close_term()?;
mem::drop(field_serializer);
posting_serializer.close()?; posting_serializer.close()?;
let read = segment.open_read(SegmentComponent::POSITIONS)?; let read = segment.open_read(SegmentComponent::POSITIONS)?;
assert!(read.len() <= 140); assert!(read.len() <= 140);
@@ -186,7 +180,7 @@ pub mod tests {
let inverted_index = segment_reader.inverted_index(text_field)?; let inverted_index = segment_reader.inverted_index(text_field)?;
assert_eq!(inverted_index.terms().num_terms(), 1); assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![]; let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)); assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
assert_eq!(&bytes, b"hello"); assert_eq!(&bytes, b"hello");
} }
{ {
@@ -198,7 +192,7 @@ pub mod tests {
let inverted_index = segment_reader.inverted_index(text_field)?; let inverted_index = segment_reader.inverted_index(text_field)?;
assert_eq!(inverted_index.terms().num_terms(), 1); assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![]; let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)); assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
assert_eq!(&bytes[..], ok_token_text.as_bytes()); assert_eq!(&bytes[..], ok_token_text.as_bytes());
} }
Ok(()) Ok(())
@@ -491,53 +485,6 @@ pub mod tests {
Ok(()) Ok(())
} }
pub static TERM_A: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "a")
});
pub static TERM_B: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "b")
});
pub static TERM_C: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "c")
});
pub static TERM_D: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "d")
});
pub static INDEX: Lazy<Index> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let index = Index::create_in_ram(schema);
let posting_list_size = 1_000_000;
{
let mut index_writer = index.writer_for_tests().unwrap();
for _ in 0..posting_list_size {
let mut doc = Document::default();
if rng.gen_bool(1f64 / 15f64) {
doc.add_text(text_field, "a");
}
if rng.gen_bool(1f64 / 10f64) {
doc.add_text(text_field, "b");
}
if rng.gen_bool(1f64 / 5f64) {
doc.add_text(text_field, "c");
}
doc.add_text(text_field, "d");
index_writer.add_document(doc);
}
assert!(index_writer.commit().is_ok());
}
index
});
/// Wraps a given docset, and forward alls call but the /// Wraps a given docset, and forward alls call but the
/// `.skip_next(...)`. This is useful to test that a specialized /// `.skip_next(...)`. This is useful to test that a specialized
/// implementation of `.skip_next(...)` is consistent /// implementation of `.skip_next(...)` is consistent
@@ -602,15 +549,65 @@ pub mod tests {
#[cfg(all(test, feature = "unstable"))] #[cfg(all(test, feature = "unstable"))]
mod bench { mod bench {
use super::tests::*;
use crate::docset::TERMINATED; use crate::docset::TERMINATED;
use crate::query::Intersection; use crate::query::Intersection;
use crate::schema::IndexRecordOption; use crate::schema::IndexRecordOption;
use crate::schema::{Document, Field, Schema, Term, STRING};
use crate::tests; use crate::tests;
use crate::DocSet; use crate::DocSet;
use crate::Index;
use once_cell::sync::Lazy;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use test::{self, Bencher}; use test::{self, Bencher};
pub static TERM_A: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "a")
});
pub static TERM_B: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "b")
});
pub static TERM_C: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "c")
});
pub static TERM_D: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "d")
});
pub static INDEX: Lazy<Index> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let index = Index::create_in_ram(schema);
let posting_list_size = 1_000_000;
{
let mut index_writer = index.writer_for_tests().unwrap();
for _ in 0..posting_list_size {
let mut doc = Document::default();
if rng.gen_bool(1f64 / 15f64) {
doc.add_text(text_field, "a");
}
if rng.gen_bool(1f64 / 10f64) {
doc.add_text(text_field, "b");
}
if rng.gen_bool(1f64 / 5f64) {
doc.add_text(text_field, "c");
}
doc.add_text(text_field, "d");
index_writer.add_document(doc);
}
assert!(index_writer.commit().is_ok());
}
index
});
#[bench] #[bench]
fn bench_segment_postings(b: &mut Bencher) { fn bench_segment_postings(b: &mut Bencher) {
let reader = INDEX.reader().unwrap(); let reader = INDEX.reader().unwrap();
@@ -620,7 +617,9 @@ mod bench {
b.iter(|| { b.iter(|| {
let mut segment_postings = segment_reader let mut segment_postings = segment_reader
.inverted_index(TERM_A.field()) .inverted_index(TERM_A.field())
.read_postings(&*TERM_A, IndexRecordOption::Basic)? .unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap(); .unwrap();
while segment_postings.advance() != TERMINATED {} while segment_postings.advance() != TERMINATED {}
}); });
@@ -634,21 +633,25 @@ mod bench {
b.iter(|| { b.iter(|| {
let segment_postings_a = segment_reader let segment_postings_a = segment_reader
.inverted_index(TERM_A.field()) .inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic) .read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap() .unwrap()
.unwrap(); .unwrap();
let segment_postings_b = segment_reader let segment_postings_b = segment_reader
.inverted_index(TERM_B.field()) .inverted_index(TERM_B.field())
.unwrap()
.read_postings(&*TERM_B, IndexRecordOption::Basic) .read_postings(&*TERM_B, IndexRecordOption::Basic)
.unwrap() .unwrap()
.unwrap(); .unwrap();
let segment_postings_c = segment_reader let segment_postings_c = segment_reader
.inverted_index(TERM_C.field()) .inverted_index(TERM_C.field())
.unwrap()
.read_postings(&*TERM_C, IndexRecordOption::Basic) .read_postings(&*TERM_C, IndexRecordOption::Basic)
.unwrap() .unwrap()
.unwrap(); .unwrap();
let segment_postings_d = segment_reader let segment_postings_d = segment_reader
.inverted_index(TERM_D.field()) .inverted_index(TERM_D.field())
.unwrap()
.read_postings(&*TERM_D, IndexRecordOption::Basic) .read_postings(&*TERM_D, IndexRecordOption::Basic)
.unwrap() .unwrap()
.unwrap(); .unwrap();
@@ -670,6 +673,7 @@ mod bench {
let mut segment_postings = segment_reader let mut segment_postings = segment_reader
.inverted_index(TERM_A.field()) .inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic) .read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap() .unwrap()
.unwrap(); .unwrap();
@@ -687,7 +691,9 @@ mod bench {
b.iter(|| { b.iter(|| {
let mut segment_postings = segment_reader let mut segment_postings = segment_reader
.inverted_index(TERM_A.field()) .inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic) .read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap(); .unwrap();
for doc in &existing_docs { for doc in &existing_docs {
if segment_postings.seek(*doc) == TERMINATED { if segment_postings.seek(*doc) == TERMINATED {
@@ -726,7 +732,9 @@ mod bench {
let n: u32 = test::black_box(17); let n: u32 = test::black_box(17);
let mut segment_postings = segment_reader let mut segment_postings = segment_reader
.inverted_index(TERM_A.field()) .inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic) .read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap(); .unwrap();
let mut s = 0u32; let mut s = 0u32;
while segment_postings.doc() != TERMINATED { while segment_postings.doc() != TERMINATED {


@@ -177,14 +177,16 @@ impl<'a> FieldSerializer<'a> {
} }
fn current_term_info(&self) -> TermInfo { fn current_term_info(&self) -> TermInfo {
let positions_idx = self let positions_idx =
.positions_serializer_opt if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() {
.as_ref() positions_serializer.positions_idx()
.map(PositionSerializer::positions_idx) } else {
.unwrap_or(0u64); 0u64
};
TermInfo { TermInfo {
doc_freq: 0, doc_freq: 0,
postings_offset: self.postings_serializer.addr(), postings_start_offset: self.postings_serializer.addr(),
postings_stop_offset: 0u64,
positions_idx, positions_idx,
} }
} }
@@ -238,10 +240,11 @@ impl<'a> FieldSerializer<'a> {
/// using `VInt` encoding. /// using `VInt` encoding.
pub fn close_term(&mut self) -> io::Result<()> { pub fn close_term(&mut self) -> io::Result<()> {
if self.term_open { if self.term_open {
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.postings_serializer self.postings_serializer
.close_term(self.current_term_info.doc_freq)?; .close_term(self.current_term_info.doc_freq)?;
self.current_term_info.postings_stop_offset = self.postings_serializer.addr();
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.term_open = false; self.term_open = false;
} }
Ok(()) Ok(())
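The reordering in close_term matters because the term's postings byte range is only known after the postings serializer has flushed the term: postings_stop_offset is read from the write position after close_term, and only then is the TermInfo inserted into the dictionary. A simplified sketch of the idea, with a byte-counting writer standing in for tantivy's serializers:

use std::io::{self, Write};

// Counts bytes as they are written, similar in spirit to tantivy's CountingWriter.
struct CountingWriter<W: Write> {
    inner: W,
    written: u64,
}

impl<W: Write> CountingWriter<W> {
    fn addr(&self) -> u64 {
        self.written
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.written += n as u64;
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

fn main() -> io::Result<()> {
    let mut postings = CountingWriter { inner: Vec::new(), written: 0 };
    let postings_start_offset = postings.addr();   // recorded when the term is opened
    postings.write_all(b"encoded posting list")?;  // happens inside close_term(doc_freq)
    let postings_stop_offset = postings.addr();    // only now can the full range be recorded
    assert_eq!(postings_stop_offset - postings_start_offset, 20);
    Ok(())
}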


@@ -1,32 +1,46 @@
use crate::common::{read_u32_vint_no_advance, serialize_vint_u32, BinarySerializable}; use std::convert::TryInto;
use crate::directory::OwnedBytes; use crate::directory::OwnedBytes;
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE}; use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::query::BM25Weight; use crate::query::BM25Weight;
use crate::schema::IndexRecordOption; use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED}; use crate::{DocId, Score, TERMINATED};
#[inline(always)]
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
max_tf.min(u8::MAX as u32) as u8
}
#[inline(always)]
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
if max_tf_code == u8::MAX {
u32::MAX
} else {
max_tf_code as u32
}
}
#[inline(always)]
fn read_u32(data: &[u8]) -> u32 {
u32::from_le_bytes(data[..4].try_into().unwrap())
}
#[inline(always)]
fn write_u32(val: u32, buf: &mut Vec<u8>) {
buf.extend_from_slice(&val.to_le_bytes());
}
pub struct SkipSerializer { pub struct SkipSerializer {
buffer: Vec<u8>, buffer: Vec<u8>,
prev_doc: DocId,
} }
impl SkipSerializer { impl SkipSerializer {
pub fn new() -> SkipSerializer { pub fn new() -> SkipSerializer {
SkipSerializer { SkipSerializer { buffer: Vec::new() }
buffer: Vec::new(),
prev_doc: 0u32,
}
} }
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) { pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
assert!( write_u32(last_doc, &mut self.buffer);
last_doc > self.prev_doc,
"write_doc(...) called with non-increasing doc ids. \
Did you forget to call clear maybe?"
);
let delta_doc = last_doc - self.prev_doc;
self.prev_doc = last_doc;
delta_doc.serialize(&mut self.buffer).unwrap();
self.buffer.push(doc_num_bits); self.buffer.push(doc_num_bits);
} }
@@ -35,16 +49,13 @@ impl SkipSerializer {
} }
pub fn write_total_term_freq(&mut self, tf_sum: u32) { pub fn write_total_term_freq(&mut self, tf_sum: u32) {
tf_sum write_u32(tf_sum, &mut self.buffer);
.serialize(&mut self.buffer)
.expect("Should never fail");
} }
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) { pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
self.buffer.push(fieldnorm_id); let block_wand_tf = encode_block_wand_max_tf(term_freq);
let mut buf = [0u8; 8]; self.buffer
let bytes = serialize_vint_u32(term_freq, &mut buf); .extend_from_slice(&[fieldnorm_id, block_wand_tf]);
self.buffer.extend_from_slice(bytes);
} }
pub fn data(&self) -> &[u8] { pub fn data(&self) -> &[u8] {
@@ -52,7 +63,6 @@ impl SkipSerializer {
} }
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.prev_doc = 0u32;
self.buffer.clear(); self.buffer.clear();
} }
} }
@@ -159,18 +169,13 @@ impl SkipReader {
} }
fn read_block_info(&mut self) { fn read_block_info(&mut self) {
let doc_delta = {
let bytes = self.owned_read.as_slice(); let bytes = self.owned_read.as_slice();
let mut buf = [0; 4]; let advance_len: usize;
buf.copy_from_slice(&bytes[..4]); self.last_doc_in_block = read_u32(bytes);
u32::from_le_bytes(buf) let doc_num_bits = bytes[4];
};
self.last_doc_in_block += doc_delta as DocId;
let doc_num_bits = self.owned_read.as_slice()[4];
match self.skip_info { match self.skip_info {
IndexRecordOption::Basic => { IndexRecordOption::Basic => {
self.owned_read.advance(5); advance_len = 5;
self.block_info = BlockInfo::BitPacked { self.block_info = BlockInfo::BitPacked {
doc_num_bits, doc_num_bits,
tf_num_bits: 0, tf_num_bits: 0,
@@ -180,11 +185,10 @@ impl SkipReader {
}; };
} }
IndexRecordOption::WithFreqs => { IndexRecordOption::WithFreqs => {
let bytes = self.owned_read.as_slice();
let tf_num_bits = bytes[5]; let tf_num_bits = bytes[5];
let block_wand_fieldnorm_id = bytes[6]; let block_wand_fieldnorm_id = bytes[6];
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[7..]); let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
self.owned_read.advance(7 + num_bytes); advance_len = 8;
self.block_info = BlockInfo::BitPacked { self.block_info = BlockInfo::BitPacked {
doc_num_bits, doc_num_bits,
tf_num_bits, tf_num_bits,
@@ -194,16 +198,11 @@ impl SkipReader {
}; };
} }
IndexRecordOption::WithFreqsAndPositions => { IndexRecordOption::WithFreqsAndPositions => {
let bytes = self.owned_read.as_slice();
let tf_num_bits = bytes[5]; let tf_num_bits = bytes[5];
let tf_sum = { let tf_sum = read_u32(&bytes[6..10]);
let mut buf = [0; 4];
buf.copy_from_slice(&bytes[6..10]);
u32::from_le_bytes(buf)
};
let block_wand_fieldnorm_id = bytes[10]; let block_wand_fieldnorm_id = bytes[10];
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[11..]); let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
self.owned_read.advance(11 + num_bytes); advance_len = 12;
self.block_info = BlockInfo::BitPacked { self.block_info = BlockInfo::BitPacked {
doc_num_bits, doc_num_bits,
tf_num_bits, tf_num_bits,
@@ -213,6 +212,7 @@ impl SkipReader {
}; };
} }
} }
self.owned_read.advance(advance_len);
} }
pub fn block_info(&self) -> BlockInfo { pub fn block_info(&self) -> BlockInfo {
@@ -274,6 +274,24 @@ mod tests {
use crate::directory::OwnedBytes; use crate::directory::OwnedBytes;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE; use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
#[test]
fn test_encode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
}
for &tf in &[255, 256, 1_000_000, u32::MAX] {
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
}
}
#[test]
fn test_decode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
}
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
}
#[test] #[test]
fn test_skip_with_freq() { fn test_skip_with_freq() {
let buf = { let buf = {
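Two things change in the skip data here: the last doc of each block is written as an absolute u32 rather than a delta from the previous block, and the block-WAND maximum term frequency is squeezed into a single saturating byte instead of a vint (255 decodes back to u32::MAX, so the decoded value stays a safe upper bound for pruning). Every skip entry therefore has a fixed size, which is what lets read_block_info advance by a constant length. A standalone sketch of the WithFreqs entry layout (field order follows the reader above; names are mine):

fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
    // Saturating: anything >= 255 encodes as 255 and decodes to u32::MAX,
    // so the decoded value never underestimates the true block maximum.
    max_tf.min(u8::MAX as u32) as u8
}

fn write_skip_entry_with_freqs(
    buf: &mut Vec<u8>,
    last_doc: u32,
    doc_num_bits: u8,
    tf_num_bits: u8,
    fieldnorm_id: u8,
    block_max_tf: u32,
) {
    buf.extend_from_slice(&last_doc.to_le_bytes());   // bytes [0..4): absolute doc id
    buf.push(doc_num_bits);                           // byte  [4]
    buf.push(tf_num_bits);                            // byte  [5]
    buf.push(fieldnorm_id);                           // byte  [6]
    buf.push(encode_block_wand_max_tf(block_max_tf)); // byte  [7]
}

fn main() {
    let mut buf = Vec::new();
    write_skip_entry_with_freqs(&mut buf, 128, 7, 3, 42, 1_000_000);
    // advance_len = 8 for IndexRecordOption::WithFreqs in SkipReader::read_block_info
    // (12 for WithFreqsAndPositions, which additionally carries a u32 tf_sum).
    assert_eq!(buf.len(), 8);
    assert_eq!(buf[7], 255); // 1_000_000 saturated into one byte
}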


@@ -7,35 +7,50 @@ use std::io;
pub struct TermInfo { pub struct TermInfo {
/// Number of documents in the segment containing the term /// Number of documents in the segment containing the term
pub doc_freq: u32, pub doc_freq: u32,
/// Start offset within the postings (`.idx`) file. /// Start offset of the posting list within the postings (`.idx`) file.
pub postings_offset: u64, pub postings_start_offset: u64,
/// Stop offset of the posting list within the postings (`.idx`) file.
/// The byte range is `[start_offset..stop_offset)`.
pub postings_stop_offset: u64,
/// Start offset of the first block within the position (`.pos`) file. /// Start offset of the first block within the position (`.pos`) file.
pub positions_idx: u64, pub positions_idx: u64,
} }
impl TermInfo {
pub(crate) fn posting_num_bytes(&self) -> u32 {
let num_bytes = self.postings_stop_offset - self.postings_start_offset;
assert!(num_bytes <= std::u32::MAX as u64);
num_bytes as u32
}
}
impl FixedSize for TermInfo { impl FixedSize for TermInfo {
/// Size required for the binary serialization of a `TermInfo` object. /// Size required for the binary serialization of a `TermInfo` object.
/// This is large, but in practise, `TermInfo` are encoded in blocks and /// This is large, but in practise, `TermInfo` are encoded in blocks and
/// only the first `TermInfo` of a block is serialized uncompressed. /// only the first `TermInfo` of a block is serialized uncompressed.
/// The subsequent `TermInfo` are delta encoded and bitpacked. /// The subsequent `TermInfo` are delta encoded and bitpacked.
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES; const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
} }
impl BinarySerializable for TermInfo { impl BinarySerializable for TermInfo {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> { fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
self.doc_freq.serialize(writer)?; self.doc_freq.serialize(writer)?;
self.postings_offset.serialize(writer)?; self.postings_start_offset.serialize(writer)?;
self.posting_num_bytes().serialize(writer)?;
self.positions_idx.serialize(writer)?; self.positions_idx.serialize(writer)?;
Ok(()) Ok(())
} }
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> { fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let doc_freq = u32::deserialize(reader)?; let doc_freq = u32::deserialize(reader)?;
let postings_offset = u64::deserialize(reader)?; let postings_start_offset = u64::deserialize(reader)?;
let postings_num_bytes = u32::deserialize(reader)?;
let postings_stop_offset = postings_start_offset + u64::from(postings_num_bytes);
let positions_idx = u64::deserialize(reader)?; let positions_idx = u64::deserialize(reader)?;
Ok(TermInfo { Ok(TermInfo {
doc_freq, doc_freq,
postings_offset, postings_start_offset,
postings_stop_offset,
positions_idx, positions_idx,
}) })
} }


@@ -7,6 +7,7 @@ use crate::schema::{Field, IndexRecordOption};
use crate::termdict::{TermDictionary, TermStreamer}; use crate::termdict::{TermDictionary, TermStreamer};
use crate::TantivyError; use crate::TantivyError;
use crate::{DocId, Score}; use crate::{DocId, Score};
use std::io;
use std::sync::Arc; use std::sync::Arc;
use tantivy_fst::Automaton; use tantivy_fst::Automaton;
@@ -19,6 +20,7 @@ pub struct AutomatonWeight<A> {
impl<A> AutomatonWeight<A> impl<A> AutomatonWeight<A>
where where
A: Automaton + Send + Sync + 'static, A: Automaton + Send + Sync + 'static,
A::State: Clone,
{ {
/// Create a new AutomationWeight /// Create a new AutomationWeight
pub fn new<IntoArcA: Into<Arc<A>>>(field: Field, automaton: IntoArcA) -> AutomatonWeight<A> { pub fn new<IntoArcA: Into<Arc<A>>>(field: Field, automaton: IntoArcA) -> AutomatonWeight<A> {
@@ -28,7 +30,10 @@ where
} }
} }
fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> { fn automaton_stream<'a>(
&'a self,
term_dict: &'a TermDictionary,
) -> io::Result<TermStreamer<'a, &'a A>> {
let automaton: &A = &*self.automaton; let automaton: &A = &*self.automaton;
let term_stream_builder = term_dict.search(automaton); let term_stream_builder = term_dict.search(automaton);
term_stream_builder.into_stream() term_stream_builder.into_stream()
@@ -38,13 +43,14 @@ where
impl<A> Weight for AutomatonWeight<A> impl<A> Weight for AutomatonWeight<A>
where where
A: Automaton + Send + Sync + 'static, A: Automaton + Send + Sync + 'static,
A::State: Clone,
{ {
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> { fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let max_doc = reader.max_doc(); let max_doc = reader.max_doc();
let mut doc_bitset = BitSet::with_max_value(max_doc); let mut doc_bitset = BitSet::with_max_value(max_doc);
let inverted_index = reader.inverted_index(self.field)?; let inverted_index = reader.inverted_index(self.field)?;
let term_dict = inverted_index.terms(); let term_dict = inverted_index.terms();
let mut term_stream = self.automaton_stream(term_dict); let mut term_stream = self.automaton_stream(term_dict)?;
while term_stream.advance() { while term_stream.advance() {
let term_info = term_stream.value(); let term_info = term_stream.value();
let mut block_segment_postings = inverted_index let mut block_segment_postings = inverted_index
@@ -98,6 +104,7 @@ mod tests {
index index
} }
#[derive(Clone, Copy)]
enum State { enum State {
Start, Start,
NotMatching, NotMatching,


@@ -106,7 +106,7 @@ impl BM25Weight {
BM25Weight::new(idf_explain, avg_fieldnorm) BM25Weight::new(idf_explain, avg_fieldnorm)
} }
fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight { pub(crate) fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
let weight = idf_explain.value() * (1.0 + K1); let weight = idf_explain.value() * (1.0 + K1);
BM25Weight { BM25Weight {
idf_explain, idf_explain,


@@ -268,7 +268,7 @@ mod tests {
} }
fn nearly_equals(left: Score, right: Score) -> bool { fn nearly_equals(left: Score, right: Score) -> bool {
(left - right).abs() < 0.000001 * (left + right).abs() (left - right).abs() < 0.0001 * (left + right).abs()
} }
fn compute_checkpoints_for_each_pruning( fn compute_checkpoints_for_each_pruning(
@@ -424,9 +424,116 @@ mod tests {
} }
} }
#[test]
fn test_fn_reproduce_proptest() {
let postings_lists = &[
vec![
(0, 1),
(1, 1),
(2, 1),
(3, 1),
(4, 1),
(6, 1),
(7, 7),
(8, 1),
(10, 1),
(12, 1),
(13, 1),
(14, 1),
(15, 1),
(16, 1),
(19, 1),
(20, 1),
(21, 1),
(22, 1),
(24, 1),
(25, 1),
(26, 1),
(28, 1),
(30, 1),
(31, 1),
(33, 1),
(34, 1),
(35, 1),
(36, 95),
(37, 1),
(39, 1),
(41, 1),
(44, 1),
(46, 1),
],
vec![
(0, 5),
(2, 1),
(4, 1),
(5, 84),
(6, 47),
(7, 26),
(8, 50),
(9, 34),
(11, 73),
(12, 11),
(13, 51),
(14, 45),
(15, 18),
(18, 60),
(19, 80),
(20, 63),
(23, 79),
(24, 69),
(26, 35),
(28, 82),
(29, 19),
(30, 2),
(31, 7),
(33, 40),
(34, 1),
(35, 33),
(36, 27),
(37, 24),
(38, 65),
(39, 32),
(40, 85),
(41, 1),
(42, 69),
(43, 11),
(45, 45),
(47, 97),
],
vec![
(2, 1),
(4, 1),
(7, 94),
(8, 1),
(9, 1),
(10, 1),
(12, 1),
(15, 1),
(22, 1),
(23, 1),
(26, 1),
(27, 1),
(32, 1),
(33, 1),
(34, 1),
(36, 96),
(39, 1),
(41, 1),
],
];
let fieldnorms = &[
685, 239, 780, 564, 664, 827, 5, 56, 930, 887, 263, 665, 167, 127, 120, 919, 292, 92,
489, 734, 814, 724, 700, 304, 128, 779, 311, 877, 774, 15, 866, 368, 894, 371, 982,
502, 507, 669, 680, 76, 594, 626, 578, 331, 170, 639, 665, 186,
][..];
test_block_wand_aux(postings_lists, fieldnorms);
}
proptest! { proptest! {
#![proptest_config(ProptestConfig::with_cases(500))] #![proptest_config(ProptestConfig::with_cases(500))]
#[ignore]
#[test] #[test]
#[ignore]
fn test_block_wand_three_term_scorers((posting_lists, fieldnorms) in gen_term_scorers(3)) { fn test_block_wand_three_term_scorers((posting_lists, fieldnorms) in gen_term_scorers(3)) {
test_block_wand_aux(&posting_lists[..], &fieldnorms[..]); test_block_wand_aux(&posting_lists[..], &fieldnorms[..]);
} }


@@ -310,7 +310,7 @@ mod tests {
)); ));
let query = BooleanQuery::from(vec![(Occur::Should, term_a), (Occur::Should, term_b)]); let query = BooleanQuery::from(vec![(Occur::Should, term_a), (Occur::Should, term_b)]);
let explanation = query.explain(&searcher, DocAddress(0, 0u32))?; let explanation = query.explain(&searcher, DocAddress(0, 0u32))?;
assert_nearly_equals!(explanation.value(), 0.6931472f32); assert_nearly_equals!(explanation.value(), 0.6931472);
Ok(()) Ok(())
} }
} }


@@ -11,6 +11,7 @@ use crate::schema::{Field, IndexRecordOption, Term};
use crate::termdict::{TermDictionary, TermStreamer}; use crate::termdict::{TermDictionary, TermStreamer};
use crate::{DocId, Score}; use crate::{DocId, Score};
use std::collections::Bound; use std::collections::Bound;
use std::io;
use std::ops::Range; use std::ops::Range;
fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>( fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
@@ -274,7 +275,7 @@ pub struct RangeWeight {
} }
impl RangeWeight { impl RangeWeight {
fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> TermStreamer<'a> { fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> io::Result<TermStreamer<'a>> {
use std::collections::Bound::*; use std::collections::Bound::*;
let mut term_stream_builder = term_dict.range(); let mut term_stream_builder = term_dict.range();
term_stream_builder = match self.left_bound { term_stream_builder = match self.left_bound {
@@ -298,7 +299,7 @@ impl Weight for RangeWeight {
let inverted_index = reader.inverted_index(self.field)?; let inverted_index = reader.inverted_index(self.field)?;
let term_dict = inverted_index.terms(); let term_dict = inverted_index.terms();
let mut term_range = self.term_range(term_dict); let mut term_range = self.term_range(term_dict)?;
while term_range.advance() { while term_range.advance() {
let term_info = term_range.value(); let term_info = term_range.value();
let mut block_segment_postings = inverted_index let mut block_segment_postings = inverted_index


@@ -12,7 +12,7 @@ use std::marker::PhantomData;
/// This is useful for queries like `+somethingrequired somethingoptional`. /// This is useful for queries like `+somethingrequired somethingoptional`.
/// ///
/// Note that `somethingoptional` has no impact on the `DocSet`. /// Note that `somethingoptional` has no impact on the `DocSet`.
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner> { pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner: ScoreCombiner> {
req_scorer: TReqScorer, req_scorer: TReqScorer,
opt_scorer: TOptScorer, opt_scorer: TOptScorer,
score_cache: Option<Score>, score_cache: Option<Score>,
@@ -23,6 +23,7 @@ impl<TReqScorer, TOptScorer, TScoreCombiner>
RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner> RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner>
where where
TOptScorer: DocSet, TOptScorer: DocSet,
TScoreCombiner: ScoreCombiner,
{ {
/// Creates a new `RequiredOptionalScorer`. /// Creates a new `RequiredOptionalScorer`.
pub fn new( pub fn new(
@@ -43,6 +44,7 @@ impl<TReqScorer, TOptScorer, TScoreCombiner> DocSet
where where
TReqScorer: DocSet, TReqScorer: DocSet,
TOptScorer: DocSet, TOptScorer: DocSet,
TScoreCombiner: ScoreCombiner,
{ {
fn advance(&mut self) -> DocId { fn advance(&mut self) -> DocId {
self.score_cache = None; self.score_cache = None;


@@ -3,7 +3,7 @@ use crate::Score;
/// The `ScoreCombiner` trait defines how to compute /// The `ScoreCombiner` trait defines how to compute
/// an overall score given a list of scores. /// an overall score given a list of scores.
pub trait ScoreCombiner: Default + Clone + Copy + 'static { pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
/// Aggregates the score combiner with the given scorer. /// Aggregates the score combiner with the given scorer.
/// ///
/// The `ScoreCombiner` may decide to call `.scorer.score()` /// The `ScoreCombiner` may decide to call `.scorer.score()`


@@ -197,7 +197,7 @@ mod tests {
let searcher = index.reader()?.searcher(); let searcher = index.reader()?.searcher();
{ {
let explanation = term_query.explain(&searcher, DocAddress(0u32, 1u32))?; let explanation = term_query.explain(&searcher, DocAddress(0u32, 1u32))?;
assert_nearly_equals!(explanation.value(), 0.6931472f32); assert_nearly_equals!(explanation.value(), 0.6931472);
} }
{ {
let explanation_err = term_query.explain(&searcher, DocAddress(0u32, 0u32)); let explanation_err = term_query.explain(&searcher, DocAddress(0u32, 0u32));


@@ -1,7 +1,7 @@
use super::term_weight::TermWeight; use super::term_weight::TermWeight;
use crate::query::bm25::BM25Weight; use crate::query::bm25::BM25Weight;
use crate::query::Query;
use crate::query::Weight; use crate::query::Weight;
use crate::query::{Explanation, Query};
use crate::schema::IndexRecordOption; use crate::schema::IndexRecordOption;
use crate::Searcher; use crate::Searcher;
use crate::Term; use crate::Term;
@@ -93,7 +93,20 @@ impl TermQuery {
scoring_enabled: bool, scoring_enabled: bool,
) -> crate::Result<TermWeight> { ) -> crate::Result<TermWeight> {
let term = self.term.clone(); let term = self.term.clone();
let bm25_weight = BM25Weight::for_terms(searcher, &[term])?; let field_entry = searcher.schema().get_field_entry(term.field());
if !field_entry.is_indexed() {
return Err(crate::TantivyError::SchemaError(format!(
"Field {:?} is not indexed",
field_entry.name()
)));
}
let bm25_weight;
if scoring_enabled {
bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
} else {
bm25_weight =
BM25Weight::new(Explanation::new("<no score>".to_string(), 1.0f32), 1.0f32);
}
let index_record_option = if scoring_enabled { let index_record_option = if scoring_enabled {
self.index_record_option self.index_record_option
} else { } else {
@@ -103,6 +116,7 @@ impl TermQuery {
self.term.clone(), self.term.clone(),
index_record_option, index_record_option,
bm25_weight, bm25_weight,
scoring_enabled,
)) ))
} }
} }


@@ -302,7 +302,7 @@ mod tests {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
writer.set_merge_policy(Box::new(NoMergePolicy)); writer.set_merge_policy(Box::new(NoMergePolicy));
for _ in 0..3_000 { for _ in 0..3_000 {
let term_freq = rng.gen_range(1, 10000); let term_freq = rng.gen_range(1..10000);
let words: Vec<&str> = std::iter::repeat("bbbb").take(term_freq).collect(); let words: Vec<&str> = std::iter::repeat("bbbb").take(term_freq).collect();
let text = words.join(" "); let text = words.join(" ");
writer.add_document(doc!(text_field=>text)); writer.add_document(doc!(text_field=>text));
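This edit is the rand 0.8 migration of gen_range, which now takes a single range expression instead of two bounds. A quick sketch, assuming rand 0.8+ as a dependency:

use rand::Rng;

fn main() {
    let mut rng = rand::thread_rng();
    // rand 0.8: a half-open range replaces the old two-argument form
    // `rng.gen_range(1, 10_000)`.
    let term_freq: usize = rng.gen_range(1..10_000);
    assert!((1..10_000).contains(&term_freq));
}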


@@ -1,6 +1,7 @@
use super::term_scorer::TermScorer; use super::term_scorer::TermScorer;
use crate::core::SegmentReader; use crate::core::SegmentReader;
use crate::docset::DocSet; use crate::docset::DocSet;
use crate::fieldnorm::FieldNormReader;
use crate::postings::SegmentPostings; use crate::postings::SegmentPostings;
use crate::query::bm25::BM25Weight; use crate::query::bm25::BM25Weight;
use crate::query::explanation::does_not_match; use crate::query::explanation::does_not_match;
@@ -15,6 +16,7 @@ pub struct TermWeight {
term: Term, term: Term,
index_record_option: IndexRecordOption, index_record_option: IndexRecordOption,
similarity_weight: BM25Weight, similarity_weight: BM25Weight,
scoring_enabled: bool,
} }
impl Weight for TermWeight { impl Weight for TermWeight {
@@ -43,7 +45,7 @@ impl Weight for TermWeight {
} else { } else {
let field = self.term.field(); let field = self.term.field();
let inv_index = reader.inverted_index(field)?; let inv_index = reader.inverted_index(field)?;
let term_info = inv_index.get_term_info(&self.term); let term_info = inv_index.get_term_info(&self.term)?;
Ok(term_info.map(|term_info| term_info.doc_freq).unwrap_or(0)) Ok(term_info.map(|term_info| term_info.doc_freq).unwrap_or(0))
} }
} }
@@ -87,11 +89,13 @@ impl TermWeight {
term: Term, term: Term,
index_record_option: IndexRecordOption, index_record_option: IndexRecordOption,
similarity_weight: BM25Weight, similarity_weight: BM25Weight,
scoring_enabled: bool,
) -> TermWeight { ) -> TermWeight {
TermWeight { TermWeight {
term, term,
index_record_option, index_record_option,
similarity_weight, similarity_weight,
scoring_enabled,
} }
} }
@@ -102,7 +106,11 @@ impl TermWeight {
) -> crate::Result<TermScorer> { ) -> crate::Result<TermScorer> {
let field = self.term.field(); let field = self.term.field();
let inverted_index = reader.inverted_index(field)?; let inverted_index = reader.inverted_index(field)?;
let fieldnorm_reader = reader.get_fieldnorms_reader(field)?; let fieldnorm_reader = if self.scoring_enabled {
reader.get_fieldnorms_reader(field)?
} else {
FieldNormReader::constant(reader.max_doc(), 1)
};
let similarity_weight = self.similarity_weight.boost_by(boost); let similarity_weight = self.similarity_weight.boost_by(boost);
let postings_opt: Option<SegmentPostings> = let postings_opt: Option<SegmentPostings> =
inverted_index.read_postings(&self.term, self.index_record_option)?; inverted_index.read_postings(&self.term, self.index_record_option)?;


@@ -3,9 +3,9 @@ mod pool;
pub use self::pool::LeasedItem; pub use self::pool::LeasedItem;
use self::pool::Pool; use self::pool::Pool;
use crate::core::Segment; use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle; use crate::directory::WatchHandle;
use crate::directory::META_LOCK; use crate::directory::META_LOCK;
use crate::directory::{Directory, WatchCallback};
use crate::Index; use crate::Index;
use crate::Searcher; use crate::Searcher;
use crate::SegmentReader; use crate::SegmentReader;
@@ -88,7 +88,7 @@ impl IndexReaderBuilder {
let watch_handle = inner_reader_arc let watch_handle = inner_reader_arc
.index .index
.directory() .directory()
.watch(Box::new(callback))?; .watch(WatchCallback::new(callback))?;
watch_handle_opt = Some(watch_handle); watch_handle_opt = Some(watch_handle);
} }
} }


@@ -233,6 +233,7 @@ mod tests {
assert_eq!(Facet::root(), Facet::from("/")); assert_eq!(Facet::root(), Facet::from("/"));
assert_eq!(format!("{}", Facet::root()), "/"); assert_eq!(format!("{}", Facet::root()), "/");
assert!(Facet::root().is_root()); assert!(Facet::root().is_root());
assert_eq!(Facet::root().encoded_str(), "");
} }
#[test] #[test]


@@ -1,5 +1,5 @@
use crate::schema::Value; use crate::schema::Value;
use serde::Serialize; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
/// Internal representation of a document used for JSON /// Internal representation of a document used for JSON
@@ -8,5 +8,5 @@ use std::collections::BTreeMap;
/// A `NamedFieldDocument` is a simple representation of a document /// A `NamedFieldDocument` is a simple representation of a document
/// as a `BTreeMap<String, Vec<Value>>`. /// as a `BTreeMap<String, Vec<Value>>`.
/// ///
#[derive(Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct NamedFieldDocument(pub BTreeMap<String, Vec<Value>>); pub struct NamedFieldDocument(pub BTreeMap<String, Vec<Value>>);
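Deriving Deserialize means a named-field document can now be parsed back from JSON, not only produced. A hedged sketch of the round trip (assumes serde with the derive feature plus serde_json, and simplifies the value type to String instead of tantivy's Value):

use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

#[derive(Debug, Deserialize, Serialize, PartialEq)]
struct NamedFieldDocument(pub BTreeMap<String, Vec<String>>);

fn main() -> serde_json::Result<()> {
    let mut fields = BTreeMap::new();
    fields.insert("title".to_string(), vec!["hello".to_string()]);
    let doc = NamedFieldDocument(fields);

    let json = serde_json::to_string(&doc)?; // {"title":["hello"]}
    let parsed: NamedFieldDocument = serde_json::from_str(&json)?;
    assert_eq!(doc, parsed);
    Ok(())
}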


@@ -3,7 +3,7 @@ use std::io::{self, Read, Write};
/// Name of the compression scheme used in the doc store. /// Name of the compression scheme used in the doc store.
/// ///
/// This name is appended to the version string of tantivy. /// This name is appended to the version string of tantivy.
pub const COMPRESSION: &'static str = "lz4"; pub const COMPRESSION: &str = "lz4";
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> { pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear(); compressed.clear();

src/store/index/block.rs (new file, 168 lines)

@@ -0,0 +1,168 @@
use crate::common::VInt;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use crate::DocId;
use std::io;
/// Represents a block of checkpoints.
///
/// The DocStore index checkpoints are organized into block
/// for code-readability and compression purpose.
///
/// A block can be of any size.
pub struct CheckpointBlock {
pub checkpoints: Vec<Checkpoint>,
}
impl Default for CheckpointBlock {
fn default() -> CheckpointBlock {
CheckpointBlock {
checkpoints: Vec::with_capacity(2 * CHECKPOINT_PERIOD),
}
}
}
impl CheckpointBlock {
/// If non-empty returns [start_doc, end_doc)
/// for the overall block.
pub fn doc_interval(&self) -> Option<(DocId, DocId)> {
let start_doc_opt = self
.checkpoints
.first()
.cloned()
.map(|checkpoint| checkpoint.start_doc);
let end_doc_opt = self
.checkpoints
.last()
.cloned()
.map(|checkpoint| checkpoint.end_doc);
match (start_doc_opt, end_doc_opt) {
(Some(start_doc), Some(end_doc)) => Some((start_doc, end_doc)),
_ => None,
}
}
/// Adding another checkpoint in the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
if let Some(prev_checkpoint) = self.checkpoints.last() {
assert!(checkpoint.follows(prev_checkpoint));
}
self.checkpoints.push(checkpoint);
}
/// Returns the number of checkpoints in the block.
pub fn len(&self) -> usize {
self.checkpoints.len()
}
pub fn get(&self, idx: usize) -> Checkpoint {
self.checkpoints[idx]
}
pub fn clear(&mut self) {
self.checkpoints.clear();
}
pub fn serialize(&mut self, buffer: &mut Vec<u8>) {
VInt(self.checkpoints.len() as u64).serialize_into_vec(buffer);
if self.checkpoints.is_empty() {
return;
}
VInt(self.checkpoints[0].start_doc as u64).serialize_into_vec(buffer);
VInt(self.checkpoints[0].start_offset as u64).serialize_into_vec(buffer);
for checkpoint in &self.checkpoints {
let delta_doc = checkpoint.end_doc - checkpoint.start_doc;
VInt(delta_doc as u64).serialize_into_vec(buffer);
VInt(checkpoint.end_offset - checkpoint.start_offset).serialize_into_vec(buffer);
}
}
pub fn deserialize(&mut self, data: &mut &[u8]) -> io::Result<()> {
if data.is_empty() {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""));
}
self.checkpoints.clear();
let len = VInt::deserialize_u64(data)? as usize;
if len == 0 {
return Ok(());
}
let mut doc = VInt::deserialize_u64(data)? as DocId;
let mut start_offset = VInt::deserialize_u64(data)?;
for _ in 0..len {
let num_docs = VInt::deserialize_u64(data)? as DocId;
let block_num_bytes = VInt::deserialize_u64(data)?;
self.checkpoints.push(Checkpoint {
start_doc: doc,
end_doc: doc + num_docs,
start_offset,
end_offset: start_offset + block_num_bytes,
});
doc += num_docs;
start_offset += block_num_bytes;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::store::index::block::CheckpointBlock;
use crate::store::index::Checkpoint;
use crate::DocId;
use std::io;
fn test_aux_ser_deser(checkpoints: &[Checkpoint]) -> io::Result<()> {
let mut block = CheckpointBlock::default();
for &checkpoint in checkpoints {
block.push(checkpoint);
}
let mut buffer = Vec::new();
block.serialize(&mut buffer);
let mut block_deser = CheckpointBlock::default();
let checkpoint = Checkpoint {
start_doc: 0,
end_doc: 1,
start_offset: 2,
end_offset: 3,
};
block_deser.push(checkpoint); // < check that value is erased before deser
let mut data = &buffer[..];
block_deser.deserialize(&mut data)?;
assert!(data.is_empty());
assert_eq!(checkpoints, &block_deser.checkpoints[..]);
Ok(())
}
#[test]
fn test_block_serialize_empty() -> io::Result<()> {
test_aux_ser_deser(&[])
}
#[test]
fn test_block_serialize_simple() -> io::Result<()> {
let checkpoints = vec![Checkpoint {
start_doc: 10,
end_doc: 12,
start_offset: 100,
end_offset: 120,
}];
test_aux_ser_deser(&checkpoints)
}
#[test]
fn test_block_serialize() -> io::Result<()> {
let offsets: Vec<u64> = (0..11).map(|i| i * i * i).collect();
let mut checkpoints = vec![];
let mut start_doc = 0;
for i in 0..10 {
let end_doc = (i * i) as DocId;
checkpoints.push(Checkpoint {
start_doc,
end_doc,
start_offset: offsets[i],
end_offset: offsets[i + 1],
});
start_doc = end_doc;
}
test_aux_ser_deser(&checkpoints)
}
}
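Within a block, only the first checkpoint's start_doc and start_offset are written absolutely; every checkpoint then contributes just its doc count and byte length, all as variable-length integers, which keeps runs of small checkpoints cheap. A standalone sketch of the same scheme (a plain LEB128-style varint here, standing in for tantivy's VInt and not byte-identical to it):

// (start_doc, end_doc, start_offset, end_offset), contiguous as in CheckpointBlock.
type Checkpoint = (u32, u32, u64, u64);

fn vint(mut v: u64, out: &mut Vec<u8>) {
    loop {
        let mut byte = (v & 0x7f) as u8;
        v >>= 7;
        if v != 0 {
            byte |= 0x80; // continuation bit
        }
        out.push(byte);
        if v == 0 {
            return;
        }
    }
}

fn serialize_block(checkpoints: &[Checkpoint], out: &mut Vec<u8>) {
    vint(checkpoints.len() as u64, out);
    if checkpoints.is_empty() {
        return;
    }
    vint(checkpoints[0].0 as u64, out); // absolute first start_doc
    vint(checkpoints[0].2, out);        // absolute first start_offset
    for &(start_doc, end_doc, start_offset, end_offset) in checkpoints {
        vint((end_doc - start_doc) as u64, out); // number of docs covered
        vint(end_offset - start_offset, out);    // number of bytes covered
    }
}

fn main() {
    let mut buf = Vec::new();
    serialize_block(&[(0, 3, 0, 10), (3, 7, 10, 25)], &mut buf);
    // Every value fits in 7 bits, so each varint is one byte: 1 (len) + 2 (base) + 2 * 2.
    assert_eq!(buf.len(), 7);
}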

src/store/index/mod.rs (new file, 274 lines)

@@ -0,0 +1,274 @@
const CHECKPOINT_PERIOD: usize = 2;
use std::fmt;
mod block;
mod skip_index;
mod skip_index_builder;
use crate::DocId;
pub use self::skip_index::SkipIndex;
pub use self::skip_index_builder::SkipIndexBuilder;
/// A checkpoint contains meta-information about
/// a block. Either a block of documents, or another block
/// of checkpoints.
///
/// All of the intervals here defined are semi-open.
/// The checkpoint describes that the block within the bytes
/// `[start_offset..end_offset)` spans over the docs
/// `[start_doc..end_doc)`.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Checkpoint {
pub start_doc: DocId,
pub end_doc: DocId,
pub start_offset: u64,
pub end_offset: u64,
}
impl Checkpoint {
pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
(self.start_doc == other.end_doc) &&
(self.start_offset == other.end_offset)
}
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"(doc=[{}..{}), bytes=[{}..{}))",
self.start_doc, self.end_doc, self.start_offset, self.end_offset
)
}
}
#[cfg(test)]
mod tests {
use std::{io, iter};
use futures::executor::block_on;
use proptest::strategy::{BoxedStrategy, Strategy};
use crate::directory::OwnedBytes;
use crate::indexer::NoMergePolicy;
use crate::schema::{SchemaBuilder, STORED, STRING};
use crate::store::index::Checkpoint;
use crate::{DocAddress, DocId, Index, Term};
use super::{SkipIndex, SkipIndexBuilder};
#[test]
fn test_skip_index_empty() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Ok(())
}
#[test]
fn test_skip_index_single_el() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
let checkpoint = Checkpoint {
start_doc: 0,
end_doc: 2,
start_offset: 0,
end_offset: 3,
};
skip_index_builder.insert(checkpoint);
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
assert_eq!(skip_cursor.next(), None);
Ok(())
}
#[test]
fn test_skip_index() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let checkpoints = vec![
Checkpoint {
start_doc: 0,
end_doc: 3,
start_offset: 4,
end_offset: 9,
},
Checkpoint {
start_doc: 3,
end_doc: 4,
start_offset: 9,
end_offset: 25,
},
Checkpoint {
start_doc: 4,
end_doc: 6,
start_offset: 25,
end_offset: 49,
},
Checkpoint {
start_doc: 6,
end_doc: 8,
start_offset: 49,
end_offset: 81,
},
Checkpoint {
start_doc: 8,
end_doc: 10,
start_offset: 81,
end_offset: 100,
},
];
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
for &checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint);
}
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
assert_eq!(
&skip_index.checkpoints().collect::<Vec<_>>()[..],
&checkpoints[..]
);
Ok(())
}
fn offset_test(doc: DocId) -> u64 {
(doc as u64) * (doc as u64)
}
#[test]
fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::default();
let text = schema_builder.add_text_field("text", STORED | STRING);
let body = schema_builder.add_text_field("body", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz")
.take(1_000)
.collect();
for _ in 0..20 {
index_writer.add_document(doc!(body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.add_document(doc!(text=>"testb"));
for _ in 0..10 {
index_writer.add_document(doc!(text=>"testd", body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.delete_term(Term::from_field_text(text, "testb"));
index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
block_on(index_writer.merge(&segment_ids))?;
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 30);
for i in 0..searcher.num_docs() as u32 {
let _doc = searcher.doc(DocAddress(0u32, i))?;
}
Ok(())
}
#[test]
fn test_skip_index_long() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let checkpoints: Vec<Checkpoint> = (0..1000)
.map(|i| Checkpoint {
start_doc: i,
end_doc: i + 1,
start_offset: offset_test(i),
end_offset: offset_test(i + 1),
})
.collect();
let mut skip_index_builder = SkipIndexBuilder::new();
for checkpoint in &checkpoints {
skip_index_builder.insert(*checkpoint);
}
skip_index_builder.write(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
.checkpoints()
.collect();
assert_eq!(&resulting_checkpoints, &checkpoints);
Ok(())
}
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
let mut prev = 0u64;
for val in vals.iter_mut() {
let new_val = *val + prev;
prev = new_val;
*val = new_val;
}
vals
}
// Generates a sequence of n valid checkpoints, with n < max_len.
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
(1..max_len)
.prop_flat_map(move |len: usize| {
(
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),
proptest::collection::vec(1u64..26u64, len as usize).prop_map(integrate_delta),
)
.prop_map(|(docs, offsets)| {
(0..docs.len() - 1)
.map(move |i| Checkpoint {
start_doc: docs[i] as DocId,
end_doc: docs[i + 1] as DocId,
start_offset: offsets[i],
end_offset: offsets[i + 1],
})
.collect::<Vec<Checkpoint>>()
})
})
.boxed()
}
fn seek_manual<I: Iterator<Item = Checkpoint>>(
checkpoints: I,
target: DocId,
) -> Option<Checkpoint> {
checkpoints
.into_iter()
.filter(|checkpoint| checkpoint.end_doc > target)
.next()
}
fn test_skip_index_aux(skip_index: SkipIndex, checkpoints: &[Checkpoint]) {
if let Some(last_checkpoint) = checkpoints.last() {
for doc in 0u32..last_checkpoint.end_doc {
let expected = seek_manual(skip_index.checkpoints(), doc);
assert_eq!(expected, skip_index.seek(doc), "Doc {}", doc);
}
assert!(skip_index.seek(last_checkpoint.end_doc).is_none());
}
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
#[test]
fn test_proptest_skip(checkpoints in monotonic_checkpoints(100)) {
let mut skip_index_builder = SkipIndexBuilder::new();
for checkpoint in checkpoints.iter().cloned() {
skip_index_builder.insert(checkpoint);
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
test_skip_index_aux(skip_index, &checkpoints[..]);
}
}
}
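The follows check is the invariant CheckpointBlock::push asserts: checkpoint intervals are semi-open on both axes and must tile the doc-id space and the byte stream with no gaps or overlaps. A quick worked example with a stand-in struct:

#[derive(Clone, Copy, Debug, PartialEq)]
struct Checkpoint { start_doc: u32, end_doc: u32, start_offset: u64, end_offset: u64 }

impl Checkpoint {
    fn follows(&self, other: &Checkpoint) -> bool {
        self.start_doc == other.end_doc && self.start_offset == other.end_offset
    }
}

fn main() {
    let a = Checkpoint { start_doc: 0, end_doc: 3, start_offset: 0, end_offset: 10 };
    let b = Checkpoint { start_doc: 3, end_doc: 7, start_offset: 10, end_offset: 25 };
    let c = Checkpoint { start_doc: 8, end_doc: 9, start_offset: 25, end_offset: 30 };
    assert!(b.follows(&a));
    assert!(!c.follows(&b)); // gap in doc ids (doc 7 missing) breaks contiguity
}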


@@ -0,0 +1,110 @@
use crate::common::{BinarySerializable, VInt};
use crate::directory::OwnedBytes;
use crate::store::index::block::CheckpointBlock;
use crate::store::index::Checkpoint;
use crate::DocId;
pub struct LayerCursor<'a> {
remaining: &'a [u8],
block: CheckpointBlock,
cursor: usize,
}
impl<'a> Iterator for LayerCursor<'a> {
type Item = Checkpoint;
fn next(&mut self) -> Option<Checkpoint> {
if self.cursor == self.block.len() {
if self.remaining.is_empty() {
return None;
}
let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
if block_mut.deserialize(remaining_mut).is_err() {
return None;
}
self.cursor = 0;
}
let res = Some(self.block.get(self.cursor));
self.cursor += 1;
res
}
}
struct Layer {
data: OwnedBytes,
}
impl Layer {
fn cursor<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.cursor_at_offset(0u64)
}
fn cursor_at_offset<'a>(&'a self, start_offset: u64) -> impl Iterator<Item = Checkpoint> + 'a {
let data = &self.data.as_slice();
LayerCursor {
remaining: &data[start_offset as usize..],
block: CheckpointBlock::default(),
cursor: 0,
}
}
fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
self.cursor_at_offset(offset)
.find(|checkpoint| checkpoint.end_doc > target)
}
}
pub struct SkipIndex {
layers: Vec<Layer>,
}
impl SkipIndex {
pub fn open(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
let layer = Layer {
data: data.slice(start_offset as usize, end_offset as usize),
};
layers.push(layer);
start_offset = end_offset;
}
SkipIndex { layers }
}
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()
.into_iter()
.flat_map(|layer| layer.cursor())
}
pub fn seek(&self, target: DocId) -> Option<Checkpoint> {
let first_layer_len = self
.layers
.first()
.map(|layer| layer.data.len() as u64)
.unwrap_or(0u64);
let mut cur_checkpoint = Checkpoint {
start_doc: 0u32,
end_doc: 1u32,
start_offset: 0u64,
end_offset: first_layer_len,
};
for layer in &self.layers {
if let Some(checkpoint) =
layer.seek_start_at_offset(target, cur_checkpoint.start_offset)
{
cur_checkpoint = checkpoint;
} else {
return None;
}
}
Some(cur_checkpoint)
}
}


@@ -0,0 +1,117 @@
use crate::common::{BinarySerializable, VInt};
use crate::store::index::block::CheckpointBlock;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use std::io;
use std::io::Write;
// Each skip contains iterator over pairs (last doc in block, offset to start of block).
struct LayerBuilder {
buffer: Vec<u8>,
pub block: CheckpointBlock,
}
impl LayerBuilder {
fn finish(self) -> Vec<u8> {
self.buffer
}
fn new() -> LayerBuilder {
LayerBuilder {
buffer: Vec::new(),
block: CheckpointBlock::default(),
}
}
/// Serializes the block, and return a checkpoint representing
/// the entire block.
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
if let Some((start_doc, end_doc)) = self.block.doc_interval() {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Some(Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
})
} else {
None
}
}
fn push(&mut self, checkpoint: Checkpoint) {
self.block.push(checkpoint);
}
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD;
if emit_skip_info {
self.flush_block()
} else {
None
}
}
}
pub struct SkipIndexBuilder {
layers: Vec<LayerBuilder>,
}
impl SkipIndexBuilder {
pub fn new() -> SkipIndexBuilder {
SkipIndexBuilder { layers: Vec::new() }
}
fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder {
if layer_id == self.layers.len() {
let layer_builder = LayerBuilder::new();
self.layers.push(layer_builder);
}
&mut self.layers[layer_id]
}
pub fn insert(&mut self, checkpoint: Checkpoint) {
let mut skip_pointer = Some(checkpoint);
for layer_id in 0.. {
if let Some(checkpoint) = skip_pointer {
skip_pointer = self.get_layer(layer_id).insert(checkpoint);
} else {
break;
}
}
}
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
skip_layer.push(checkpoint);
}
last_pointer = skip_layer.flush_block();
}
let layer_buffers: Vec<Vec<u8>> = self
.layers
.into_iter()
.rev()
.map(|layer| layer.finish())
.collect();
let mut layer_offset = 0;
let mut layer_sizes = Vec::new();
for layer_buffer in &layer_buffers {
layer_offset += layer_buffer.len() as u64;
layer_sizes.push(VInt(layer_offset));
}
layer_sizes.serialize(output)?;
for layer_buffer in layer_buffers {
output.write_all(&layer_buffer[..])?;
}
Ok(())
}
}
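For reference, a hedged round-trip sketch of how the builder and reader above fit together. `SkipIndexBuilder`, `SkipIndex`, and `Checkpoint` live in the crate-private `store::index` module, so this would only compile from inside tantivy, and it assumes `OwnedBytes::new` accepts a `Vec<u8>`:

```rust
// Illustrative only: these types are crate-private, so this sketch is written
// as if it sat inside tantivy's store module.
use crate::directory::OwnedBytes;
use crate::store::index::{Checkpoint, SkipIndex, SkipIndexBuilder};

fn round_trip() -> std::io::Result<()> {
    let mut builder = SkipIndexBuilder::new();
    // One checkpoint per compressed block: contiguous doc ranges and byte ranges.
    let mut start_doc = 0u32;
    let mut start_offset = 0u64;
    for _ in 0..100 {
        builder.insert(Checkpoint {
            start_doc,
            end_doc: start_doc + 10,
            start_offset,
            end_offset: start_offset + 512,
        });
        start_doc += 10;
        start_offset += 512;
    }
    let mut buffer: Vec<u8> = Vec::new();
    builder.write(&mut buffer)?; // consumes the builder

    // Assumption: OwnedBytes::new wraps a Vec<u8>.
    let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
    let checkpoint = skip_index.seek(123).expect("doc 123 is covered");
    assert_eq!(checkpoint.start_doc, 120);
    assert_eq!(checkpoint.end_doc, 130);
    Ok(())
}
```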

View File

@@ -33,8 +33,8 @@ and should rely on either
!*/ !*/
mod index;
mod reader; mod reader;
mod skiplist;
mod writer; mod writer;
pub use self::reader::StoreReader; pub use self::reader::StoreReader;
pub use self::writer::StoreWriter; pub use self::writer::StoreWriter;

View File

@@ -1,69 +1,91 @@
use super::decompress; use super::decompress;
use super::skiplist::SkipList; use super::index::SkipIndex;
use crate::common::VInt; use crate::common::VInt;
use crate::common::{BinarySerializable, HasLen}; use crate::common::{BinarySerializable, HasLen};
use crate::directory::{FileSlice, OwnedBytes}; use crate::directory::{FileSlice, OwnedBytes};
use crate::schema::Document; use crate::schema::Document;
use crate::space_usage::StoreSpaceUsage; use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId; use crate::DocId;
use std::cell::RefCell; use lru::LruCache;
use std::io; use std::io;
use std::mem::size_of; use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
const LRU_CACHE_CAPACITY: usize = 100;
type Block = Arc<Vec<u8>>;
type BlockCache = Arc<Mutex<LruCache<u64, Block>>>;
/// Reads document off tantivy's [`Store`](./index.html) /// Reads document off tantivy's [`Store`](./index.html)
#[derive(Clone)]
pub struct StoreReader { pub struct StoreReader {
data: FileSlice, data: FileSlice,
offset_index_file: OwnedBytes, cache: BlockCache,
current_block_offset: RefCell<usize>, cache_hits: Arc<AtomicUsize>,
current_block: RefCell<Vec<u8>>, cache_misses: Arc<AtomicUsize>,
max_doc: DocId, skip_index: Arc<SkipIndex>,
space_usage: StoreSpaceUsage,
} }
impl StoreReader { impl StoreReader {
/// Opens a store reader /// Opens a store reader
// TODO rename open
pub fn open(store_file: FileSlice) -> io::Result<StoreReader> { pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
let (data_file, offset_index_file, max_doc) = split_file(store_file)?; let (data_file, offset_index_file) = split_file(store_file)?;
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::open(index_data);
Ok(StoreReader { Ok(StoreReader {
data: data_file, data: data_file,
offset_index_file: offset_index_file.read_bytes()?, cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
current_block_offset: RefCell::new(usize::max_value()), cache_hits: Default::default(),
current_block: RefCell::new(Vec::new()), cache_misses: Default::default(),
max_doc, skip_index: Arc::new(skip_index),
space_usage,
}) })
} }
pub(crate) fn block_index(&self) -> SkipList<'_, u64> { pub(crate) fn block_checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
SkipList::from(self.offset_index_file.as_slice()) self.skip_index.checkpoints()
} }
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) { fn block_checkpoint(&self, doc_id: DocId) -> Option<Checkpoint> {
self.block_index() self.skip_index.seek(doc_id)
.seek(u64::from(doc_id) + 1)
.map(|(doc, offset)| (doc as DocId, offset))
.unwrap_or((0u32, 0u64))
} }
pub(crate) fn block_data(&self) -> io::Result<OwnedBytes> { pub(crate) fn block_data(&self) -> io::Result<OwnedBytes> {
self.data.read_bytes() self.data.read_bytes()
} }
fn compressed_block(&self, addr: usize) -> io::Result<OwnedBytes> { fn compressed_block(&self, checkpoint: &Checkpoint) -> io::Result<OwnedBytes> {
let (block_len_bytes, block_body) = self.data.slice_from(addr).split(4); self.data
let block_len = u32::deserialize(&mut block_len_bytes.read_bytes()?)?; .slice(
block_body.slice_to(block_len as usize).read_bytes() checkpoint.start_offset as usize,
checkpoint.end_offset as usize,
)
.read_bytes()
} }
fn read_block(&self, block_offset: usize) -> io::Result<()> { fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> {
if block_offset != *self.current_block_offset.borrow() { if let Some(block) = self.cache.lock().unwrap().get(&checkpoint.start_offset) {
let mut current_block_mut = self.current_block.borrow_mut(); self.cache_hits.fetch_add(1, Ordering::SeqCst);
current_block_mut.clear(); return Ok(block.clone());
let compressed_block = self.compressed_block(block_offset)?;
decompress(compressed_block.as_slice(), &mut current_block_mut)?;
*self.current_block_offset.borrow_mut() = block_offset;
} }
Ok(())
self.cache_misses.fetch_add(1, Ordering::SeqCst);
let compressed_block = self.compressed_block(checkpoint)?;
let mut decompressed_block = vec![];
decompress(compressed_block.as_slice(), &mut decompressed_block)?;
let block = Arc::new(decompressed_block);
self.cache
.lock()
.unwrap()
.put(checkpoint.start_offset, block.clone());
Ok(block)
} }
/// Reads a given document. /// Reads a given document.
@@ -74,14 +96,15 @@ impl StoreReader {
/// It should not be called to score documents /// It should not be called to score documents
/// for instance. /// for instance.
pub fn get(&self, doc_id: DocId) -> crate::Result<Document> { pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
let (first_doc_id, block_offset) = self.block_offset(doc_id); let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| {
self.read_block(block_offset as usize)?; crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
let current_block_mut = self.current_block.borrow_mut(); })?;
let mut cursor = &current_block_mut[..]; let mut cursor = &self.read_block(&checkpoint)?[..];
for _ in first_doc_id..doc_id { for _ in checkpoint.start_doc..doc_id {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..]; cursor = &cursor[doc_length..];
} }
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[..doc_length]; cursor = &cursor[..doc_length];
Ok(Document::deserialize(&mut cursor)?) Ok(Document::deserialize(&mut cursor)?)
@@ -89,21 +112,93 @@ impl StoreReader {
/// Summarize total space usage of this store reader. /// Summarize total space usage of this store reader.
pub fn space_usage(&self) -> StoreSpaceUsage { pub fn space_usage(&self) -> StoreSpaceUsage {
StoreSpaceUsage::new(self.data.len(), self.offset_index_file.len()) self.space_usage.clone()
} }
} }
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> { fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> {
let data_len = data.len(); let (data, footer_len_bytes) = data.split_from_end(size_of::<u64>());
let footer_offset = data_len - size_of::<u64>() - size_of::<u32>(); let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;
let serialized_offset: OwnedBytes = data.slice(footer_offset, data_len).read_bytes()?;
let mut serialized_offset_buf = serialized_offset.as_slice(); let mut serialized_offset_buf = serialized_offset.as_slice();
let offset = u64::deserialize(&mut serialized_offset_buf)?; let offset = u64::deserialize(&mut serialized_offset_buf)? as usize;
let offset = offset as usize; Ok(data.split(offset))
let max_doc = u32::deserialize(&mut serialized_offset_buf)?; }
Ok((
data.slice(0, offset), #[cfg(test)]
data.slice(offset, footer_offset), mod tests {
max_doc, use super::*;
)) use crate::schema::Document;
use crate::schema::Field;
use crate::{directory::RAMDirectory, store::tests::write_lorem_ipsum_store, Directory};
use std::path::Path;
fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
doc.get_first(*field).and_then(|f| f.text())
}
#[test]
fn test_store_lru_cache() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
assert_eq!(store.cache.lock().unwrap().len(), 0);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 0);
let doc = store.get(0)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
assert_eq!(store.cache.lock().unwrap().len(), 1);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 1);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(0)
);
let doc = store.get(499)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 499"));
assert_eq!(store.cache.lock().unwrap().len(), 2);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(0)
);
let doc = store.get(0)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
assert_eq!(store.cache.lock().unwrap().len(), 2);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 1);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(18806)
);
Ok(())
}
} }
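The reader above replaces the old single "current block" buffer with a shared LRU cache of decompressed blocks keyed by their start offset. Below is a self-contained sketch of that pattern, assuming the `lru` 0.6-style API used in the diff (`LruCache::new(usize)`) and an identity "decompression" as a stand-in for the real codec:

```rust
// Minimal sketch of the block-cache pattern used by the new StoreReader:
// decompressed blocks are shared behind an Arc and cached by their start offset.
// Requires the `lru` crate (0.6-style API assumed).
use lru::LruCache;
use std::sync::{Arc, Mutex};

type Block = Arc<Vec<u8>>;

struct CachedBlockReader {
    raw: Vec<u8>,                       // stand-in for the store's compressed data
    cache: Mutex<LruCache<u64, Block>>, // keyed by the block's start offset
}

impl CachedBlockReader {
    fn new(raw: Vec<u8>, capacity: usize) -> Self {
        CachedBlockReader { raw, cache: Mutex::new(LruCache::new(capacity)) }
    }

    fn read_block(&self, start: u64, end: u64) -> Block {
        if let Some(block) = self.cache.lock().unwrap().get(&start) {
            return block.clone(); // cache hit: no decompression, just an Arc clone
        }
        // Cache miss: "decompress" the byte range (identity copy in this sketch).
        let block = Arc::new(self.raw[start as usize..end as usize].to_vec());
        self.cache.lock().unwrap().put(start, block.clone());
        block
    }
}

fn main() {
    let reader = CachedBlockReader::new((0u8..64).collect(), 2);
    let first = reader.read_block(0, 16);  // miss
    let again = reader.read_block(0, 16);  // hit, same Arc
    assert!(Arc::ptr_eq(&first, &again));
}
```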

View File

@@ -1,168 +0,0 @@
#![allow(dead_code)]
mod skiplist;
mod skiplist_builder;
pub use self::skiplist::SkipList;
pub use self::skiplist_builder::SkipListBuilder;
#[cfg(test)]
mod tests {
use super::{SkipList, SkipListBuilder};
#[test]
fn test_skiplist() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), Some((2, 3)));
}
#[test]
fn test_skiplist2() {
let mut output: Vec<u8> = Vec::new();
let skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist3() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
assert_eq!(skip_list.next().unwrap(), (3, ()));
assert_eq!(skip_list.next().unwrap(), (5, ()));
assert_eq!(skip_list.next().unwrap(), (7, ()));
assert_eq!(skip_list.next().unwrap(), (9, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist4() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(5);
assert_eq!(skip_list.next().unwrap(), (5, ()));
assert_eq!(skip_list.next().unwrap(), (7, ()));
assert_eq!(skip_list.next().unwrap(), (9, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist5() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(6, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(6);
assert_eq!(skip_list.next().unwrap(), (6, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist6() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(10);
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist7() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..1000 {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.insert(1004, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (0, ()));
skip_list.seek(431);
assert_eq!(skip_list.next().unwrap(), (431, ()));
skip_list.seek(1003);
assert_eq!(skip_list.next().unwrap(), (1004, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist8() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(8);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 11);
assert_eq!(output[0], 1u8 + 128u8);
}
#[test]
fn test_skiplist9() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(4);
for i in 0..4 * 4 * 4 {
skip_list_builder.insert(i, &i).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 774);
assert_eq!(output[0], 4u8 + 128u8);
}
#[test]
fn test_skiplist10() {
// checking that void gets serialized to nothing.
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..((4 * 4 * 4) - 1) {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 230);
assert_eq!(output[0], 128u8 + 3u8);
}
#[test]
fn test_skiplist11() {
// checking that void gets serialized to nothing.
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..(4 * 4) {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 65);
assert_eq!(output[0], 128u8 + 3u8);
}
}

View File

@@ -1,133 +0,0 @@
use crate::common::{BinarySerializable, VInt};
use std::cmp::max;
use std::marker::PhantomData;
static EMPTY: [u8; 0] = [];
struct Layer<'a, T> {
data: &'a [u8],
cursor: &'a [u8],
next_id: Option<u64>,
_phantom_: PhantomData<T>,
}
impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> {
type Item = (u64, T);
fn next(&mut self) -> Option<(u64, T)> {
if let Some(cur_id) = self.next_id {
let cur_val = T::deserialize(&mut self.cursor).unwrap();
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
Some((cur_id, cur_val))
} else {
None
}
}
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> {
fn from(data: &'a [u8]) -> Layer<'a, T> {
let mut cursor = data;
let next_id = VInt::deserialize_u64(&mut cursor).ok();
Layer {
data,
cursor,
next_id,
_phantom_: PhantomData,
}
}
}
impl<'a, T: BinarySerializable> Layer<'a, T> {
fn empty() -> Layer<'a, T> {
Layer {
data: &EMPTY,
cursor: &EMPTY,
next_id: None,
_phantom_: PhantomData,
}
}
fn seek_offset(&mut self, offset: usize) {
self.cursor = &self.data[offset..];
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
}
// Returns the last element (key, val)
// such that (key < doc_id)
//
// If there is no such element anymore,
// returns None.
//
// If the element exists, it will be returned
// at the next call to `.next()`.
fn seek(&mut self, key: u64) -> Option<(u64, T)> {
let mut result: Option<(u64, T)> = None;
loop {
if let Some(next_id) = self.next_id {
if next_id < key {
if let Some(v) = self.next() {
result = Some(v);
continue;
}
}
}
return result;
}
}
}
pub struct SkipList<'a, T: BinarySerializable> {
data_layer: Layer<'a, T>,
skip_layers: Vec<Layer<'a, u64>>,
}
impl<'a, T: BinarySerializable> Iterator for SkipList<'a, T> {
type Item = (u64, T);
fn next(&mut self) -> Option<(u64, T)> {
self.data_layer.next()
}
}
impl<'a, T: BinarySerializable> SkipList<'a, T> {
pub fn seek(&mut self, key: u64) -> Option<(u64, T)> {
let mut next_layer_skip: Option<(u64, u64)> = None;
for skip_layer in &mut self.skip_layers {
if let Some((_, offset)) = next_layer_skip {
skip_layer.seek_offset(offset as usize);
}
next_layer_skip = skip_layer.seek(key);
}
if let Some((_, offset)) = next_layer_skip {
self.data_layer.seek_offset(offset as usize);
}
self.data_layer.seek(key)
}
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> {
fn from(mut data: &'a [u8]) -> SkipList<'a, T> {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let num_layers = offsets.len();
let layers_data: &[u8] = data;
let data_layer: Layer<'a, T> = if num_layers == 0 {
Layer::empty()
} else {
let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize];
Layer::from(first_layer_data)
};
let skip_layers = (0..max(1, num_layers) - 1)
.map(|i| (offsets[i] as usize, offsets[i + 1] as usize))
.map(|(start, stop)| Layer::from(&layers_data[start..stop]))
.collect();
SkipList {
skip_layers,
data_layer,
}
}
}

View File

@@ -1,98 +0,0 @@
use crate::common::{is_power_of_2, BinarySerializable, VInt};
use std::io;
use std::io::Write;
use std::marker::PhantomData;
struct LayerBuilder<T: BinarySerializable> {
period_mask: usize,
buffer: Vec<u8>,
len: usize,
_phantom_: PhantomData<T>,
}
impl<T: BinarySerializable> LayerBuilder<T> {
fn written_size(&self) -> usize {
self.buffer.len()
}
fn write(&self, output: &mut dyn Write) -> Result<(), io::Error> {
output.write_all(&self.buffer)?;
Ok(())
}
fn with_period(period: usize) -> LayerBuilder<T> {
assert!(is_power_of_2(period), "The period has to be a power of 2.");
LayerBuilder {
period_mask: (period - 1),
buffer: Vec::new(),
len: 0,
_phantom_: PhantomData,
}
}
fn insert(&mut self, key: u64, value: &T) -> io::Result<Option<(u64, u64)>> {
self.len += 1;
let offset = self.written_size() as u64;
VInt(key).serialize_into_vec(&mut self.buffer);
value.serialize(&mut self.buffer)?;
let emit_skip_info = (self.period_mask & self.len) == 0;
if emit_skip_info {
Ok(Some((key, offset)))
} else {
Ok(None)
}
}
}
pub struct SkipListBuilder<T: BinarySerializable> {
period: usize,
data_layer: LayerBuilder<T>,
skip_layers: Vec<LayerBuilder<u64>>,
}
impl<T: BinarySerializable> SkipListBuilder<T> {
pub fn new(period: usize) -> SkipListBuilder<T> {
SkipListBuilder {
period,
data_layer: LayerBuilder::with_period(period),
skip_layers: Vec::new(),
}
}
fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder<u64> {
if layer_id == self.skip_layers.len() {
let layer_builder = LayerBuilder::with_period(self.period);
self.skip_layers.push(layer_builder);
}
&mut self.skip_layers[layer_id]
}
pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> {
let mut skip_pointer = self.data_layer.insert(key, dest)?;
for layer_id in 0.. {
if let Some((skip_doc_id, skip_offset)) = skip_pointer {
skip_pointer = self
.get_skip_layer(layer_id)
.insert(skip_doc_id, &skip_offset)?;
} else {
break;
}
}
Ok(())
}
pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> {
let mut size: u64 = self.data_layer.buffer.len() as u64;
let mut layer_sizes = vec![VInt(size)];
for layer in self.skip_layers.iter().rev() {
size += layer.buffer.len() as u64;
layer_sizes.push(VInt(size));
}
layer_sizes.serialize(output)?;
self.data_layer.write(output)?;
for layer in self.skip_layers.iter().rev() {
layer.write(output)?;
}
Ok(())
}
}

50
src/store/tests_store.rs Normal file
View File

@@ -0,0 +1,50 @@
use std::path::Path;
use crate::HasLen;
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, RAMDirectory};
use crate::fastfield::DeleteBitSet;
use super::{StoreReader, StoreWriter};
#[test]
fn test_toto2() -> crate::Result<()> {
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
let path = Path::new("b6029ade1b954ea1acad15b432eaacb9.store");
assert!(directory.validate_checksum(path)?);
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
let documents = store.documents();
// for doc in documents {
// println!("{:?}", doc);
// }
let doc = store.get(15_086)?;
Ok(())
}
#[test]
fn test_toto() -> crate::Result<()> {
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
assert!(directory.validate_checksum(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store"))?);
let store_file = directory.open_read(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store.patched"))?;
let store = StoreReader::open(store_file)?;
let doc = store.get(53)?;
println!("{:?}", doc);
// let documents = store.documents();
// let ram_directory = RAMDirectory::create();
// let path = Path::new("store");
// let store_wrt = ram_directory.open_write(path)?;
// let mut store_writer = StoreWriter::new(store_wrt);
// for doc in &documents {
// store_writer.store(doc)?;
// }
// store_writer.close()?;
// let store_data = ram_directory.open_read(path)?;
// let new_store = StoreReader::open(store_data)?;
// for doc in 0..59 {
// println!("{}", doc);
// let doc = new_store.get(doc)?;
// println!("{:?}", doc);
// }
Ok(())
}

View File

@@ -1,15 +1,16 @@
use super::compress; use super::compress;
use super::skiplist::SkipListBuilder; use super::index::SkipIndexBuilder;
use super::StoreReader; use super::StoreReader;
use crate::common::CountingWriter; use crate::common::CountingWriter;
use crate::common::{BinarySerializable, VInt}; use crate::common::{BinarySerializable, VInt};
use crate::directory::TerminatingWrite; use crate::directory::TerminatingWrite;
use crate::directory::WritePtr; use crate::directory::WritePtr;
use crate::schema::Document; use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId; use crate::DocId;
use std::io::{self, Write}; use std::io::{self, Write};
const BLOCK_SIZE: usize = 16_384; const BLOCK_SIZE: usize = 30;
/// Write tantivy's [`Store`](./index.html) /// Write tantivy's [`Store`](./index.html)
/// ///
@@ -21,7 +22,8 @@ const BLOCK_SIZE: usize = 16_384;
/// ///
pub struct StoreWriter { pub struct StoreWriter {
doc: DocId, doc: DocId,
offset_index_writer: SkipListBuilder<u64>, first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
writer: CountingWriter<WritePtr>, writer: CountingWriter<WritePtr>,
intermediary_buffer: Vec<u8>, intermediary_buffer: Vec<u8>,
current_block: Vec<u8>, current_block: Vec<u8>,
@@ -35,7 +37,8 @@ impl StoreWriter {
pub fn new(writer: WritePtr) -> StoreWriter { pub fn new(writer: WritePtr) -> StoreWriter {
StoreWriter { StoreWriter {
doc: 0, doc: 0,
offset_index_writer: SkipListBuilder::new(4), first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
writer: CountingWriter::wrap(writer), writer: CountingWriter::wrap(writer),
intermediary_buffer: Vec::new(), intermediary_buffer: Vec::new(),
current_block: Vec::new(), current_block: Vec::new(),
@@ -68,11 +71,10 @@ impl StoreWriter {
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> { pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
if !self.current_block.is_empty() { if !self.current_block.is_empty() {
self.write_and_compress_block()?; self.write_and_compress_block()?;
self.offset_index_writer
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
} }
let doc_offset = self.doc; assert_eq!(self.first_doc_in_block, self.doc);
let start_offset = self.writer.written_bytes() as u64; let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
// just bulk write all of the block of the given reader. // just bulk write all of the block of the given reader.
self.writer self.writer
@@ -80,21 +82,36 @@ impl StoreWriter {
// concatenate the index of the `store_reader`, after translating // concatenate the index of the `store_reader`, after translating
// its start doc id and its start file offset. // its start doc id and its start file offset.
for (next_doc_id, block_addr) in store_reader.block_index() { for mut checkpoint in store_reader.block_checkpoints() {
self.doc = doc_offset + next_doc_id as u32; checkpoint.start_doc += doc_shift;
self.offset_index_writer checkpoint.end_doc += doc_shift;
.insert(u64::from(self.doc), &(start_offset + block_addr))?; checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.register_checkpoint(checkpoint);
} }
Ok(()) Ok(())
} }
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint);
self.first_doc_in_block = checkpoint.end_doc;
self.doc = checkpoint.end_doc;
}
fn write_and_compress_block(&mut self) -> io::Result<()> { fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear(); self.intermediary_buffer.clear();
compress(&self.current_block[..], &mut self.intermediary_buffer)?; compress(&self.current_block[..], &mut self.intermediary_buffer)?;
(self.intermediary_buffer.len() as u32).serialize(&mut self.writer)?; let start_offset = self.writer.written_bytes();
self.writer.write_all(&self.intermediary_buffer)?; self.writer.write_all(&self.intermediary_buffer)?;
self.offset_index_writer let end_offset = self.writer.written_bytes();
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?; let end_doc = self.doc;
self.register_checkpoint(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.current_block.clear(); self.current_block.clear();
Ok(()) Ok(())
} }
@@ -110,7 +127,6 @@ impl StoreWriter {
let header_offset: u64 = self.writer.written_bytes() as u64; let header_offset: u64 = self.writer.written_bytes() as u64;
self.offset_index_writer.write(&mut self.writer)?; self.offset_index_writer.write(&mut self.writer)?;
header_offset.serialize(&mut self.writer)?; header_offset.serialize(&mut self.writer)?;
self.doc.serialize(&mut self.writer)?;
self.writer.terminate() self.writer.terminate()
} }
} }
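Because `stack` copies the other store's compressed blocks verbatim, only its checkpoints need translating: doc ids shift by the number of documents already written, byte offsets by the bytes already written. A small self-contained sketch of that arithmetic, with a stand-in `Checkpoint` type:

```rust
// Sketch of the checkpoint translation done by StoreWriter::stack: the stacked
// store's blocks are appended as-is, so only doc ids and byte offsets shift.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Checkpoint {
    start_doc: u32,
    end_doc: u32,
    start_offset: u64,
    end_offset: u64,
}

fn translate(checkpoint: Checkpoint, doc_shift: u32, byte_shift: u64) -> Checkpoint {
    Checkpoint {
        start_doc: checkpoint.start_doc + doc_shift,
        end_doc: checkpoint.end_doc + doc_shift,
        start_offset: checkpoint.start_offset + byte_shift,
        end_offset: checkpoint.end_offset + byte_shift,
    }
}

fn main() {
    // The writer already holds 100 docs spread over 4_096 bytes of blocks.
    let (doc_shift, byte_shift) = (100u32, 4_096u64);
    // First checkpoint of the store being stacked.
    let stacked = Checkpoint { start_doc: 0, end_doc: 16, start_offset: 0, end_offset: 900 };
    assert_eq!(
        translate(stacked, doc_shift, byte_shift),
        Checkpoint { start_doc: 100, end_doc: 116, start_offset: 4_096, end_offset: 4_996 }
    );
}
```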

View File

@@ -0,0 +1,27 @@
/*!
The term dictionary's main role is to associate the sorted [`Term`s](../struct.Term.html) with
a [`TermInfo`](../postings/struct.TermInfo.html) struct that contains some meta-information
about the term.
Internally, the term dictionary relies on the `fst` crate to store
a sorted mapping that associates each term with its rank in the lexicographical order.
For instance, in a dictionary containing the sorted terms "abba", "bjork", "blur" and "donovan",
the respective `TermOrdinal`s are `0`, `1`, `2`, and `3`.
For `u64`-terms, tantivy explicitly uses a `BigEndian` representation to ensure that the
lexicographical order matches the natural order of integers.
`i64`-terms are transformed to `u64` using a continuous mapping `val ⟶ val - i64::min_value()`
and then treated as a `u64`.
`f64`-terms are transformed to `u64` using a mapping that preserves order, and are then treated
as `u64`.
A second data structure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/
mod streamer;
mod term_info_store;
mod termdict;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
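The module documentation above describes order-preserving conversions from `i64` and `f64` to `u64`. The sketch below shows the standard sign-bit tricks that match that description; the helper names are illustrative and not necessarily the ones exposed by tantivy's `common` module:

```rust
// Self-contained sketch of order-preserving conversions consistent with the
// mapping described above; function names here are illustrative only.
fn i64_to_u64(val: i64) -> u64 {
    // Equivalent to `val - i64::MIN` computed without overflow: flip the sign bit.
    (val as u64) ^ (1u64 << 63)
}

fn f64_to_u64(val: f64) -> u64 {
    let bits = val.to_bits();
    if bits >> 63 == 0 {
        bits | (1u64 << 63) // non-negative: set the sign bit
    } else {
        !bits // negative: flip every bit so that "more negative" sorts lower
    }
}

fn main() {
    let ints = [i64::MIN, -1, 0, 1, i64::MAX];
    assert!(ints.windows(2).all(|w| i64_to_u64(w[0]) < i64_to_u64(w[1])));

    let floats = [f64::NEG_INFINITY, -1.5, -0.0, 0.0, 2.5, f64::INFINITY];
    assert!(floats.windows(2).all(|w| f64_to_u64(w[0]) <= f64_to_u64(w[1])));
}
```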

View File

@@ -1,3 +1,5 @@
use std::io;
use super::TermDictionary; use super::TermDictionary;
use crate::postings::TermInfo; use crate::postings::TermInfo;
use crate::termdict::TermOrdinal; use crate::termdict::TermOrdinal;
@@ -59,14 +61,14 @@ where
/// Creates the stream corresponding to the range /// Creates the stream corresponding to the range
/// of terms defined using the `TermStreamerBuilder`. /// of terms defined using the `TermStreamerBuilder`.
pub fn into_stream(self) -> TermStreamer<'a, A> { pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
TermStreamer { Ok(TermStreamer {
fst_map: self.fst_map, fst_map: self.fst_map,
stream: self.stream_builder.into_stream(), stream: self.stream_builder.into_stream(),
term_ord: 0u64, term_ord: 0u64,
current_key: Vec::with_capacity(100), current_key: Vec::with_capacity(100),
current_value: TermInfo::default(), current_value: TermInfo::default(),
} })
} }
} }

View File

@@ -55,22 +55,32 @@ impl TermInfoBlockMeta {
self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
} }
// Here inner_offset is the offset within the block, WITHOUT the first term_info.
// In other words, term_infos #1, #2, #3 get inner_offsets 0, 1, 2..., while term_info #0
// is encoded without bitpacking.
fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo { fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo {
assert!(inner_offset < BLOCK_LEN - 1);
let num_bits = self.num_bits() as usize; let num_bits = self.num_bits() as usize;
let mut cursor = num_bits * inner_offset;
let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32; let posting_start_addr = num_bits * inner_offset;
cursor += self.doc_freq_nbits as usize; // the stop offset is the start offset of the next term info.
let posting_stop_addr = posting_start_addr + num_bits;
let doc_freq_addr = posting_start_addr + self.postings_offset_nbits as usize;
let positions_idx_addr = doc_freq_addr + self.doc_freq_nbits as usize;
let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits); let postings_start_offset = self.ref_term_info.postings_start_offset
cursor += self.postings_offset_nbits as usize; + extract_bits(data, posting_start_addr, self.postings_offset_nbits);
let postings_stop_offset = self.ref_term_info.postings_start_offset
let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits); + extract_bits(data, posting_stop_addr, self.postings_offset_nbits);
let doc_freq = extract_bits(data, doc_freq_addr, self.doc_freq_nbits) as u32;
let positions_idx = self.ref_term_info.positions_idx
+ extract_bits(data, positions_idx_addr, self.positions_idx_nbits);
TermInfo { TermInfo {
doc_freq, doc_freq,
postings_offset: postings_offset + self.ref_term_info.postings_offset, postings_start_offset,
positions_idx: positions_idx + self.ref_term_info.positions_idx, postings_stop_offset,
positions_idx,
} }
} }
} }
@@ -152,16 +162,17 @@ fn bitpack_serialize<W: Write>(
term_info_block_meta: &TermInfoBlockMeta, term_info_block_meta: &TermInfoBlockMeta,
term_info: &TermInfo, term_info: &TermInfo,
) -> io::Result<()> { ) -> io::Result<()> {
bit_packer.write(
term_info.postings_start_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write( bit_packer.write(
u64::from(term_info.doc_freq), u64::from(term_info.doc_freq),
term_info_block_meta.doc_freq_nbits, term_info_block_meta.doc_freq_nbits,
write, write,
)?; )?;
bit_packer.write(
term_info.postings_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write( bit_packer.write(
term_info.positions_idx, term_info.positions_idx,
term_info_block_meta.positions_idx_nbits, term_info_block_meta.positions_idx_nbits,
@@ -181,23 +192,27 @@ impl TermInfoStoreWriter {
} }
fn flush_block(&mut self) -> io::Result<()> { fn flush_block(&mut self) -> io::Result<()> {
if self.term_infos.is_empty() {
return Ok(());
}
let mut bit_packer = BitPacker::new(); let mut bit_packer = BitPacker::new();
let ref_term_info = self.term_infos[0].clone(); let ref_term_info = self.term_infos[0].clone();
let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() {
last_term_info
} else {
return Ok(());
};
let postings_stop_offset =
last_term_info.postings_stop_offset - ref_term_info.postings_start_offset;
for term_info in &mut self.term_infos[1..] { for term_info in &mut self.term_infos[1..] {
term_info.postings_offset -= ref_term_info.postings_offset; term_info.postings_start_offset -= ref_term_info.postings_start_offset;
term_info.positions_idx -= ref_term_info.positions_idx; term_info.positions_idx -= ref_term_info.positions_idx;
} }
let mut max_doc_freq: u32 = 0u32; let mut max_doc_freq: u32 = 0u32;
let mut max_postings_offset: u64 = 0u64; let max_postings_offset: u64 = postings_stop_offset;
let mut max_positions_idx: u64 = 0u64; let max_positions_idx: u64 = last_term_info.positions_idx;
for term_info in &self.term_infos[1..] { for term_info in &self.term_infos[1..] {
max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq); max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq);
max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset);
max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx);
} }
let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq)); let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq));
@@ -222,6 +237,12 @@ impl TermInfoStoreWriter {
)?; )?;
} }
bit_packer.write(
postings_stop_offset,
term_info_block_meta.postings_offset_nbits,
&mut self.buffer_term_infos,
)?;
// Block need end up at the end of a byte. // Block need end up at the end of a byte.
bit_packer.flush(&mut self.buffer_term_infos)?; bit_packer.flush(&mut self.buffer_term_infos)?;
self.term_infos.clear(); self.term_infos.clear();
@@ -230,6 +251,7 @@ impl TermInfoStoreWriter {
} }
pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> { pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> {
assert!(term_info.postings_stop_offset >= term_info.postings_start_offset);
self.num_terms += 1u64; self.num_terms += 1u64;
self.term_infos.push(term_info.clone()); self.term_infos.push(term_info.clone());
if self.term_infos.len() >= BLOCK_LEN { if self.term_infos.len() >= BLOCK_LEN {
@@ -289,10 +311,11 @@ mod tests {
#[test] #[test]
fn test_term_info_block_meta_serialization() { fn test_term_info_block_meta_serialization() {
let term_info_block_meta = TermInfoBlockMeta { let term_info_block_meta = TermInfoBlockMeta {
offset: 2009, offset: 2009u64,
ref_term_info: TermInfo { ref_term_info: TermInfo {
doc_freq: 512, doc_freq: 512,
postings_offset: 51, postings_start_offset: 51,
postings_stop_offset: 57u64,
positions_idx: 3584, positions_idx: 3584,
}, },
doc_freq_nbits: 10, doc_freq_nbits: 10,
@@ -310,10 +333,12 @@ mod tests {
fn test_pack() -> crate::Result<()> { fn test_pack() -> crate::Result<()> {
let mut store_writer = TermInfoStoreWriter::new(); let mut store_writer = TermInfoStoreWriter::new();
let mut term_infos = vec![]; let mut term_infos = vec![];
let offset = |i| (i * 13 + i * i) as u64;
for i in 0..1000 { for i in 0..1000 {
let term_info = TermInfo { let term_info = TermInfo {
doc_freq: i as u32, doc_freq: i as u32,
postings_offset: (i / 10) as u64, postings_start_offset: offset(i),
postings_stop_offset: offset(i + 1),
positions_idx: (i * 7) as u64, positions_idx: (i * 7) as u64,
}; };
store_writer.write_term_info(&term_info)?; store_writer.write_term_info(&term_info)?;
@@ -323,7 +348,12 @@ mod tests {
store_writer.serialize(&mut buffer)?; store_writer.serialize(&mut buffer)?;
let term_info_store = TermInfoStore::open(FileSlice::from(buffer))?; let term_info_store = TermInfoStore::open(FileSlice::from(buffer))?;
for i in 0..1000 { for i in 0..1000 {
assert_eq!(term_info_store.get(i as u64), term_infos[i]); assert_eq!(
term_info_store.get(i as u64),
term_infos[i],
"term info {}",
i
);
} }
Ok(()) Ok(())
} }
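In the new block layout, entry #0 is stored as an uncompressed reference, the remaining entries store fixed-width deltas against it, and a term's postings stop offset is read from the next entry's start delta. A simplified sketch of that reconstruction, using plain `u64`s in place of the bit-packed fields of the real code:

```rust
// Simplified sketch of the per-block layout in term_info_store: entry #0 is the
// uncompressed reference; entries #1.. store only deltas against it, and an
// entry's postings stop offset comes from the *next* entry's start delta.
#[derive(Clone, Debug, PartialEq)]
struct TermInfo {
    doc_freq: u32,
    postings_start_offset: u64,
    postings_stop_offset: u64,
    positions_idx: u64,
}

struct Block {
    ref_term_info: TermInfo,
    // (postings_start_delta, doc_freq, positions_idx_delta) for entries #1..
    deltas: Vec<(u64, u32, u64)>,
    // Closes the last entry, like the extra value written after the block.
    trailing_postings_delta: u64,
}

impl Block {
    /// inner_offset 0 corresponds to entry #1 (entry #0 is `ref_term_info`).
    fn term_info(&self, inner_offset: usize) -> TermInfo {
        let (start_delta, doc_freq, positions_delta) = self.deltas[inner_offset];
        let stop_delta = self
            .deltas
            .get(inner_offset + 1)
            .map(|&(next_start, _, _)| next_start)
            .unwrap_or(self.trailing_postings_delta);
        TermInfo {
            doc_freq,
            postings_start_offset: self.ref_term_info.postings_start_offset + start_delta,
            postings_stop_offset: self.ref_term_info.postings_start_offset + stop_delta,
            positions_idx: self.ref_term_info.positions_idx + positions_delta,
        }
    }
}

fn main() {
    let block = Block {
        ref_term_info: TermInfo {
            doc_freq: 7,
            postings_start_offset: 1_000,
            postings_stop_offset: 1_050,
            positions_idx: 400,
        },
        deltas: vec![(50, 3, 20), (90, 5, 35)],
        trailing_postings_delta: 130,
    };
    assert_eq!(
        block.term_info(0),
        TermInfo { doc_freq: 3, postings_start_offset: 1_050, postings_stop_offset: 1_090, positions_idx: 420 }
    );
    assert_eq!(
        block.term_info(1),
        TermInfo { doc_freq: 5, postings_start_offset: 1_090, postings_stop_offset: 1_130, positions_idx: 435 }
    );
}
```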

View File

@@ -80,7 +80,6 @@ where
.serialize(&mut counting_writer)?; .serialize(&mut counting_writer)?;
let footer_size = counting_writer.written_bytes(); let footer_size = counting_writer.written_bytes();
(footer_size as u64).serialize(&mut counting_writer)?; (footer_size as u64).serialize(&mut counting_writer)?;
counting_writer.flush()?;
} }
Ok(file) Ok(file)
} }
@@ -139,8 +138,8 @@ impl TermDictionary {
} }
/// Returns the ordinal associated to a given term. /// Returns the ordinal associated to a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> Option<TermOrdinal> { pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
self.fst_index.get(key) Ok(self.fst_index.get(key))
} }
/// Returns the term associated to a given term ordinal. /// Returns the term associated to a given term ordinal.
@@ -152,7 +151,7 @@ impl TermDictionary {
/// ///
/// Regardless of whether the term is found or not, /// Regardless of whether the term is found or not,
/// the buffer may be modified. /// the buffer may be modified.
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool { pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
bytes.clear(); bytes.clear();
let fst = self.fst_index.as_fst(); let fst = self.fst_index.as_fst();
let mut node = fst.root(); let mut node = fst.root();
@@ -167,10 +166,10 @@ impl TermDictionary {
let new_node_addr = transition.addr; let new_node_addr = transition.addr;
node = fst.node(new_node_addr); node = fst.node(new_node_addr);
} else { } else {
return false; return Ok(false);
} }
} }
true Ok(true)
} }
/// Returns the number of terms in the dictionary. /// Returns the number of terms in the dictionary.
@@ -179,9 +178,10 @@ impl TermDictionary {
} }
/// Lookups the value corresponding to the key. /// Lookups the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<TermInfo> { pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
self.term_ord(key) Ok(self
.map(|term_ord| self.term_info_from_ord(term_ord)) .term_ord(key)?
.map(|term_ord| self.term_info_from_ord(term_ord)))
} }
/// Returns a range builder, to stream all of the terms /// Returns a range builder, to stream all of the terms
@@ -191,7 +191,7 @@ impl TermDictionary {
} }
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field) /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
pub fn stream(&self) -> TermStreamer<'_> { pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
self.range().into_stream() self.range().into_stream()
} }

View File

@@ -60,12 +60,10 @@ impl<'a> TermMerger<'a> {
pub(crate) fn matching_segments<'b: 'a>( pub(crate) fn matching_segments<'b: 'a>(
&'b self, &'b self,
) -> Box<dyn 'b + Iterator<Item = (usize, TermOrdinal)>> { ) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
Box::new(
self.current_streamers self.current_streamers
.iter() .iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord())), .map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
)
} }
fn advance_segments(&mut self) { fn advance_segments(&mut self) {
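`matching_segments` now returns `impl Iterator` instead of a boxed trait object, dropping an allocation and a layer of dynamic dispatch. A tiny self-contained sketch of the before/after signatures:

```rust
// Sketch of the signature change in TermMerger::matching_segments.
struct Segments {
    ords: Vec<(usize, u64)>,
}

impl Segments {
    // Before: a heap-allocated trait object.
    fn matching_boxed<'a>(&'a self) -> Box<dyn Iterator<Item = (usize, u64)> + 'a> {
        Box::new(self.ords.iter().copied())
    }

    // After: an opaque, statically dispatched iterator.
    fn matching_impl<'a>(&'a self) -> impl Iterator<Item = (usize, u64)> + 'a {
        self.ords.iter().copied()
    }
}

fn main() {
    let segments = Segments { ords: vec![(0, 42), (3, 7)] };
    assert!(segments.matching_boxed().eq(segments.matching_impl()));
}
```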

View File

@@ -20,435 +20,37 @@ as `u64`.
A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html). A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/ */
use tantivy_fst::automaton::AlwaysMatch;
mod fst_termdict;
use fst_termdict as termdict;
mod merger;
#[cfg(test)]
mod tests;
/// Position of the term in the sorted list of terms. /// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64; pub type TermOrdinal = u64;
mod merger; /// The term dictionary contains all of the terms in
mod streamer; /// `tantivy index` in a sorted manner.
mod term_info_store; pub type TermDictionary = self::termdict::TermDictionary;
mod termdict;
pub use self::merger::TermMerger; /// Builder for the new term dictionary.
pub use self::streamer::{TermStreamer, TermStreamerBuilder}; ///
pub use self::termdict::{TermDictionary, TermDictionaryBuilder}; /// Inserting must be done in the order of the `keys`.
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;
#[cfg(test)] /// Given a list of sorted term streams,
mod tests { /// returns an iterator over sorted unique terms.
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer}; ///
use crate::core::Index; /// The item yield is actually a pair with
use crate::directory::{Directory, FileSlice, RAMDirectory}; /// - the term
use crate::postings::TermInfo; /// - a slice with the ordinal of the segments containing
use crate::schema::{Schema, TEXT}; /// the terms.
use std::path::PathBuf; pub type TermMerger<'a> = self::merger::TermMerger<'a>;
use std::str;
const BLOCK_SIZE: usize = 1_500; /// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
fn make_term_info(val: u64) -> TermInfo { pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;
TermInfo {
doc_freq: val as u32,
positions_idx: val * 2u64,
postings_offset: val * 3u64,
}
}
#[test]
fn test_empty_term_dictionary() {
let empty = TermDictionary::empty();
assert!(empty.stream().next().is_none());
}
#[test]
fn test_term_ordinals() -> crate::Result<()> {
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?;
}
let term_file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(term_file)?;
for (term_ord, term) in COUNTRIES.iter().enumerate() {
assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64);
let mut bytes = vec![];
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes));
assert_eq!(bytes, term.as_bytes());
}
Ok(())
}
#[test]
fn test_term_dictionary_simple() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
term_dictionary_builder.finish()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
assert_eq!(term_dict.get("abc").unwrap().doc_freq, 34u32);
assert_eq!(term_dict.get("abcd").unwrap().doc_freq, 346u32);
let mut stream = term_dict.stream();
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k.as_ref(), "abc".as_bytes());
assert_eq!(v.doc_freq, 34u32);
}
assert_eq!(stream.key(), "abc".as_bytes());
assert_eq!(stream.value().doc_freq, 34u32);
}
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k, "abcd".as_bytes());
assert_eq!(v.doc_freq, 346u32);
}
assert_eq!(stream.key(), "abcd".as_bytes());
assert_eq!(stream.value().doc_freq, 346u32);
}
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_term_iterator() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text_field=>"a b d f"));
index_writer.commit()?;
index_writer.add_document(doc!(text_field=>"a b c d f"));
index_writer.commit()?;
index_writer.add_document(doc!(text_field => "e f"));
index_writer.commit()?;
}
let searcher = index.reader()?.searcher();
let field_searcher = searcher.field(text_field)?;
let mut term_it = field_searcher.terms();
let mut term_string = String::new();
while term_it.advance() {
//let term = Term::from_bytes(term_it.key());
term_string.push_str(str::from_utf8(term_it.key()).expect("test"));
}
assert_eq!(&*term_string, "abcdef");
Ok(())
}
#[test]
fn test_term_dictionary_stream() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish().unwrap()
};
let term_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
{
let mut streamer = term_dictionary.stream();
let mut i = 0;
while let Some((streamer_k, streamer_v)) = streamer.next() {
let &(ref key, ref v) = &ids[i];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v, &make_term_info(*v as u64));
i += 1;
}
}
let &(ref key, ref val) = &ids[2047];
assert_eq!(
term_dictionary.get(key.as_bytes()),
Some(make_term_info(*val as u64))
);
Ok(())
}
#[test]
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
// term requires more than 16bits
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(2))?;
term_dictionary_builder.finish()?
};
let term_dict_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
let mut kv_stream = term_dictionary.stream();
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(1));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert!(!kv_stream.advance());
Ok(())
}
#[test]
fn test_stream_range() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish().unwrap()
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
{
for i in (0..20).chain(6000..8_000) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.ge(target_key.as_bytes())
.into_stream();
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j];
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
assert_eq!(streamer_v.doc_freq, *v);
assert_eq!(streamer_v, &make_term_info(*v as u64));
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.gt(target_key.as_bytes())
.into_stream();
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j + 1];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v.doc_freq, *v);
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
for j in 0..3 {
let &(ref fst_key, _) = &ids[i];
let &(ref last_key, _) = &ids[i + j];
let mut streamer = term_dictionary
.range()
.ge(fst_key.as_bytes())
.lt(last_key.as_bytes())
.into_stream();
for _ in 0..j {
assert!(streamer.next().is_some());
}
assert!(streamer.next().is_none());
}
}
}
Ok(())
}
#[test]
fn test_empty_string() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
term_dictionary_builder
.insert(&[], &make_term_info(1 as u64))
.unwrap();
term_dictionary_builder
.insert(&[1u8], &make_term_info(2 as u64))
.unwrap();
term_dictionary_builder.finish().unwrap()
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let mut stream = term_dictionary.stream();
assert!(stream.advance());
assert!(stream.key().is_empty());
assert!(stream.advance());
assert_eq!(stream.key(), &[1u8]);
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_stream_range_boundaries() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
for i in 0u8..10u8 {
let number_arr = [i; 1];
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
if backwards {
res.reverse();
}
res
};
{
let range = term_dictionary.range().backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().le([6u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().le([6u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream();
assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
{
let range = term_dictionary
.range()
.ge([0u8])
.lt([5u8])
.backward()
.into_stream();
assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
Ok(())
}
#[test]
fn test_automaton_search() -> crate::Result<()> {
use crate::query::DFAWrapper;
use levenshtein_automata::LevenshteinAutomatonBuilder;
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
// We can now build an entire dfa.
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
let mut range = term_dict.search(automaton).into_stream();
// get the first finding
assert!(range.advance());
assert_eq!("Spain".as_bytes(), range.key());
assert!(!range.advance());
Ok(())
}
}

431
src/termdict/tests.rs Normal file
View File

@@ -0,0 +1,431 @@
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use crate::directory::{Directory, FileSlice, RAMDirectory, TerminatingWrite};
use crate::postings::TermInfo;
use std::path::PathBuf;
use std::str;
const BLOCK_SIZE: usize = 1_500;
fn make_term_info(term_ord: u64) -> TermInfo {
let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
TermInfo {
doc_freq: term_ord as u32,
postings_start_offset: offset(term_ord),
postings_stop_offset: offset(term_ord + 1),
positions_idx: offset(term_ord) * 2u64,
}
}
#[test]
fn test_empty_term_dictionary() {
let empty = TermDictionary::empty();
assert!(empty.stream().unwrap().next().is_none());
}
#[test]
fn test_term_ordinals() -> crate::Result<()> {
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?.terminate()?;
}
let term_file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(term_file)?;
for (term_ord, term) in COUNTRIES.iter().enumerate() {
assert_eq!(term_dict.term_ord(term)?, Some(term_ord as u64));
let mut bytes = vec![];
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes)?);
assert_eq!(bytes, term.as_bytes());
}
Ok(())
}
#[test]
fn test_term_dictionary_simple() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
term_dictionary_builder.finish()?.terminate()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
assert_eq!(term_dict.get("abc")?.unwrap().doc_freq, 34u32);
assert_eq!(term_dict.get("abcd")?.unwrap().doc_freq, 346u32);
let mut stream = term_dict.stream()?;
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k.as_ref(), "abc".as_bytes());
assert_eq!(v.doc_freq, 34u32);
}
assert_eq!(stream.key(), "abc".as_bytes());
assert_eq!(stream.value().doc_freq, 34u32);
}
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k, "abcd".as_bytes());
assert_eq!(v.doc_freq, 346u32);
}
assert_eq!(stream.key(), "abcd".as_bytes());
assert_eq!(stream.value().doc_freq, 346u32);
}
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_term_dictionary_stream() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish()?
};
let term_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
{
let mut streamer = term_dictionary.stream()?;
let mut i = 0;
while let Some((streamer_k, streamer_v)) = streamer.next() {
let &(ref key, ref v) = &ids[i];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v, &make_term_info(*v as u64));
i += 1;
}
}
let &(ref key, ref val) = &ids[2047];
assert_eq!(
term_dictionary.get(key.as_bytes())?,
Some(make_term_info(*val as u64))
);
Ok(())
}
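// Two long terms sharing a 25-byte prefix, followed by a short term, exercise the prefix/suffix encoding of the dictionary.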
#[test]
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
// term requires more than 16 bits
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(3))?;
term_dictionary_builder.finish()?
};
let term_dict_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
let mut kv_stream = term_dictionary.stream()?;
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(1));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(3));
assert!(!kv_stream.advance());
Ok(())
}
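// Range streaming with `ge`, `gt`, and bounded `ge`/`lt` ranges should behave correctly, including around block boundaries.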
#[test]
fn test_stream_range() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
{
for i in (0..20).chain(6000..8_000) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.ge(target_key.as_bytes())
.into_stream()?;
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j];
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
assert_eq!(streamer_v.doc_freq, *v);
assert_eq!(streamer_v, &make_term_info(*v as u64));
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.gt(target_key.as_bytes())
.into_stream()?;
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j + 1];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v.doc_freq, *v);
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
for j in 0..3 {
let &(ref fst_key, _) = &ids[i];
let &(ref last_key, _) = &ids[i + j];
let mut streamer = term_dictionary
.range()
.ge(fst_key.as_bytes())
.lt(last_key.as_bytes())
.into_stream()?;
for _ in 0..j {
assert!(streamer.next().is_some());
}
assert!(streamer.next().is_none());
}
}
}
Ok(())
}
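// The empty byte string is a valid term and should come first in the stream.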
#[test]
fn test_empty_string() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
term_dictionary_builder
.insert(&[], &make_term_info(1u64))
.unwrap();
term_dictionary_builder
.insert(&[1u8], &make_term_info(2u64))
.unwrap();
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let mut stream = term_dictionary.stream()?;
assert!(stream.advance());
assert!(stream.key().is_empty());
assert!(stream.advance());
assert_eq!(stream.key(), &[1u8]);
assert!(!stream.advance());
Ok(())
}
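// Builds a small dictionary with ten single-byte keys 0..=9, used by the range-boundary tests below.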
fn stream_range_test_dict() -> crate::Result<TermDictionary> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
for i in 0u8..10u8 {
let number_arr = [i; 1];
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
TermDictionary::open(file)
}
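// Forward range streams with ge/gt/lt/le bounds should return exactly the expected keys.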
#[test]
fn test_stream_range_boundaries_forward() -> crate::Result<()> {
let term_dictionary = stream_range_test_dict()?;
let value_list = |mut streamer: TermStreamer<'_>| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
res
};
{
let range = term_dictionary.range().ge([2u8]).into_stream()?;
assert_eq!(
value_list(range),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).into_stream()?;
assert_eq!(
value_list(range),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).into_stream()?;
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]);
}
{
let range = term_dictionary.range().le([6u8]).into_stream()?;
assert_eq!(
value_list(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
Ok(())
}
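// The same bounds iterated backward should yield the same sets; results are reversed before comparison.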
#[test]
fn test_stream_range_boundaries_backward() -> crate::Result<()> {
let term_dictionary = stream_range_test_dict()?;
let value_list_backward = |mut streamer: TermStreamer<'_>| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
res.reverse();
res
};
{
let range = term_dictionary.range().backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary
.range()
.ge([0u8])
.lt([5u8])
.backward()
.into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32]
);
}
Ok(())
}
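// `ord_to_term` should recover the original key for every ordinal.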
#[test]
fn test_ord_to_term() -> crate::Result<()> {
let termdict = stream_range_test_dict()?;
let mut bytes = vec![];
for b in 0u8..10u8 {
termdict.ord_to_term(b as u64, &mut bytes)?;
assert_eq!(&bytes, &[b]);
}
Ok(())
}
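// While streaming, `term_ord` should report the ordinal of the current key.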
#[test]
fn test_stream_term_ord() -> crate::Result<()> {
let termdict = stream_range_test_dict()?;
let mut stream = termdict.stream()?;
for b in 0u8..10u8 {
assert!(stream.advance());
assert_eq!(stream.term_ord(), b as u64);
assert_eq!(stream.key(), &[b]);
}
assert!(!stream.advance());
Ok(())
}
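// Searching with a Levenshtein automaton (distance 2) for "Spaen" should match only "Spain".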
#[test]
fn test_automaton_search() -> crate::Result<()> {
use crate::query::DFAWrapper;
use levenshtein_automata::LevenshteinAutomatonBuilder;
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?.terminate()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
// We can now build an entire dfa.
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
let mut range = term_dict.search(automaton).into_stream()?;
// get the first finding
assert!(range.advance());
assert_eq!("Spain".as_bytes(), range.key());
assert!(!range.advance());
Ok(())
}

View File

@@ -18,7 +18,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
.unwrap()
.terminate()
.unwrap();
- assert!(managed_directory.exists(test_path));
+ assert!(managed_directory.exists(test_path).unwrap());
// triggering gc and setting the delete operation to fail.
//
// We are checking that the gc operation is not removing the
@@ -29,12 +29,12 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
// lock file.
fail::cfg("RAMDirectory::delete", "1*off->1*return").unwrap();
assert!(managed_directory.garbage_collect(Default::default).is_ok());
- assert!(managed_directory.exists(test_path));
+ assert!(managed_directory.exists(test_path).unwrap());
// running the gc a second time should remove the file.
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(
- !managed_directory.exists(test_path),
+ !managed_directory.exists(test_path).unwrap(),
"The file should have been deleted"
);
}