mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-28 04:52:55 +00:00
Compare commits
90 Commits
issue/922
...
debugging-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
08f7706973 | ||
|
|
bf6e6e8a7c | ||
|
|
203b0256a3 | ||
|
|
caf2a38b7e | ||
|
|
96f24b078e | ||
|
|
332b50a4eb | ||
|
|
8ca0954b3b | ||
|
|
36343e2de8 | ||
|
|
2f14a892ca | ||
|
|
9c3cabce40 | ||
|
|
f8d71c2b10 | ||
|
|
394dfb24f1 | ||
|
|
b0549a229d | ||
|
|
670b6eaff6 | ||
|
|
a4f33d3823 | ||
|
|
c7841e3da5 | ||
|
|
e7b4a12bba | ||
|
|
0aaa929d6e | ||
|
|
1112797c18 | ||
|
|
920481e1c1 | ||
|
|
55f7b84966 | ||
|
|
09ab4df1fe | ||
|
|
0c2cf81b37 | ||
|
|
d864430bda | ||
|
|
de60540e06 | ||
|
|
c3e311e6b8 | ||
|
|
ac704f2f22 | ||
|
|
be626083a0 | ||
|
|
b68fcca1e0 | ||
|
|
af6dfa1856 | ||
|
|
654c400a0b | ||
|
|
80a99539ce | ||
|
|
4b1c770e5e | ||
|
|
3491645e69 | ||
|
|
e72c8287f8 | ||
|
|
b4b3bc7acd | ||
|
|
521c7b271b | ||
|
|
acd888c999 | ||
|
|
3ab1ba0b2f | ||
|
|
b344c0ac05 | ||
|
|
1741619c7f | ||
|
|
067ba3dff0 | ||
|
|
f79250f665 | ||
|
|
5a33b8d533 | ||
|
|
d165655fb1 | ||
|
|
c805871b92 | ||
|
|
f288e32634 | ||
|
|
bc44543d8f | ||
|
|
db514208a7 | ||
|
|
b6ff29e020 | ||
|
|
7c94dfdc15 | ||
|
|
8782c0eada | ||
|
|
fea0ba1042 | ||
|
|
027555c75f | ||
|
|
b478ed747a | ||
|
|
e9aa27dace | ||
|
|
c079133f3a | ||
|
|
30c5f7c5f0 | ||
|
|
6f26871c0f | ||
|
|
f93cc5b5e3 | ||
|
|
5a25c8dfd3 | ||
|
|
f5c079159d | ||
|
|
1cfdce3437 | ||
|
|
e9e6d141e9 | ||
|
|
8d0e049261 | ||
|
|
0335c7353d | ||
|
|
267e920a80 | ||
|
|
d8a3a47e3e | ||
|
|
7f0e61b173 | ||
|
|
ce4c50446b | ||
|
|
9ab25d2575 | ||
|
|
6d4b982417 | ||
|
|
650eca271f | ||
|
|
8ee55aef6d | ||
|
|
40d41c7dcb | ||
|
|
c780a889a7 | ||
|
|
eef348004e | ||
|
|
e784bbc40f | ||
|
|
b8118d439f | ||
|
|
a49e59053c | ||
|
|
41bb2bd58b | ||
|
|
7fd6054145 | ||
|
|
6abf4e97b5 | ||
|
|
d23aee76c9 | ||
|
|
58a1595792 | ||
|
|
726d32eac5 | ||
|
|
b5f3dcdc8b | ||
|
|
2875deb4b1 | ||
|
|
b2dfacdc70 | ||
|
|
36a0520a48 |
@@ -7,6 +7,12 @@ Tantivy 0.14.0
|
||||
- Added support for Brotli compression in the DocStore. (@ppodolsky)
|
||||
- Added helper for building intersections and unions in BooleanQuery (@guilload)
|
||||
- Bugfix in `Query::explain`
|
||||
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
|
||||
- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)
|
||||
- Simplified the encoding of the skip reader struct. BlockWAND max tf is now encoded over a single byte. (@pmasurel)
|
||||
- `FilterCollector` now supports all Fast Field value types (@barrotsteindev)
|
||||
|
||||
This version breaks compatibility and requires users to reindex everything.
|
||||
|
||||
Tantivy 0.13.2
|
||||
===================
|
||||
|
||||
12
Cargo.toml
12
Cargo.toml
@@ -30,7 +30,6 @@ serde_json = "1"
|
||||
num_cpus = "1"
|
||||
fs2={version="0.4", optional=true}
|
||||
levenshtein_automata = "0.2"
|
||||
notify = {version="4", optional=true}
|
||||
uuid = { version = "0.8", features = ["v4", "serde"] }
|
||||
crossbeam = "0.8"
|
||||
futures = {version = "0.3", features=["thread-pool"] }
|
||||
@@ -48,15 +47,18 @@ murmurhash32 = "0.2"
|
||||
chrono = "0.4"
|
||||
smallvec = "1"
|
||||
rayon = "1"
|
||||
env_logger = "0.8"
|
||||
lru = "0.6"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
winapi = "0.3"
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.7"
|
||||
rand = "0.8"
|
||||
maplit = "1"
|
||||
matches = "0.1.8"
|
||||
proptest = "0.10"
|
||||
criterion = "0.3"
|
||||
|
||||
[dev-dependencies.fail]
|
||||
version = "0.4"
|
||||
@@ -73,7 +75,7 @@ overflow-checks = true
|
||||
|
||||
[features]
|
||||
default = ["mmap"]
|
||||
mmap = ["fs2", "tempfile", "memmap", "notify"]
|
||||
mmap = ["fs2", "tempfile", "memmap"]
|
||||
brotli-compression = ["brotli"]
|
||||
lz4-compression = ["lz4"]
|
||||
failpoints = ["fail/failpoints"]
|
||||
@@ -97,3 +99,7 @@ travis-ci = { repository = "tantivy-search/tantivy" }
|
||||
name = "failpoints"
|
||||
path = "tests/failpoints/mod.rs"
|
||||
required-features = ["fail/failpoints"]
|
||||
|
||||
[[bench]]
|
||||
name = "analyzer"
|
||||
harness = false
|
||||
|
||||
3774
benches/alice.txt
Normal file
3774
benches/alice.txt
Normal file
File diff suppressed because it is too large
Load Diff
22
benches/analyzer.rs
Normal file
22
benches/analyzer.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use tantivy::tokenizer::TokenizerManager;
|
||||
|
||||
const ALICE_TXT: &'static str = include_str!("alice.txt");
|
||||
|
||||
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||
let tokenizer_manager = TokenizerManager::default();
|
||||
let tokenizer = tokenizer_manager.get("default").unwrap();
|
||||
c.bench_function("default-tokenize-alice", |b| {
|
||||
b.iter(|| {
|
||||
let mut word_count = 0;
|
||||
let mut token_stream = tokenizer.token_stream(ALICE_TXT);
|
||||
while token_stream.advance() {
|
||||
word_count += 1;
|
||||
}
|
||||
assert_eq!(word_count, 30_731);
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
criterion_main!(benches);
|
||||
@@ -61,7 +61,7 @@ fn main() -> tantivy::Result<()> {
|
||||
|
||||
let query_ords: HashSet<u64> = facets
|
||||
.iter()
|
||||
.filter_map(|key| facet_dict.term_ord(key.encoded_str()))
|
||||
.filter_map(|key| facet_dict.term_ord(key.encoded_str()).unwrap())
|
||||
.collect();
|
||||
|
||||
let mut facet_ords_buffer: Vec<u64> = Vec::with_capacity(20);
|
||||
|
||||
@@ -274,7 +274,7 @@ impl Collector for FacetCollector {
|
||||
let mut collapse_facet_it = self.facets.iter().peekable();
|
||||
collapse_facet_ords.push(0);
|
||||
{
|
||||
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
|
||||
let mut facet_streamer = facet_reader.facet_dict().range().into_stream()?;
|
||||
if facet_streamer.advance() {
|
||||
'outer: loop {
|
||||
// at the begining of this loop, facet_streamer
|
||||
@@ -368,9 +368,12 @@ impl SegmentCollector for FacetSegmentCollector {
|
||||
}
|
||||
let mut facet = vec![];
|
||||
let facet_ord = self.collapse_facet_ords[collapsed_facet_ord];
|
||||
facet_dict.ord_to_term(facet_ord as u64, &mut facet);
|
||||
// TODO
|
||||
facet_counts.insert(Facet::from_encoded(facet).unwrap(), count);
|
||||
// TODO handle errors.
|
||||
if facet_dict.ord_to_term(facet_ord as u64, &mut facet).is_ok() {
|
||||
if let Ok(facet) = Facet::from_encoded(facet) {
|
||||
facet_counts.insert(facet, count);
|
||||
}
|
||||
}
|
||||
}
|
||||
FacetCounts { facet_counts }
|
||||
}
|
||||
|
||||
189
src/collector/filter_collector_wrapper.rs
Normal file
189
src/collector/filter_collector_wrapper.rs
Normal file
@@ -0,0 +1,189 @@
|
||||
// # Custom collector example
|
||||
//
|
||||
// This example shows how you can implement your own
|
||||
// collector. As an example, we will compute a collector
|
||||
// that computes the standard deviation of a given fast field.
|
||||
//
|
||||
// Of course, you can have a look at the tantivy's built-in collectors
|
||||
// such as the `CountCollector` for more examples.
|
||||
|
||||
// ---
|
||||
// Importing tantivy...
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use crate::collector::{Collector, SegmentCollector};
|
||||
use crate::fastfield::{FastFieldReader, FastValue};
|
||||
use crate::schema::Field;
|
||||
use crate::{Score, SegmentReader, TantivyError};
|
||||
|
||||
/// The `FilterCollector` collector filters docs using a u64 fast field value and a predicate.
|
||||
/// Only the documents for which the predicate returned "true" will be passed on to the next collector.
|
||||
///
|
||||
/// ```rust
|
||||
/// use tantivy::collector::{TopDocs, FilterCollector};
|
||||
/// use tantivy::query::QueryParser;
|
||||
/// use tantivy::schema::{Schema, TEXT, INDEXED, FAST};
|
||||
/// use tantivy::{doc, DocAddress, Index};
|
||||
///
|
||||
/// let mut schema_builder = Schema::builder();
|
||||
/// let title = schema_builder.add_text_field("title", TEXT);
|
||||
/// let price = schema_builder.add_u64_field("price", INDEXED | FAST);
|
||||
/// let schema = schema_builder.build();
|
||||
/// let index = Index::create_in_ram(schema);
|
||||
///
|
||||
/// let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
|
||||
/// index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64));
|
||||
/// index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64));
|
||||
/// index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64));
|
||||
/// index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64));
|
||||
/// assert!(index_writer.commit().is_ok());
|
||||
///
|
||||
/// let reader = index.reader().unwrap();
|
||||
/// let searcher = reader.searcher();
|
||||
///
|
||||
/// let query_parser = QueryParser::for_index(&index, vec![title]);
|
||||
/// let query = query_parser.parse_query("diary").unwrap();
|
||||
/// let no_filter_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
|
||||
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
|
||||
///
|
||||
/// assert_eq!(top_docs.len(), 1);
|
||||
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
|
||||
///
|
||||
/// let filter_all_collector: FilterCollector<_, _, u64> = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
|
||||
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
|
||||
///
|
||||
/// assert_eq!(filtered_top_docs.len(), 0);
|
||||
/// ```
|
||||
pub struct FilterCollector<TCollector, TPredicate, TPredicateValue: FastValue>
|
||||
where
|
||||
TPredicate: 'static,
|
||||
{
|
||||
field: Field,
|
||||
collector: TCollector,
|
||||
predicate: &'static TPredicate,
|
||||
t_predicate_value: PhantomData<TPredicateValue>,
|
||||
}
|
||||
|
||||
impl<TCollector, TPredicate, TPredicateValue: FastValue>
|
||||
FilterCollector<TCollector, TPredicate, TPredicateValue>
|
||||
where
|
||||
TCollector: Collector + Send + Sync,
|
||||
TPredicate: Fn(TPredicateValue) -> bool + Send + Sync,
|
||||
{
|
||||
/// Create a new FilterCollector.
|
||||
pub fn new(
|
||||
field: Field,
|
||||
predicate: &'static TPredicate,
|
||||
collector: TCollector,
|
||||
) -> FilterCollector<TCollector, TPredicate, TPredicateValue> {
|
||||
FilterCollector {
|
||||
field,
|
||||
predicate,
|
||||
collector,
|
||||
t_predicate_value: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TCollector, TPredicate, TPredicateValue: FastValue> Collector
|
||||
for FilterCollector<TCollector, TPredicate, TPredicateValue>
|
||||
where
|
||||
TCollector: Collector + Send + Sync,
|
||||
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
|
||||
TPredicateValue: 'static + FastValue,
|
||||
{
|
||||
// That's the type of our result.
|
||||
// Our standard deviation will be a float.
|
||||
type Fruit = TCollector::Fruit;
|
||||
|
||||
type Child = FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>;
|
||||
|
||||
fn for_segment(
|
||||
&self,
|
||||
segment_local_id: u32,
|
||||
segment_reader: &SegmentReader,
|
||||
) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>> {
|
||||
let schema = segment_reader.schema();
|
||||
let field_entry = schema.get_field_entry(self.field);
|
||||
if !field_entry.is_fast() {
|
||||
return Err(TantivyError::SchemaError(format!(
|
||||
"Field {:?} is not a fast field.",
|
||||
field_entry.name()
|
||||
)));
|
||||
}
|
||||
let requested_type = TPredicateValue::to_type();
|
||||
let field_schema_type = field_entry.field_type().value_type();
|
||||
if requested_type != field_schema_type {
|
||||
return Err(TantivyError::SchemaError(format!(
|
||||
"Field {:?} is of type {:?}!={:?}",
|
||||
field_entry.name(),
|
||||
requested_type,
|
||||
field_schema_type
|
||||
)));
|
||||
}
|
||||
|
||||
let fast_field_reader = segment_reader
|
||||
.fast_fields()
|
||||
.typed_fast_field_reader(self.field)
|
||||
.ok_or_else(|| {
|
||||
TantivyError::SchemaError(format!(
|
||||
"{:?} is not declared as a fast field in the schema.",
|
||||
self.field
|
||||
))
|
||||
})?;
|
||||
|
||||
let segment_collector = self
|
||||
.collector
|
||||
.for_segment(segment_local_id, segment_reader)?;
|
||||
|
||||
Ok(FilterSegmentCollector {
|
||||
fast_field_reader,
|
||||
segment_collector,
|
||||
predicate: self.predicate,
|
||||
t_predicate_value: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
self.collector.requires_scoring()
|
||||
}
|
||||
|
||||
fn merge_fruits(
|
||||
&self,
|
||||
segment_fruits: Vec<<TCollector::Child as SegmentCollector>::Fruit>,
|
||||
) -> crate::Result<TCollector::Fruit> {
|
||||
self.collector.merge_fruits(segment_fruits)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
|
||||
where
|
||||
TPredicate: 'static,
|
||||
TPredicateValue: 'static + FastValue,
|
||||
{
|
||||
fast_field_reader: FastFieldReader<TPredicateValue>,
|
||||
segment_collector: TSegmentCollector,
|
||||
predicate: &'static TPredicate,
|
||||
t_predicate_value: PhantomData<TPredicateValue>,
|
||||
}
|
||||
|
||||
impl<TSegmentCollector, TPredicate, TPredicateValue> SegmentCollector
|
||||
for FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
|
||||
where
|
||||
TSegmentCollector: SegmentCollector,
|
||||
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
|
||||
TPredicateValue: 'static + FastValue,
|
||||
{
|
||||
type Fruit = TSegmentCollector::Fruit;
|
||||
|
||||
fn collect(&mut self, doc: u32, score: Score) {
|
||||
let value = self.fast_field_reader.get(doc);
|
||||
if (self.predicate)(value) {
|
||||
self.segment_collector.collect(doc, score)
|
||||
}
|
||||
}
|
||||
|
||||
fn harvest(self) -> <TSegmentCollector as SegmentCollector>::Fruit {
|
||||
self.segment_collector.harvest()
|
||||
}
|
||||
}
|
||||
@@ -114,6 +114,9 @@ use crate::query::Weight;
|
||||
mod docset_collector;
|
||||
pub use self::docset_collector::DocSetCollector;
|
||||
|
||||
mod filter_collector_wrapper;
|
||||
pub use self::filter_collector_wrapper::FilterCollector;
|
||||
|
||||
/// `Fruit` is the type for the result of our collection.
|
||||
/// e.g. `usize` for the `Count` collector.
|
||||
pub trait Fruit: Send + downcast_rs::Downcast {}
|
||||
|
||||
@@ -8,6 +8,13 @@ use crate::DocId;
|
||||
use crate::Score;
|
||||
use crate::SegmentLocalId;
|
||||
|
||||
use crate::collector::{FilterCollector, TopDocs};
|
||||
use crate::query::QueryParser;
|
||||
use crate::schema::{Schema, FAST, TEXT};
|
||||
use crate::DateTime;
|
||||
use crate::{doc, Index};
|
||||
use std::str::FromStr;
|
||||
|
||||
pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
|
||||
compute_score: true,
|
||||
};
|
||||
@@ -16,6 +23,54 @@ pub const TEST_COLLECTOR_WITHOUT_SCORE: TestCollector = TestCollector {
|
||||
compute_score: true,
|
||||
};
|
||||
|
||||
#[test]
|
||||
pub fn test_filter_collector() {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let title = schema_builder.add_text_field("title", TEXT);
|
||||
let price = schema_builder.add_u64_field("price", FAST);
|
||||
let date = schema_builder.add_date_field("date", FAST);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
|
||||
let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
|
||||
index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64, date => DateTime::from_str("1898-04-09T00:00:00+00:00").unwrap()));
|
||||
index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64, date => DateTime::from_str("2020-04-09T00:00:00+00:00").unwrap()));
|
||||
index_writer.add_document(doc!(title => "The Diary of Anne Frank", price => 18_240u64, date => DateTime::from_str("2019-04-20T00:00:00+00:00").unwrap()));
|
||||
index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64, date => DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()));
|
||||
index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64, date => DateTime::from_str("2018-04-09T00:00:00+00:00").unwrap()));
|
||||
assert!(index_writer.commit().is_ok());
|
||||
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
|
||||
let query_parser = QueryParser::for_index(&index, vec![title]);
|
||||
let query = query_parser.parse_query("diary").unwrap();
|
||||
let filter_some_collector = FilterCollector::new(
|
||||
price,
|
||||
&|value: u64| value > 20_120u64,
|
||||
TopDocs::with_limit(2),
|
||||
);
|
||||
let top_docs = searcher.search(&query, &filter_some_collector).unwrap();
|
||||
|
||||
assert_eq!(top_docs.len(), 1);
|
||||
assert_eq!(top_docs[0].1, DocAddress(0, 1));
|
||||
|
||||
let filter_all_collector: FilterCollector<_, _, u64> =
|
||||
FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
|
||||
let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
|
||||
|
||||
assert_eq!(filtered_top_docs.len(), 0);
|
||||
|
||||
fn date_filter(value: DateTime) -> bool {
|
||||
(value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
|
||||
}
|
||||
|
||||
let filter_dates_collector = FilterCollector::new(date, &date_filter, TopDocs::with_limit(5));
|
||||
let filtered_date_docs = searcher.search(&query, &filter_dates_collector).unwrap();
|
||||
|
||||
assert_eq!(filtered_date_docs.len(), 2);
|
||||
}
|
||||
|
||||
/// Stores all of the doc ids.
|
||||
/// This collector is only used for tests.
|
||||
/// It is unusable in pr
|
||||
|
||||
@@ -728,7 +728,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_top_collector_not_at_capacity() {
|
||||
fn test_top_collector_not_at_capacity_without_offset() {
|
||||
let index = make_index();
|
||||
let field = index.schema().get_field("text").unwrap();
|
||||
let query_parser = QueryParser::for_index(&index, vec![field]);
|
||||
|
||||
@@ -20,9 +20,10 @@ impl<W: Write> CountingWriter<W> {
|
||||
self.written_bytes
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> io::Result<(W, u64)> {
|
||||
self.flush()?;
|
||||
Ok((self.underlying, self.written_bytes))
|
||||
/// Returns the underlying write object.
|
||||
/// Note that this method does not trigger any flushing.
|
||||
pub fn finish(self) -> W {
|
||||
self.underlying
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +47,6 @@ impl<W: Write> Write for CountingWriter<W> {
|
||||
|
||||
impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
|
||||
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()?;
|
||||
self.underlying.terminate_ref(token)
|
||||
}
|
||||
}
|
||||
@@ -63,8 +63,9 @@ mod test {
|
||||
let mut counting_writer = CountingWriter::wrap(buffer);
|
||||
let bytes = (0u8..10u8).collect::<Vec<u8>>();
|
||||
counting_writer.write_all(&bytes).unwrap();
|
||||
let (w, len): (Vec<u8>, u64) = counting_writer.finish().unwrap();
|
||||
let len = counting_writer.written_bytes();
|
||||
let buffer_restituted: Vec<u8> = counting_writer.finish();
|
||||
assert_eq!(len, 10u64);
|
||||
assert_eq!(w.len(), 10);
|
||||
assert_eq!(buffer_restituted.len(), 10);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,10 +66,6 @@ pub(crate) fn compute_num_bits(n: u64) -> u8 {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_power_of_2(n: usize) -> bool {
|
||||
(n > 0) && (n & (n - 1) == 0)
|
||||
}
|
||||
|
||||
/// Has length trait
|
||||
pub trait HasLen {
|
||||
/// Return length
|
||||
@@ -119,11 +115,16 @@ pub fn u64_to_i64(val: u64) -> i64 {
|
||||
/// For simplicity, tantivy internally handles `f64` as `u64`.
|
||||
/// The mapping is defined by this function.
|
||||
///
|
||||
/// Maps `f64` to `u64` so that lexical order is preserved.
|
||||
/// Maps `f64` to `u64` in a monotonic manner, so that bytes lexical order is preserved.
|
||||
///
|
||||
/// This is more suited than simply casting (`val as u64`)
|
||||
/// which would truncate the result
|
||||
///
|
||||
/// # Reference
|
||||
///
|
||||
/// Daniel Lemire's [blog post](https://lemire.me/blog/2020/12/14/converting-floating-point-numbers-to-integers-while-preserving-order/)
|
||||
/// explains the mapping in a clear manner.
|
||||
///
|
||||
/// # See also
|
||||
/// The [reverse mapping is `u64_to_f64`](./fn.u64_to_f64.html).
|
||||
#[inline(always)]
|
||||
@@ -152,6 +153,7 @@ pub(crate) mod test {
|
||||
pub use super::minmax;
|
||||
pub use super::serialize::test::fixed_size_test;
|
||||
use super::{compute_num_bits, f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
|
||||
use proptest::prelude::*;
|
||||
use std::f64;
|
||||
|
||||
fn test_i64_converter_helper(val: i64) {
|
||||
@@ -162,6 +164,15 @@ pub(crate) mod test {
|
||||
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) {
|
||||
let left_u64 = f64_to_u64(left);
|
||||
let right_u64 = f64_to_u64(right);
|
||||
assert_eq!(left_u64 < right_u64, left < right);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_i64_converter() {
|
||||
assert_eq!(i64_to_u64(i64::min_value()), u64::min_value());
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::core::SegmentId;
|
||||
use crate::core::SegmentMeta;
|
||||
use crate::core::SegmentMetaInventory;
|
||||
use crate::core::META_FILEPATH;
|
||||
use crate::directory::error::OpenReadError;
|
||||
use crate::directory::ManagedDirectory;
|
||||
#[cfg(feature = "mmap")]
|
||||
use crate::directory::MmapDirectory;
|
||||
@@ -34,12 +35,18 @@ fn load_metas(
|
||||
inventory: &SegmentMetaInventory,
|
||||
) -> crate::Result<IndexMeta> {
|
||||
let meta_data = directory.atomic_read(&META_FILEPATH)?;
|
||||
let meta_string = String::from_utf8_lossy(&meta_data);
|
||||
let meta_string = String::from_utf8(meta_data)
|
||||
.map_err(|utf8_err| {
|
||||
DataCorruption::new(
|
||||
META_FILEPATH.to_path_buf(),
|
||||
format!("Meta file is not valid utf-8. {:?}", utf8_err)
|
||||
)
|
||||
})?;
|
||||
IndexMeta::deserialize(&meta_string, &inventory)
|
||||
.map_err(|e| {
|
||||
DataCorruption::new(
|
||||
META_FILEPATH.to_path_buf(),
|
||||
format!("Meta file cannot be deserialized. {:?}.", e),
|
||||
format!("Meta file cannot be deserialized. {:?}. content = {}", e, meta_string),
|
||||
)
|
||||
})
|
||||
.map_err(From::from)
|
||||
@@ -59,7 +66,7 @@ impl Index {
|
||||
/// Examines the directory to see if it contains an index.
|
||||
///
|
||||
/// Effectively, it only checks for the presence of the `meta.json` file.
|
||||
pub fn exists<Dir: Directory>(dir: &Dir) -> bool {
|
||||
pub fn exists<Dir: Directory>(dir: &Dir) -> Result<bool, OpenReadError> {
|
||||
dir.exists(&META_FILEPATH)
|
||||
}
|
||||
|
||||
@@ -106,7 +113,7 @@ impl Index {
|
||||
schema: Schema,
|
||||
) -> crate::Result<Index> {
|
||||
let mmap_directory = MmapDirectory::open(directory_path)?;
|
||||
if Index::exists(&mmap_directory) {
|
||||
if Index::exists(&mmap_directory)? {
|
||||
return Err(TantivyError::IndexAlreadyExists);
|
||||
}
|
||||
Index::create(mmap_directory, schema)
|
||||
@@ -114,7 +121,7 @@ impl Index {
|
||||
|
||||
/// Opens or creates a new index in the provided directory
|
||||
pub fn open_or_create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
|
||||
if !Index::exists(&dir) {
|
||||
if !Index::exists(&dir)? {
|
||||
return Index::create(dir, schema);
|
||||
}
|
||||
let index = Index::open(dir)?;
|
||||
@@ -399,7 +406,7 @@ impl fmt::Debug for Index {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::directory::RAMDirectory;
|
||||
use crate::directory::{RAMDirectory, WatchCallback};
|
||||
use crate::schema::Field;
|
||||
use crate::schema::{Schema, INDEXED, TEXT};
|
||||
use crate::IndexReader;
|
||||
@@ -423,24 +430,24 @@ mod tests {
|
||||
#[test]
|
||||
fn test_index_exists() {
|
||||
let directory = RAMDirectory::create();
|
||||
assert!(!Index::exists(&directory));
|
||||
assert!(!Index::exists(&directory).unwrap());
|
||||
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
|
||||
assert!(Index::exists(&directory));
|
||||
assert!(Index::exists(&directory).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_or_create_should_create() {
|
||||
let directory = RAMDirectory::create();
|
||||
assert!(!Index::exists(&directory));
|
||||
assert!(!Index::exists(&directory).unwrap());
|
||||
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
|
||||
assert!(Index::exists(&directory));
|
||||
assert!(Index::exists(&directory).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_or_create_should_open() {
|
||||
let directory = RAMDirectory::create();
|
||||
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
|
||||
assert!(Index::exists(&directory));
|
||||
assert!(Index::exists(&directory).unwrap());
|
||||
assert!(Index::open_or_create(directory, throw_away_schema()).is_ok());
|
||||
}
|
||||
|
||||
@@ -448,7 +455,7 @@ mod tests {
|
||||
fn create_should_wipeoff_existing() {
|
||||
let directory = RAMDirectory::create();
|
||||
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
|
||||
assert!(Index::exists(&directory));
|
||||
assert!(Index::exists(&directory).unwrap());
|
||||
assert!(Index::create(directory.clone(), Schema::builder().build()).is_ok());
|
||||
}
|
||||
|
||||
@@ -456,7 +463,7 @@ mod tests {
|
||||
fn open_or_create_exists_but_schema_does_not_match() {
|
||||
let directory = RAMDirectory::create();
|
||||
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
|
||||
assert!(Index::exists(&directory));
|
||||
assert!(Index::exists(&directory).unwrap());
|
||||
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
|
||||
let err = Index::open_or_create(directory, Schema::builder().build());
|
||||
assert_eq!(
|
||||
@@ -510,28 +517,28 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_index_manual_policy_mmap() {
|
||||
fn test_index_manual_policy_mmap() -> crate::Result<()> {
|
||||
let schema = throw_away_schema();
|
||||
let field = schema.get_field("num_likes").unwrap();
|
||||
let mut index = Index::create_from_tempdir(schema).unwrap();
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
writer.commit().unwrap();
|
||||
let mut index = Index::create_from_tempdir(schema)?;
|
||||
let mut writer = index.writer_for_tests()?;
|
||||
writer.commit()?;
|
||||
let reader = index
|
||||
.reader_builder()
|
||||
.reload_policy(ReloadPolicy::Manual)
|
||||
.try_into()
|
||||
.unwrap();
|
||||
.try_into()?;
|
||||
assert_eq!(reader.searcher().num_docs(), 0);
|
||||
writer.add_document(doc!(field=>1u64));
|
||||
let (sender, receiver) = crossbeam::channel::unbounded();
|
||||
let _handle = index.directory_mut().watch(Box::new(move || {
|
||||
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
|
||||
let _ = sender.send(());
|
||||
}));
|
||||
writer.commit().unwrap();
|
||||
writer.commit()?;
|
||||
assert!(receiver.recv().is_ok());
|
||||
assert_eq!(reader.searcher().num_docs(), 0);
|
||||
reader.reload().unwrap();
|
||||
reader.reload()?;
|
||||
assert_eq!(reader.searcher().num_docs(), 1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -554,9 +561,11 @@ mod tests {
|
||||
fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) {
|
||||
let mut reader_index = reader.index();
|
||||
let (sender, receiver) = crossbeam::channel::unbounded();
|
||||
let _watch_handle = reader_index.directory_mut().watch(Box::new(move || {
|
||||
let _ = sender.send(());
|
||||
}));
|
||||
let _watch_handle = reader_index
|
||||
.directory_mut()
|
||||
.watch(WatchCallback::new(move || {
|
||||
let _ = sender.send(());
|
||||
}));
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
assert_eq!(reader.searcher().num_docs(), 0);
|
||||
writer.add_document(doc!(field=>1u64));
|
||||
@@ -595,7 +604,7 @@ mod tests {
|
||||
writer.add_document(doc!(field => i));
|
||||
}
|
||||
let (sender, receiver) = crossbeam::channel::unbounded();
|
||||
let _handle = directory.watch(Box::new(move || {
|
||||
let _handle = directory.watch(WatchCallback::new(move || {
|
||||
let _ = sender.send(());
|
||||
}));
|
||||
writer.commit().unwrap();
|
||||
|
||||
@@ -66,7 +66,7 @@ impl InvertedIndexReader {
|
||||
}
|
||||
|
||||
/// Returns the term info associated with the term.
|
||||
pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
|
||||
pub fn get_term_info(&self, term: &Term) -> io::Result<Option<TermInfo>> {
|
||||
self.termdict.get(term.value_bytes())
|
||||
}
|
||||
|
||||
@@ -90,9 +90,9 @@ impl InvertedIndexReader {
|
||||
term_info: &TermInfo,
|
||||
block_postings: &mut BlockSegmentPostings,
|
||||
) -> io::Result<()> {
|
||||
let postings_slice = self
|
||||
.postings_file_slice
|
||||
.slice_from(term_info.postings_offset as usize);
|
||||
let start_offset = term_info.postings_start_offset as usize;
|
||||
let stop_offset = term_info.postings_stop_offset as usize;
|
||||
let postings_slice = self.postings_file_slice.slice(start_offset, stop_offset);
|
||||
block_postings.reset(term_info.doc_freq, postings_slice.read_bytes()?);
|
||||
Ok(())
|
||||
}
|
||||
@@ -106,10 +106,9 @@ impl InvertedIndexReader {
|
||||
term: &Term,
|
||||
option: IndexRecordOption,
|
||||
) -> io::Result<Option<BlockSegmentPostings>> {
|
||||
Ok(self
|
||||
.get_term_info(term)
|
||||
self.get_term_info(term)?
|
||||
.map(move |term_info| self.read_block_postings_from_terminfo(&term_info, option))
|
||||
.transpose()?)
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Returns a block postings given a `term_info`.
|
||||
@@ -121,8 +120,10 @@ impl InvertedIndexReader {
|
||||
term_info: &TermInfo,
|
||||
requested_option: IndexRecordOption,
|
||||
) -> io::Result<BlockSegmentPostings> {
|
||||
let offset = term_info.postings_offset as usize;
|
||||
let postings_data = self.postings_file_slice.slice_from(offset);
|
||||
let postings_data = self.postings_file_slice.slice(
|
||||
term_info.postings_start_offset as usize,
|
||||
term_info.postings_stop_offset as usize,
|
||||
);
|
||||
BlockSegmentPostings::open(
|
||||
term_info.doc_freq,
|
||||
postings_data,
|
||||
@@ -179,7 +180,7 @@ impl InvertedIndexReader {
|
||||
term: &Term,
|
||||
option: IndexRecordOption,
|
||||
) -> io::Result<Option<SegmentPostings>> {
|
||||
self.get_term_info(term)
|
||||
self.get_term_info(term)?
|
||||
.map(move |term_info| self.read_postings_from_terminfo(&term_info, option))
|
||||
.transpose()
|
||||
}
|
||||
@@ -189,7 +190,7 @@ impl InvertedIndexReader {
|
||||
term: &Term,
|
||||
option: IndexRecordOption,
|
||||
) -> io::Result<Option<SegmentPostings>> {
|
||||
self.get_term_info(term)
|
||||
self.get_term_info(term)?
|
||||
.map(|term_info| self.read_postings_from_terminfo(&term_info, option))
|
||||
.transpose()
|
||||
}
|
||||
@@ -197,7 +198,7 @@ impl InvertedIndexReader {
|
||||
/// Returns the number of documents containing the term.
|
||||
pub fn doc_freq(&self, term: &Term) -> io::Result<u32> {
|
||||
Ok(self
|
||||
.get_term_info(term)
|
||||
.get_term_info(term)?
|
||||
.map(|term_info| term_info.doc_freq)
|
||||
.unwrap_or(0u32))
|
||||
}
|
||||
|
||||
@@ -1,17 +1,16 @@
|
||||
use crate::collector::Collector;
|
||||
use crate::core::Executor;
|
||||
use crate::core::InvertedIndexReader;
|
||||
|
||||
use crate::core::SegmentReader;
|
||||
use crate::query::Query;
|
||||
use crate::schema::Document;
|
||||
use crate::schema::Schema;
|
||||
use crate::schema::{Field, Term};
|
||||
use crate::schema::Term;
|
||||
use crate::space_usage::SearcherSpaceUsage;
|
||||
use crate::store::StoreReader;
|
||||
use crate::termdict::TermMerger;
|
||||
use crate::DocAddress;
|
||||
use crate::Index;
|
||||
use std::sync::Arc;
|
||||
|
||||
use std::{fmt, io};
|
||||
|
||||
/// Holds a list of `SegmentReader`s ready for search.
|
||||
@@ -148,16 +147,6 @@ impl Searcher {
|
||||
collector.merge_fruits(fruits)
|
||||
}
|
||||
|
||||
/// Return the field searcher associated to a `Field`.
|
||||
pub fn field(&self, field: Field) -> crate::Result<FieldSearcher> {
|
||||
let inv_index_readers: Vec<Arc<InvertedIndexReader>> = self
|
||||
.segment_readers
|
||||
.iter()
|
||||
.map(|segment_reader| segment_reader.inverted_index(field))
|
||||
.collect::<crate::Result<Vec<_>>>()?;
|
||||
Ok(FieldSearcher::new(inv_index_readers))
|
||||
}
|
||||
|
||||
/// Summarize total space usage of this searcher.
|
||||
pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
|
||||
let mut space_usage = SearcherSpaceUsage::new();
|
||||
@@ -168,27 +157,6 @@ impl Searcher {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FieldSearcher {
|
||||
inv_index_readers: Vec<Arc<InvertedIndexReader>>,
|
||||
}
|
||||
|
||||
impl FieldSearcher {
|
||||
fn new(inv_index_readers: Vec<Arc<InvertedIndexReader>>) -> FieldSearcher {
|
||||
FieldSearcher { inv_index_readers }
|
||||
}
|
||||
|
||||
/// Returns a Stream over all of the sorted unique terms of
|
||||
/// for the given field.
|
||||
pub fn terms(&self) -> TermMerger<'_> {
|
||||
let term_streamers: Vec<_> = self
|
||||
.inv_index_readers
|
||||
.iter()
|
||||
.map(|inverted_index| inverted_index.terms().stream())
|
||||
.collect();
|
||||
TermMerger::new(term_streamers)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Searcher {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let segment_ids = self
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::directory::directory_lock::Lock;
|
||||
use crate::directory::error::LockError;
|
||||
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
|
||||
use crate::directory::WatchCallback;
|
||||
use crate::directory::WatchHandle;
|
||||
use crate::directory::{FileHandle, WatchCallback};
|
||||
use crate::directory::{FileSlice, WritePtr};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
@@ -108,10 +108,13 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
|
||||
/// should be your default choice.
|
||||
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
|
||||
/// should be used mostly for tests.
|
||||
///
|
||||
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// Opens a virtual file for read.
|
||||
/// Opens a file and returns a boxed `FileHandle`.
|
||||
///
|
||||
/// Users of `Directory` should typically call `Directory::open_read(...)`,
|
||||
/// while `Directory` implementor should implement `get_file_handle()`.
|
||||
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError>;
|
||||
|
||||
/// Once a virtual file is open, its data may not
|
||||
/// change.
|
||||
///
|
||||
@@ -119,7 +122,10 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// have no effect on the returned `FileSlice` object.
|
||||
///
|
||||
/// You should only use this to read files create with [Directory::open_write].
|
||||
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError>;
|
||||
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
|
||||
let file_handle = self.get_file_handle(path)?;
|
||||
Ok(FileSlice::new(file_handle))
|
||||
}
|
||||
|
||||
/// Removes a file
|
||||
///
|
||||
@@ -131,7 +137,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
fn delete(&self, path: &Path) -> Result<(), DeleteError>;
|
||||
|
||||
/// Returns true iff the file exists
|
||||
fn exists(&self, path: &Path) -> bool;
|
||||
fn exists(&self, path: &Path) -> Result<bool, OpenReadError>;
|
||||
|
||||
/// Opens a writer for the *virtual file* associated with
|
||||
/// a Path.
|
||||
|
||||
@@ -58,7 +58,8 @@ pub enum OpenWriteError {
|
||||
}
|
||||
|
||||
impl OpenWriteError {
|
||||
pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
|
||||
/// Wraps an io error.
|
||||
pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
|
||||
Self::IOError { io_error, filepath }
|
||||
}
|
||||
}
|
||||
@@ -143,7 +144,8 @@ pub enum OpenReadError {
|
||||
}
|
||||
|
||||
impl OpenReadError {
|
||||
pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
|
||||
/// Wraps an io error.
|
||||
pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
|
||||
Self::IOError { io_error, filepath }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,10 +2,11 @@ use stable_deref_trait::StableDeref;
|
||||
|
||||
use crate::common::HasLen;
|
||||
use crate::directory::OwnedBytes;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::{io, ops::Deref};
|
||||
|
||||
pub type BoxedData = Box<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
|
||||
pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
|
||||
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
|
||||
|
||||
/// Objects that represents files sections in tantivy.
|
||||
///
|
||||
@@ -40,7 +41,7 @@ where
|
||||
B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
|
||||
{
|
||||
fn from(bytes: B) -> FileSlice {
|
||||
FileSlice::new(OwnedBytes::new(bytes))
|
||||
FileSlice::new(Box::new(OwnedBytes::new(bytes)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,22 +51,25 @@ where
|
||||
///
|
||||
#[derive(Clone)]
|
||||
pub struct FileSlice {
|
||||
data: Arc<Box<dyn FileHandle>>,
|
||||
data: Arc<dyn FileHandle>,
|
||||
start: usize,
|
||||
stop: usize,
|
||||
}
|
||||
|
||||
impl FileSlice {
|
||||
/// Wraps a FileHandle.
|
||||
pub fn new<D>(data: D) -> Self
|
||||
where
|
||||
D: FileHandle,
|
||||
{
|
||||
let len = data.len();
|
||||
pub fn new(file_handle: Box<dyn FileHandle>) -> Self {
|
||||
let num_bytes = file_handle.len();
|
||||
FileSlice::new_with_num_bytes(file_handle, num_bytes)
|
||||
}
|
||||
|
||||
/// Wraps a FileHandle.
|
||||
#[doc(hidden)]
|
||||
pub fn new_with_num_bytes(file_handle: Box<dyn FileHandle>, num_bytes: usize) -> Self {
|
||||
FileSlice {
|
||||
data: Arc::new(Box::new(data)),
|
||||
data: Arc::from(file_handle),
|
||||
start: 0,
|
||||
stop: len,
|
||||
stop: num_bytes,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,6 +150,12 @@ impl FileSlice {
|
||||
}
|
||||
}
|
||||
|
||||
impl FileHandle for FileSlice {
|
||||
fn read_bytes(&self, from: usize, to: usize) -> io::Result<OwnedBytes> {
|
||||
self.read_bytes_slice(from, to)
|
||||
}
|
||||
}
|
||||
|
||||
impl HasLen for FileSlice {
|
||||
fn len(&self) -> usize {
|
||||
self.stop - self.start
|
||||
@@ -160,7 +170,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_file_slice() -> io::Result<()> {
|
||||
let file_slice = FileSlice::new(b"abcdef".as_ref());
|
||||
let file_slice = FileSlice::new(Box::new(b"abcdef".as_ref()));
|
||||
assert_eq!(file_slice.len(), 6);
|
||||
assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
|
||||
assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
|
||||
@@ -204,7 +214,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_slice_simple_read() -> io::Result<()> {
|
||||
let slice = FileSlice::new(&b"abcdef"[..]);
|
||||
let slice = FileSlice::new(Box::new(&b"abcdef"[..]));
|
||||
assert_eq!(slice.len(), 6);
|
||||
assert_eq!(slice.read_bytes()?.as_ref(), b"abcdef");
|
||||
assert_eq!(slice.slice(1, 4).read_bytes()?.as_ref(), b"bcd");
|
||||
@@ -213,7 +223,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_slice_read_slice() -> io::Result<()> {
|
||||
let slice_deref = FileSlice::new(&b"abcdef"[..]);
|
||||
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
|
||||
assert_eq!(slice_deref.read_bytes_slice(1, 4)?.as_ref(), b"bcd");
|
||||
Ok(())
|
||||
}
|
||||
@@ -221,14 +231,14 @@ mod tests {
|
||||
#[test]
|
||||
#[should_panic(expected = "assertion failed: from <= to")]
|
||||
fn test_slice_read_slice_invalid_range() {
|
||||
let slice_deref = FileSlice::new(&b"abcdef"[..]);
|
||||
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
|
||||
assert_eq!(slice_deref.read_bytes_slice(1, 0).unwrap().as_ref(), b"bcd");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "`to` exceeds the fileslice length")]
|
||||
fn test_slice_read_slice_invalid_range_exceeds() {
|
||||
let slice_deref = FileSlice::new(&b"abcdef"[..]);
|
||||
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
|
||||
assert_eq!(
|
||||
slice_deref.read_bytes_slice(0, 10).unwrap().as_ref(),
|
||||
b"bcd"
|
||||
|
||||
178
src/directory/file_watcher.rs
Normal file
178
src/directory/file_watcher.rs
Normal file
@@ -0,0 +1,178 @@
|
||||
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
|
||||
use crc32fast::Hasher;
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::io::BufRead;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });
|
||||
|
||||
// Watches a file and executes registered callbacks when the file is modified.
|
||||
pub struct FileWatcher {
|
||||
path: Arc<Path>,
|
||||
callbacks: Arc<WatchCallbackList>,
|
||||
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
|
||||
}
|
||||
|
||||
impl FileWatcher {
|
||||
pub fn new(path: &Path) -> FileWatcher {
|
||||
FileWatcher {
|
||||
path: Arc::from(path),
|
||||
callbacks: Default::default(),
|
||||
state: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn(&self) {
|
||||
if self.state.compare_and_swap(0, 1, Ordering::SeqCst) > 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let path = self.path.clone();
|
||||
let callbacks = self.callbacks.clone();
|
||||
let state = self.state.clone();
|
||||
|
||||
thread::Builder::new()
|
||||
.name("thread-tantivy-meta-file-watcher".to_string())
|
||||
.spawn(move || {
|
||||
let mut current_checksum = None;
|
||||
|
||||
while state.load(Ordering::SeqCst) == 1 {
|
||||
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
|
||||
// `None.unwrap_or_else(|| !checksum) != checksum` evaluates to `true`
|
||||
if current_checksum.unwrap_or_else(|| !checksum) != checksum {
|
||||
info!("Meta file {:?} was modified", path);
|
||||
current_checksum = Some(checksum);
|
||||
futures::executor::block_on(callbacks.broadcast());
|
||||
}
|
||||
}
|
||||
|
||||
thread::sleep(POLLING_INTERVAL);
|
||||
}
|
||||
})
|
||||
.expect("Failed to spawn meta file watcher thread");
|
||||
}
|
||||
|
||||
pub fn watch(&self, callback: WatchCallback) -> WatchHandle {
|
||||
let handle = self.callbacks.subscribe(callback);
|
||||
self.spawn();
|
||||
handle
|
||||
}
|
||||
|
||||
fn compute_checksum(path: &Path) -> Result<u32, io::Error> {
|
||||
let reader = match fs::File::open(path) {
|
||||
Ok(f) => io::BufReader::new(f),
|
||||
Err(e) => {
|
||||
warn!("Failed to open meta file {:?}: {:?}", path, e);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let mut hasher = Hasher::new();
|
||||
|
||||
for line in reader.lines() {
|
||||
hasher.update(line?.as_bytes())
|
||||
}
|
||||
|
||||
Ok(hasher.finalize())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for FileWatcher {
|
||||
fn drop(&mut self) {
|
||||
self.state.store(2, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use std::mem;
|
||||
|
||||
use crate::directory::mmap_directory::atomic_write;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_file_watcher_drop_watcher() -> crate::Result<()> {
|
||||
let tmp_dir = tempfile::TempDir::new()?;
|
||||
let tmp_file = tmp_dir.path().join("watched.txt");
|
||||
|
||||
let counter: Arc<AtomicUsize> = Default::default();
|
||||
let (tx, rx) = crossbeam::channel::unbounded();
|
||||
let timeout = Duration::from_millis(100);
|
||||
|
||||
let watcher = FileWatcher::new(&tmp_file);
|
||||
|
||||
let state = watcher.state.clone();
|
||||
assert_eq!(state.load(Ordering::SeqCst), 0);
|
||||
|
||||
let counter_clone = counter.clone();
|
||||
|
||||
let _handle = watcher.watch(WatchCallback::new(move || {
|
||||
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
|
||||
tx.send(val + 1).unwrap();
|
||||
}));
|
||||
|
||||
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(state.load(Ordering::SeqCst), 1);
|
||||
|
||||
atomic_write(&tmp_file, b"foo")?;
|
||||
assert_eq!(rx.recv_timeout(timeout), Ok(1));
|
||||
|
||||
atomic_write(&tmp_file, b"foo")?;
|
||||
assert!(rx.recv_timeout(timeout).is_err());
|
||||
|
||||
atomic_write(&tmp_file, b"bar")?;
|
||||
assert_eq!(rx.recv_timeout(timeout), Ok(2));
|
||||
|
||||
mem::drop(watcher);
|
||||
|
||||
atomic_write(&tmp_file, b"qux")?;
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
||||
assert_eq!(state.load(Ordering::SeqCst), 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_file_watcher_drop_handle() -> crate::Result<()> {
|
||||
let tmp_dir = tempfile::TempDir::new()?;
|
||||
let tmp_file = tmp_dir.path().join("watched.txt");
|
||||
|
||||
let counter: Arc<AtomicUsize> = Default::default();
|
||||
let (tx, rx) = crossbeam::channel::unbounded();
|
||||
let timeout = Duration::from_millis(100);
|
||||
|
||||
let watcher = FileWatcher::new(&tmp_file);
|
||||
|
||||
let state = watcher.state.clone();
|
||||
assert_eq!(state.load(Ordering::SeqCst), 0);
|
||||
|
||||
let counter_clone = counter.clone();
|
||||
|
||||
let handle = watcher.watch(WatchCallback::new(move || {
|
||||
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
|
||||
tx.send(val + 1).unwrap();
|
||||
}));
|
||||
|
||||
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(state.load(Ordering::SeqCst), 1);
|
||||
|
||||
atomic_write(&tmp_file, b"foo")?;
|
||||
assert_eq!(rx.recv_timeout(timeout), Ok(1));
|
||||
|
||||
mem::drop(handle);
|
||||
|
||||
atomic_write(&tmp_file, b"qux")?;
|
||||
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(state.load(Ordering::SeqCst), 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -115,6 +115,18 @@ impl Footer {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
VersionedFooter::V3 {
|
||||
crc32: _crc,
|
||||
store_compression,
|
||||
} => {
|
||||
if &library_version.store_compression != store_compression {
|
||||
return Err(Incompatibility::CompressionMismatch {
|
||||
library_compression_format: library_version.store_compression.to_string(),
|
||||
index_compression_format: store_compression.to_string(),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch {
|
||||
library_version: library_version.clone(),
|
||||
index_version: self.version.clone(),
|
||||
@@ -136,24 +148,31 @@ pub enum VersionedFooter {
|
||||
crc32: CrcHashU32,
|
||||
store_compression: String,
|
||||
},
|
||||
// Block wand max termfred on 1 byte
|
||||
V3 {
|
||||
crc32: CrcHashU32,
|
||||
store_compression: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl BinarySerializable for VersionedFooter {
|
||||
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
let mut buf = Vec::new();
|
||||
match self {
|
||||
VersionedFooter::V2 {
|
||||
VersionedFooter::V3 {
|
||||
crc32,
|
||||
store_compression: compression,
|
||||
} => {
|
||||
// Serializes a valid `VersionedFooter` or panics if the version is unknown
|
||||
// [ version | crc_hash | compression_mode ]
|
||||
// [ 0..4 | 4..8 | variable ]
|
||||
BinarySerializable::serialize(&2u32, &mut buf)?;
|
||||
BinarySerializable::serialize(&3u32, &mut buf)?;
|
||||
BinarySerializable::serialize(crc32, &mut buf)?;
|
||||
BinarySerializable::serialize(compression, &mut buf)?;
|
||||
}
|
||||
VersionedFooter::V1 { .. } | VersionedFooter::UnknownVersion => {
|
||||
VersionedFooter::V2 { .. }
|
||||
| VersionedFooter::V1 { .. }
|
||||
| VersionedFooter::UnknownVersion => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"Cannot serialize an unknown versioned footer ",
|
||||
@@ -182,7 +201,7 @@ impl BinarySerializable for VersionedFooter {
|
||||
reader.read_exact(&mut buf[..])?;
|
||||
let mut cursor = &buf[..];
|
||||
let version = u32::deserialize(&mut cursor)?;
|
||||
if version != 1 && version != 2 {
|
||||
if version > 3 {
|
||||
return Ok(VersionedFooter::UnknownVersion);
|
||||
}
|
||||
let crc32 = u32::deserialize(&mut cursor)?;
|
||||
@@ -192,12 +211,17 @@ impl BinarySerializable for VersionedFooter {
|
||||
crc32,
|
||||
store_compression,
|
||||
}
|
||||
} else {
|
||||
assert_eq!(version, 2);
|
||||
} else if version == 2 {
|
||||
VersionedFooter::V2 {
|
||||
crc32,
|
||||
store_compression,
|
||||
}
|
||||
} else {
|
||||
assert_eq!(version, 3);
|
||||
VersionedFooter::V3 {
|
||||
crc32,
|
||||
store_compression,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -205,6 +229,7 @@ impl BinarySerializable for VersionedFooter {
|
||||
impl VersionedFooter {
|
||||
pub fn crc(&self) -> Option<CrcHashU32> {
|
||||
match self {
|
||||
VersionedFooter::V3 { crc32, .. } => Some(*crc32),
|
||||
VersionedFooter::V2 { crc32, .. } => Some(*crc32),
|
||||
VersionedFooter::V1 { crc32, .. } => Some(*crc32),
|
||||
VersionedFooter::UnknownVersion { .. } => None,
|
||||
@@ -243,7 +268,7 @@ impl<W: TerminatingWrite> Write for FooterProxy<W> {
|
||||
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
|
||||
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
|
||||
let crc32 = self.hasher.take().unwrap().finalize();
|
||||
let footer = Footer::new(VersionedFooter::V2 {
|
||||
let footer = Footer::new(VersionedFooter::V3 {
|
||||
crc32,
|
||||
store_compression: crate::store::COMPRESSION.to_string(),
|
||||
});
|
||||
@@ -278,7 +303,7 @@ mod tests {
|
||||
let footer = Footer::deserialize(&mut &vec[..]).unwrap();
|
||||
assert!(matches!(
|
||||
footer.versioned_footer,
|
||||
VersionedFooter::V2 { store_compression, .. }
|
||||
VersionedFooter::V3 { store_compression, .. }
|
||||
if store_compression == crate::store::COMPRESSION
|
||||
));
|
||||
assert_eq!(&footer.version, crate::version());
|
||||
@@ -288,7 +313,7 @@ mod tests {
|
||||
fn test_serialize_deserialize_footer() {
|
||||
let mut buffer = Vec::new();
|
||||
let crc32 = 123456u32;
|
||||
let footer: Footer = Footer::new(VersionedFooter::V2 {
|
||||
let footer: Footer = Footer::new(VersionedFooter::V3 {
|
||||
crc32,
|
||||
store_compression: "lz4".to_string(),
|
||||
});
|
||||
@@ -300,7 +325,7 @@ mod tests {
|
||||
#[test]
|
||||
fn footer_length() {
|
||||
let crc32 = 1111111u32;
|
||||
let versioned_footer = VersionedFooter::V2 {
|
||||
let versioned_footer = VersionedFooter::V3 {
|
||||
crc32,
|
||||
store_compression: "lz4".to_string(),
|
||||
};
|
||||
@@ -321,7 +346,7 @@ mod tests {
|
||||
// versionned footer length
|
||||
12 | 128,
|
||||
// index format version
|
||||
2,
|
||||
3,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
@@ -340,7 +365,7 @@ mod tests {
|
||||
let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap();
|
||||
assert!(cursor.is_empty());
|
||||
let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32;
|
||||
let expected_versioned_footer: VersionedFooter = VersionedFooter::V2 {
|
||||
let expected_versioned_footer: VersionedFooter = VersionedFooter::V3 {
|
||||
crc32: expected_crc,
|
||||
store_compression: "lz4".to_string(),
|
||||
};
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
|
||||
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
|
||||
use crate::directory::footer::{Footer, FooterProxy};
|
||||
use crate::directory::DirectoryLock;
|
||||
use crate::directory::GarbageCollectionResult;
|
||||
use crate::directory::Lock;
|
||||
use crate::directory::META_LOCK;
|
||||
use crate::directory::{DirectoryLock, FileHandle};
|
||||
use crate::directory::{FileSlice, WritePtr};
|
||||
use crate::directory::{WatchCallback, WatchHandle};
|
||||
use crate::error::DataCorruption;
|
||||
@@ -274,6 +274,11 @@ impl ManagedDirectory {
|
||||
}
|
||||
|
||||
impl Directory for ManagedDirectory {
|
||||
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
|
||||
let file_slice = self.open_read(path)?;
|
||||
Ok(Box::new(file_slice))
|
||||
}
|
||||
|
||||
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
|
||||
let file_slice = self.directory.open_read(path)?;
|
||||
let (footer, reader) = Footer::extract_footer(file_slice)
|
||||
@@ -307,7 +312,7 @@ impl Directory for ManagedDirectory {
|
||||
self.directory.delete(path)
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
|
||||
self.directory.exists(path)
|
||||
}
|
||||
|
||||
@@ -355,22 +360,22 @@ mod tests_mmap_specific {
|
||||
managed_directory
|
||||
.atomic_write(test_path2, &[0u8, 1u8])
|
||||
.unwrap();
|
||||
assert!(managed_directory.exists(test_path1));
|
||||
assert!(managed_directory.exists(test_path2));
|
||||
assert!(managed_directory.exists(test_path1).unwrap());
|
||||
assert!(managed_directory.exists(test_path2).unwrap());
|
||||
let living_files: HashSet<PathBuf> = [test_path1.to_owned()].iter().cloned().collect();
|
||||
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
|
||||
assert!(managed_directory.exists(test_path1));
|
||||
assert!(!managed_directory.exists(test_path2));
|
||||
assert!(managed_directory.exists(test_path1).unwrap());
|
||||
assert!(!managed_directory.exists(test_path2).unwrap());
|
||||
}
|
||||
{
|
||||
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
|
||||
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
|
||||
assert!(managed_directory.exists(test_path1));
|
||||
assert!(!managed_directory.exists(test_path2));
|
||||
assert!(managed_directory.exists(test_path1).unwrap());
|
||||
assert!(!managed_directory.exists(test_path2).unwrap());
|
||||
let living_files: HashSet<PathBuf> = HashSet::new();
|
||||
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
|
||||
assert!(!managed_directory.exists(test_path1));
|
||||
assert!(!managed_directory.exists(test_path2));
|
||||
assert!(!managed_directory.exists(test_path1).unwrap());
|
||||
assert!(!managed_directory.exists(test_path2).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -387,7 +392,7 @@ mod tests_mmap_specific {
|
||||
let mut write = managed_directory.open_write(test_path1).unwrap();
|
||||
write.write_all(&[0u8, 1u8]).unwrap();
|
||||
write.terminate().unwrap();
|
||||
assert!(managed_directory.exists(test_path1));
|
||||
assert!(managed_directory.exists(test_path1).unwrap());
|
||||
|
||||
let _mmap_read = managed_directory.open_read(test_path1).unwrap();
|
||||
assert!(managed_directory
|
||||
@@ -395,15 +400,15 @@ mod tests_mmap_specific {
|
||||
.is_ok());
|
||||
if cfg!(target_os = "windows") {
|
||||
// On Windows, gc should try and fail the file as it is mmapped.
|
||||
assert!(managed_directory.exists(test_path1));
|
||||
assert!(managed_directory.exists(test_path1).unwrap());
|
||||
// unmap should happen here.
|
||||
drop(_mmap_read);
|
||||
// The file should still be in the list of managed file and
|
||||
// eventually be deleted once mmap is released.
|
||||
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
|
||||
assert!(!managed_directory.exists(test_path1));
|
||||
assert!(!managed_directory.exists(test_path1).unwrap());
|
||||
} else {
|
||||
assert!(!managed_directory.exists(test_path1));
|
||||
assert!(!managed_directory.exists(test_path1).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,21 +1,17 @@
|
||||
use crate::core::META_FILEPATH;
|
||||
use crate::directory::error::LockError;
|
||||
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
|
||||
use crate::directory::AntiCallToken;
|
||||
use crate::directory::BoxedData;
|
||||
use crate::directory::file_watcher::FileWatcher;
|
||||
use crate::directory::Directory;
|
||||
use crate::directory::DirectoryLock;
|
||||
use crate::directory::FileSlice;
|
||||
use crate::directory::Lock;
|
||||
use crate::directory::WatchCallback;
|
||||
use crate::directory::WatchCallbackList;
|
||||
use crate::directory::WatchHandle;
|
||||
use crate::directory::{AntiCallToken, FileHandle, OwnedBytes};
|
||||
use crate::directory::{ArcBytes, WeakArcBytes};
|
||||
use crate::directory::{TerminatingWrite, WritePtr};
|
||||
use fs2::FileExt;
|
||||
use memmap::Mmap;
|
||||
use notify::RawEvent;
|
||||
use notify::RecursiveMode;
|
||||
use notify::Watcher;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use stable_deref_trait::StableDeref;
|
||||
use std::convert::From;
|
||||
@@ -26,12 +22,8 @@ use std::io::{self, Seek, SeekFrom};
|
||||
use std::io::{BufWriter, Read, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::result;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::Weak;
|
||||
use std::thread;
|
||||
use std::{collections::HashMap, ops::Deref};
|
||||
use tempfile::TempDir;
|
||||
|
||||
@@ -84,7 +76,7 @@ pub struct CacheInfo {
|
||||
|
||||
struct MmapCache {
|
||||
counters: CacheCounters,
|
||||
cache: HashMap<PathBuf, Weak<BoxedData>>,
|
||||
cache: HashMap<PathBuf, WeakArcBytes>,
|
||||
}
|
||||
|
||||
impl Default for MmapCache {
|
||||
@@ -118,7 +110,7 @@ impl MmapCache {
|
||||
}
|
||||
|
||||
// Returns None if the file exists but as a len of 0 (and hence is not mmappable).
|
||||
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<BoxedData>>, OpenReadError> {
|
||||
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<ArcBytes>, OpenReadError> {
|
||||
if let Some(mmap_weak) = self.cache.get(full_path) {
|
||||
if let Some(mmap_arc) = mmap_weak.upgrade() {
|
||||
self.counters.hit += 1;
|
||||
@@ -129,7 +121,7 @@ impl MmapCache {
|
||||
self.counters.miss += 1;
|
||||
let mmap_opt = open_mmap(full_path)?;
|
||||
Ok(mmap_opt.map(|mmap| {
|
||||
let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap));
|
||||
let mmap_arc: ArcBytes = Arc::new(mmap);
|
||||
let mmap_weak = Arc::downgrade(&mmap_arc);
|
||||
self.cache.insert(full_path.to_owned(), mmap_weak);
|
||||
mmap_arc
|
||||
@@ -137,67 +129,6 @@ impl MmapCache {
|
||||
}
|
||||
}
|
||||
|
||||
struct WatcherWrapper {
|
||||
_watcher: Mutex<notify::RecommendedWatcher>,
|
||||
watcher_router: Arc<WatchCallbackList>,
|
||||
}
|
||||
|
||||
impl WatcherWrapper {
|
||||
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
|
||||
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
|
||||
// We need to initialize the
|
||||
let watcher = notify::raw_watcher(tx)
|
||||
.and_then(|mut watcher| {
|
||||
watcher.watch(path, RecursiveMode::Recursive)?;
|
||||
Ok(watcher)
|
||||
})
|
||||
.map_err(|err| match err {
|
||||
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
|
||||
_ => {
|
||||
panic!("Unknown error while starting watching directory {:?}", path);
|
||||
}
|
||||
})?;
|
||||
let watcher_router: Arc<WatchCallbackList> = Default::default();
|
||||
let watcher_router_clone = watcher_router.clone();
|
||||
thread::Builder::new()
|
||||
.name("meta-file-watch-thread".to_string())
|
||||
.spawn(move || {
|
||||
loop {
|
||||
match watcher_recv.recv().map(|evt| evt.path) {
|
||||
Ok(Some(changed_path)) => {
|
||||
// ... Actually subject to false positive.
|
||||
// We might want to be more accurate than this at one point.
|
||||
if let Some(filename) = changed_path.file_name() {
|
||||
if filename == *META_FILEPATH {
|
||||
let _ = watcher_router_clone.broadcast();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// not an event we are interested in.
|
||||
}
|
||||
Err(_e) => {
|
||||
// the watch send channel was dropped
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.map_err(|io_error| OpenDirectoryError::IoError {
|
||||
io_error,
|
||||
directory_path: path.to_path_buf(),
|
||||
})?;
|
||||
Ok(WatcherWrapper {
|
||||
_watcher: Mutex::new(watcher),
|
||||
watcher_router,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
|
||||
self.watcher_router.subscribe(watch_callback)
|
||||
}
|
||||
}
|
||||
|
||||
/// Directory storing data in files, read via mmap.
|
||||
///
|
||||
/// The Mmap object are cached to limit the
|
||||
@@ -219,40 +150,21 @@ struct MmapDirectoryInner {
|
||||
root_path: PathBuf,
|
||||
mmap_cache: RwLock<MmapCache>,
|
||||
_temp_directory: Option<TempDir>,
|
||||
watcher: RwLock<Option<WatcherWrapper>>,
|
||||
watcher: FileWatcher,
|
||||
}
|
||||
|
||||
impl MmapDirectoryInner {
|
||||
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
|
||||
MmapDirectoryInner {
|
||||
root_path,
|
||||
mmap_cache: Default::default(),
|
||||
_temp_directory: temp_directory,
|
||||
watcher: RwLock::new(None),
|
||||
watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
|
||||
root_path,
|
||||
}
|
||||
}
|
||||
|
||||
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
|
||||
// a lot of juggling here, to ensure we don't do anything that panics
|
||||
// while the rwlock is held. That way we ensure that the rwlock cannot
|
||||
// be poisoned.
|
||||
//
|
||||
// The downside is that we might create a watch wrapper that is not useful.
|
||||
let need_initialization = self.watcher.read().unwrap().is_none();
|
||||
if need_initialization {
|
||||
let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
|
||||
let mut watch_wlock = self.watcher.write().unwrap();
|
||||
// the watcher could have been initialized when we released the lock, and
|
||||
// we do not want to lose the watched files that were set.
|
||||
if watch_wlock.is_none() {
|
||||
*watch_wlock = Some(watch_wrapper);
|
||||
}
|
||||
}
|
||||
if let Some(watch_wrapper) = self.watcher.write().unwrap().as_mut() {
|
||||
Ok(watch_wrapper.watch(watch_callback))
|
||||
} else {
|
||||
unreachable!("At this point, watch wrapper is supposed to be initialized");
|
||||
}
|
||||
fn watch(&self, callback: WatchCallback) -> crate::Result<WatchHandle> {
|
||||
Ok(self.watcher.watch(callback))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -402,7 +314,7 @@ impl TerminatingWrite for SafeFileWriter {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MmapArc(Arc<Box<dyn Deref<Target = [u8]> + Send + Sync>>);
|
||||
struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync>);
|
||||
|
||||
impl Deref for MmapArc {
|
||||
type Target = [u8];
|
||||
@@ -413,8 +325,26 @@ impl Deref for MmapArc {
|
||||
}
|
||||
unsafe impl StableDeref for MmapArc {}
|
||||
|
||||
/// Writes a file in an atomic manner.
|
||||
pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
|
||||
// We create the temporary file in the same directory as the target file.
|
||||
// Indeed the canonical temp directory and the target file might sit in different
|
||||
// filesystem, in which case the atomic write may actually not work.
|
||||
let parent_path = path.parent().ok_or_else(|| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"Path {:?} does not have parent directory.",
|
||||
)
|
||||
})?;
|
||||
let mut tempfile = tempfile::Builder::new().tempfile_in(&parent_path)?;
|
||||
tempfile.write_all(content)?;
|
||||
tempfile.flush()?;
|
||||
tempfile.into_temp_path().persist(path)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl Directory for MmapDirectory {
|
||||
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
|
||||
fn get_file_handle(&self, path: &Path) -> result::Result<Box<dyn FileHandle>, OpenReadError> {
|
||||
debug!("Open Read {:?}", path);
|
||||
let full_path = self.resolve_path(path);
|
||||
|
||||
@@ -427,11 +357,16 @@ impl Directory for MmapDirectory {
|
||||
let io_err = make_io_err(msg);
|
||||
OpenReadError::wrap_io_error(io_err, path.to_path_buf())
|
||||
})?;
|
||||
if let Some(mmap_arc) = mmap_cache.get_mmap(&full_path)? {
|
||||
Ok(FileSlice::from(MmapArc(mmap_arc)))
|
||||
} else {
|
||||
Ok(FileSlice::empty())
|
||||
}
|
||||
|
||||
let owned_bytes = mmap_cache
|
||||
.get_mmap(&full_path)?
|
||||
.map(|mmap_arc| {
|
||||
let mmap_arc_obj = MmapArc(mmap_arc);
|
||||
OwnedBytes::new(mmap_arc_obj)
|
||||
})
|
||||
.unwrap_or_else(OwnedBytes::empty);
|
||||
|
||||
Ok(Box::new(owned_bytes))
|
||||
}
|
||||
|
||||
/// Any entry associated to the path in the mmap will be
|
||||
@@ -456,9 +391,9 @@ impl Directory for MmapDirectory {
|
||||
}
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
|
||||
let full_path = self.resolve_path(path);
|
||||
full_path.exists()
|
||||
Ok(full_path.exists())
|
||||
}
|
||||
|
||||
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||
@@ -513,12 +448,9 @@ impl Directory for MmapDirectory {
|
||||
|
||||
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
|
||||
debug!("Atomic Write {:?}", path);
|
||||
let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
|
||||
tempfile.write_all(content)?;
|
||||
tempfile.flush()?;
|
||||
let full_path = self.resolve_path(path);
|
||||
tempfile.into_temp_path().persist(full_path)?;
|
||||
Ok(())
|
||||
atomic_write(&full_path, content)?;
|
||||
self.sync_directory()
|
||||
}
|
||||
|
||||
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
|
||||
@@ -557,8 +489,6 @@ mod tests {
|
||||
use crate::Index;
|
||||
use crate::ReloadPolicy;
|
||||
use crate::{common::HasLen, indexer::LogMergePolicy};
|
||||
use std::fs;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
#[test]
|
||||
fn test_open_non_existent_path() {
|
||||
@@ -647,27 +577,6 @@ mod tests {
|
||||
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_watch_wrapper() {
|
||||
let counter: Arc<AtomicUsize> = Default::default();
|
||||
let counter_clone = counter.clone();
|
||||
let tmp_dir = tempfile::TempDir::new().unwrap();
|
||||
let tmp_dirpath = tmp_dir.path().to_owned();
|
||||
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
|
||||
let tmp_file = tmp_dirpath.join(*META_FILEPATH);
|
||||
let _handle = watch_wrapper.watch(Box::new(move || {
|
||||
counter_clone.fetch_add(1, Ordering::SeqCst);
|
||||
}));
|
||||
let (sender, receiver) = crossbeam::channel::unbounded();
|
||||
let _handle2 = watch_wrapper.watch(Box::new(move || {
|
||||
let _ = sender.send(());
|
||||
}));
|
||||
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
||||
fs::write(&tmp_file, b"whateverwilldo").unwrap();
|
||||
assert!(receiver.recv().is_ok());
|
||||
assert!(counter.load(Ordering::SeqCst) >= 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mmap_released() {
|
||||
let mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
|
||||
|
||||
@@ -10,6 +10,7 @@ mod mmap_directory;
|
||||
mod directory;
|
||||
mod directory_lock;
|
||||
mod file_slice;
|
||||
mod file_watcher;
|
||||
mod footer;
|
||||
mod managed_directory;
|
||||
mod owned_bytes;
|
||||
@@ -22,7 +23,7 @@ pub mod error;
|
||||
pub use self::directory::DirectoryLock;
|
||||
pub use self::directory::{Directory, DirectoryClone};
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||
pub(crate) use self::file_slice::BoxedData;
|
||||
pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes};
|
||||
pub use self::file_slice::{FileHandle, FileSlice};
|
||||
pub use self::owned_bytes::OwnedBytes;
|
||||
pub use self::ram_directory::RAMDirectory;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::directory::FileHandle;
|
||||
use stable_deref_trait::StableDeref;
|
||||
use std::convert::TryInto;
|
||||
use std::mem;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
@@ -95,6 +96,24 @@ impl OwnedBytes {
|
||||
pub fn advance(&mut self, advance_len: usize) {
|
||||
self.data = &self.data[advance_len..]
|
||||
}
|
||||
|
||||
/// Reads an `u8` from the `OwnedBytes` and advance by one byte.
|
||||
pub fn read_u8(&mut self) -> u8 {
|
||||
assert!(!self.is_empty());
|
||||
|
||||
let byte = self.as_slice()[0];
|
||||
self.advance(1);
|
||||
byte
|
||||
}
|
||||
|
||||
/// Reads an `u64` encoded as little-endian from the `OwnedBytes` and advance by 8 bytes.
|
||||
pub fn read_u64(&mut self) -> u64 {
|
||||
assert!(self.len() > 7);
|
||||
|
||||
let octlet: [u8; 8] = self.as_slice()[..8].try_into().unwrap();
|
||||
self.advance(8);
|
||||
u64::from_le_bytes(octlet)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for OwnedBytes {
|
||||
@@ -230,6 +249,22 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_owned_bytes_read_u8() -> io::Result<()> {
|
||||
let mut bytes = OwnedBytes::new(b"\xFF".as_ref());
|
||||
assert_eq!(bytes.read_u8(), 255);
|
||||
assert_eq!(bytes.len(), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_owned_bytes_read_u64() -> io::Result<()> {
|
||||
let mut bytes = OwnedBytes::new(b"\0\xFF\xFF\xFF\xFF\xFF\xFF\xFF".as_ref());
|
||||
assert_eq!(bytes.read_u64(), u64::MAX - 255);
|
||||
assert_eq!(bytes.len(), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_owned_bytes_split() {
|
||||
let bytes = OwnedBytes::new(b"abcdefghi".as_ref());
|
||||
|
||||
@@ -12,6 +12,8 @@ use std::path::{Path, PathBuf};
|
||||
use std::result;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use super::FileHandle;
|
||||
|
||||
/// Writer associated with the `RAMDirectory`
|
||||
///
|
||||
/// The Writer just writes a buffer.
|
||||
@@ -163,6 +165,11 @@ impl RAMDirectory {
|
||||
}
|
||||
|
||||
impl Directory for RAMDirectory {
|
||||
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
|
||||
let file_slice = self.open_read(path)?;
|
||||
Ok(Box::new(file_slice))
|
||||
}
|
||||
|
||||
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
|
||||
self.fs.read().unwrap().open_read(path)
|
||||
}
|
||||
@@ -177,8 +184,15 @@ impl Directory for RAMDirectory {
|
||||
self.fs.write().unwrap().delete(path)
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
self.fs.read().unwrap().exists(path)
|
||||
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
|
||||
Ok(self
|
||||
.fs
|
||||
.read()
|
||||
.map_err(|e| OpenReadError::IOError {
|
||||
io_error: io::Error::new(io::ErrorKind::Other, e.to_string()),
|
||||
filepath: path.to_path_buf(),
|
||||
})?
|
||||
.exists(path))
|
||||
}
|
||||
|
||||
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||
|
||||
@@ -130,7 +130,7 @@ fn ram_directory_panics_if_flush_forgotten() {
|
||||
fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
|
||||
let test_path: &'static Path = Path::new("some_path_for_test");
|
||||
let mut write_file = directory.open_write(test_path)?;
|
||||
assert!(directory.exists(test_path));
|
||||
assert!(directory.exists(test_path).unwrap());
|
||||
write_file.write_all(&[4])?;
|
||||
write_file.write_all(&[3])?;
|
||||
write_file.write_all(&[7, 3, 5])?;
|
||||
@@ -139,14 +139,14 @@ fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
|
||||
assert_eq!(read_file.as_slice(), &[4u8, 3u8, 7u8, 3u8, 5u8]);
|
||||
mem::drop(read_file);
|
||||
assert!(directory.delete(test_path).is_ok());
|
||||
assert!(!directory.exists(test_path));
|
||||
assert!(!directory.exists(test_path).unwrap());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn test_rewrite_forbidden(directory: &dyn Directory) -> crate::Result<()> {
|
||||
let test_path: &'static Path = Path::new("some_path_for_test");
|
||||
directory.open_write(test_path)?;
|
||||
assert!(directory.exists(test_path));
|
||||
assert!(directory.exists(test_path).unwrap());
|
||||
assert!(directory.open_write(test_path).is_err());
|
||||
assert!(directory.delete(test_path).is_ok());
|
||||
Ok(())
|
||||
@@ -157,7 +157,7 @@ fn test_write_create_the_file(directory: &dyn Directory) {
|
||||
{
|
||||
assert!(directory.open_read(test_path).is_err());
|
||||
let _w = directory.open_write(test_path).unwrap();
|
||||
assert!(directory.exists(test_path));
|
||||
assert!(directory.exists(test_path).unwrap());
|
||||
assert!(directory.open_read(test_path).is_ok());
|
||||
assert!(directory.delete(test_path).is_ok());
|
||||
}
|
||||
@@ -190,38 +190,33 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {
|
||||
}
|
||||
|
||||
fn test_watch(directory: &dyn Directory) {
|
||||
let num_progress: Arc<AtomicUsize> = Default::default();
|
||||
let counter: Arc<AtomicUsize> = Default::default();
|
||||
let counter_clone = counter.clone();
|
||||
let (sender, receiver) = crossbeam::channel::unbounded();
|
||||
let watch_callback = Box::new(move || {
|
||||
counter_clone.fetch_add(1, SeqCst);
|
||||
});
|
||||
// This callback is used to synchronize watching in our unit test.
|
||||
// We bind it to a variable because the callback is removed when that
|
||||
// handle is dropped.
|
||||
let watch_handle = directory.watch(watch_callback).unwrap();
|
||||
let _progress_listener = directory
|
||||
.watch(Box::new(move || {
|
||||
let val = num_progress.fetch_add(1, SeqCst);
|
||||
let _ = sender.send(val);
|
||||
let (tx, rx) = crossbeam::channel::unbounded();
|
||||
let timeout = Duration::from_millis(500);
|
||||
|
||||
let handle = directory
|
||||
.watch(WatchCallback::new(move || {
|
||||
let val = counter.fetch_add(1, SeqCst);
|
||||
tx.send(val + 1).unwrap();
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
for i in 0..10 {
|
||||
assert!(i <= counter.load(SeqCst));
|
||||
assert!(directory
|
||||
.atomic_write(Path::new("meta.json"), b"random_test_data_2")
|
||||
.is_ok());
|
||||
assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i));
|
||||
assert!(i + 1 <= counter.load(SeqCst)); // notify can trigger more than once.
|
||||
}
|
||||
mem::drop(watch_handle);
|
||||
assert!(directory
|
||||
.atomic_write(Path::new("meta.json"), b"random_test_data")
|
||||
.atomic_write(Path::new("meta.json"), b"foo")
|
||||
.is_ok());
|
||||
assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok());
|
||||
assert!(10 <= counter.load(SeqCst));
|
||||
assert_eq!(rx.recv_timeout(timeout), Ok(1));
|
||||
|
||||
assert!(directory
|
||||
.atomic_write(Path::new("meta.json"), b"bar")
|
||||
.is_ok());
|
||||
assert_eq!(rx.recv_timeout(timeout), Ok(2));
|
||||
|
||||
mem::drop(handle);
|
||||
|
||||
assert!(directory
|
||||
.atomic_write(Path::new("meta.json"), b"qux")
|
||||
.is_ok());
|
||||
assert!(rx.recv_timeout(timeout).is_err());
|
||||
}
|
||||
|
||||
fn test_lock_non_blocking(directory: &dyn Directory) {
|
||||
|
||||
@@ -4,8 +4,20 @@ use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::Weak;
|
||||
|
||||
/// Type alias for callbacks registered when watching files of a `Directory`.
|
||||
pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
|
||||
/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
|
||||
#[derive(Clone)]
|
||||
pub struct WatchCallback(Arc<dyn Fn() + Sync + Send>);
|
||||
|
||||
impl WatchCallback {
|
||||
/// Wraps a `Fn()` to create a WatchCallback.
|
||||
pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
|
||||
WatchCallback(Arc::new(op))
|
||||
}
|
||||
|
||||
fn call(&self) {
|
||||
self.0()
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper struct to implement the watch method in `Directory` implementations.
|
||||
///
|
||||
@@ -34,7 +46,7 @@ impl WatchHandle {
|
||||
///
|
||||
/// This function is only useful when implementing a readonly directory.
|
||||
pub fn empty() -> WatchHandle {
|
||||
WatchHandle::new(Arc::new(Box::new(|| {})))
|
||||
WatchHandle::new(Arc::new(WatchCallback::new(|| {})))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,13 +59,13 @@ impl WatchCallbackList {
|
||||
WatchHandle::new(watch_callback_arc)
|
||||
}
|
||||
|
||||
fn list_callback(&self) -> Vec<Arc<WatchCallback>> {
|
||||
let mut callbacks = vec![];
|
||||
fn list_callback(&self) -> Vec<WatchCallback> {
|
||||
let mut callbacks: Vec<WatchCallback> = vec![];
|
||||
let mut router_wlock = self.router.write().unwrap();
|
||||
let mut i = 0;
|
||||
while i < router_wlock.len() {
|
||||
if let Some(watch) = router_wlock[i].upgrade() {
|
||||
callbacks.push(watch);
|
||||
callbacks.push(watch.as_ref().clone());
|
||||
i += 1;
|
||||
} else {
|
||||
router_wlock.swap_remove(i);
|
||||
@@ -75,7 +87,7 @@ impl WatchCallbackList {
|
||||
.name("watch-callbacks".to_string())
|
||||
.spawn(move || {
|
||||
for callback in callbacks {
|
||||
callback();
|
||||
callback.call();
|
||||
}
|
||||
let _ = sender.send(());
|
||||
});
|
||||
@@ -91,7 +103,7 @@ impl WatchCallbackList {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::directory::WatchCallbackList;
|
||||
use crate::directory::{WatchCallback, WatchCallbackList};
|
||||
use futures::executor::block_on;
|
||||
use std::mem;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
@@ -102,7 +114,7 @@ mod tests {
|
||||
let watch_event_router = WatchCallbackList::default();
|
||||
let counter: Arc<AtomicUsize> = Default::default();
|
||||
let counter_clone = counter.clone();
|
||||
let inc_callback = Box::new(move || {
|
||||
let inc_callback = WatchCallback::new(move || {
|
||||
counter_clone.fetch_add(1, Ordering::SeqCst);
|
||||
});
|
||||
block_on(watch_event_router.broadcast());
|
||||
@@ -130,7 +142,7 @@ mod tests {
|
||||
let counter: Arc<AtomicUsize> = Default::default();
|
||||
let inc_callback = |inc: usize| {
|
||||
let counter_clone = counter.clone();
|
||||
Box::new(move || {
|
||||
WatchCallback::new(move || {
|
||||
counter_clone.fetch_add(inc, Ordering::SeqCst);
|
||||
})
|
||||
};
|
||||
@@ -158,7 +170,7 @@ mod tests {
|
||||
let watch_event_router = WatchCallbackList::default();
|
||||
let counter: Arc<AtomicUsize> = Default::default();
|
||||
let counter_clone = counter.clone();
|
||||
let inc_callback = Box::new(move || {
|
||||
let inc_callback = WatchCallback::new(move || {
|
||||
counter_clone.fetch_add(1, Ordering::SeqCst);
|
||||
});
|
||||
let handle_a = watch_event_router.subscribe(inc_callback);
|
||||
|
||||
@@ -10,7 +10,7 @@ use std::borrow::BorrowMut;
|
||||
pub const TERMINATED: DocId = std::i32::MAX as u32;
|
||||
|
||||
/// Represents an iterable set of sorted doc ids.
|
||||
pub trait DocSet {
|
||||
pub trait DocSet: Send {
|
||||
/// Goes to the next element.
|
||||
///
|
||||
/// The DocId of the next element is returned.
|
||||
@@ -129,6 +129,14 @@ impl<'a> DocSet for &'a mut dyn DocSet {
|
||||
fn size_hint(&self) -> u32 {
|
||||
(**self).size_hint()
|
||||
}
|
||||
|
||||
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
|
||||
(**self).count(delete_bitset)
|
||||
}
|
||||
|
||||
fn count_including_deleted(&mut self) -> u32 {
|
||||
(**self).count_including_deleted()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
|
||||
|
||||
@@ -86,7 +86,7 @@ mod tests {
|
||||
let term = Term::from_field_bytes(field, b"lucene".as_ref());
|
||||
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
|
||||
let term_weight = term_query.specialized_weight(&searcher, true)?;
|
||||
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0f32)?;
|
||||
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
|
||||
assert_eq!(term_scorer.doc(), 0u32);
|
||||
Ok(())
|
||||
}
|
||||
@@ -98,10 +98,9 @@ mod tests {
|
||||
let field = searcher.schema().get_field("string_bytes").unwrap();
|
||||
let term = Term::from_field_bytes(field, b"lucene".as_ref());
|
||||
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
|
||||
let term_weight = term_query.specialized_weight(&searcher, false)?;
|
||||
let term_scorer_err = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0f32);
|
||||
let term_weight_err = term_query.specialized_weight(&searcher, false);
|
||||
assert!(matches!(
|
||||
term_scorer_err,
|
||||
term_weight_err,
|
||||
Err(crate::TantivyError::SchemaError(_))
|
||||
));
|
||||
Ok(())
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::MultiValueIntFastFieldReader;
|
||||
use crate::error::DataCorruption;
|
||||
use crate::schema::Facet;
|
||||
use crate::termdict::TermDictionary;
|
||||
use crate::termdict::TermOrdinal;
|
||||
@@ -62,12 +63,13 @@ impl FacetReader {
|
||||
&mut self,
|
||||
facet_ord: TermOrdinal,
|
||||
output: &mut Facet,
|
||||
) -> Result<(), str::Utf8Error> {
|
||||
) -> crate::Result<()> {
|
||||
let found_term = self
|
||||
.term_dict
|
||||
.ord_to_term(facet_ord as u64, &mut self.buffer);
|
||||
.ord_to_term(facet_ord as u64, &mut self.buffer)?;
|
||||
assert!(found_term, "Term ordinal {} no found.", facet_ord);
|
||||
let facet_str = str::from_utf8(&self.buffer[..])?;
|
||||
let facet_str = str::from_utf8(&self.buffer[..])
|
||||
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
|
||||
output.set_facet_str(facet_str);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -629,7 +629,7 @@ mod bench {
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file).unwrap();
|
||||
let data = fast_fields_composite.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = FastFieldReader::<u64>::open(data);
|
||||
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
|
||||
|
||||
b.iter(|| {
|
||||
let n = test::black_box(7000u32);
|
||||
@@ -663,7 +663,7 @@ mod bench {
|
||||
{
|
||||
let fast_fields_composite = CompositeFile::open(&file).unwrap();
|
||||
let data = fast_fields_composite.open_read(*FIELD).unwrap();
|
||||
let fast_field_reader = FastFieldReader::<u64>::open(data);
|
||||
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
|
||||
|
||||
b.iter(|| {
|
||||
let n = test::black_box(1000u32);
|
||||
|
||||
@@ -51,6 +51,15 @@ impl<Item: FastValue> FastFieldReader<Item> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cast<TFastValue: FastValue>(self) -> FastFieldReader<TFastValue> {
|
||||
FastFieldReader {
|
||||
bit_unpacker: self.bit_unpacker,
|
||||
min_value_u64: self.min_value_u64,
|
||||
max_value_u64: self.max_value_u64,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the value associated to the given document.
|
||||
///
|
||||
/// This accessor should return as fast as possible.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::common::CompositeFile;
|
||||
use crate::fastfield::BytesFastFieldReader;
|
||||
use crate::fastfield::MultiValueIntFastFieldReader;
|
||||
use crate::fastfield::{BytesFastFieldReader, FastValue};
|
||||
use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader};
|
||||
use crate::schema::{Cardinality, Field, FieldType, Schema};
|
||||
use crate::space_usage::PerFieldSpaceUsage;
|
||||
@@ -201,6 +201,14 @@ impl FastFieldReaders {
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(
|
||||
&self,
|
||||
field: Field,
|
||||
) -> Option<FastFieldReader<TFastValue>> {
|
||||
self.u64_lenient(field)
|
||||
.map(|fast_field_reader| fast_field_reader.cast())
|
||||
}
|
||||
|
||||
/// Returns the `i64` fast field reader reader associated to `field`.
|
||||
///
|
||||
/// If `field` is not a i64 fast field, this method returns `None`.
|
||||
|
||||
@@ -61,20 +61,56 @@ impl FieldNormReaders {
|
||||
/// precompute computationally expensive functions of the fieldnorm
|
||||
/// in a very short array.
|
||||
#[derive(Clone)]
|
||||
pub struct FieldNormReader {
|
||||
data: OwnedBytes,
|
||||
pub struct FieldNormReader(ReaderImplEnum);
|
||||
|
||||
impl From<ReaderImplEnum> for FieldNormReader {
|
||||
fn from(reader_enum: ReaderImplEnum) -> FieldNormReader {
|
||||
FieldNormReader(reader_enum)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum ReaderImplEnum {
|
||||
FromData(OwnedBytes),
|
||||
Const {
|
||||
num_docs: u32,
|
||||
fieldnorm_id: u8,
|
||||
fieldnorm: u32,
|
||||
},
|
||||
}
|
||||
|
||||
impl FieldNormReader {
|
||||
/// Creates a `FieldNormReader` with a constant fieldnorm.
|
||||
///
|
||||
/// The fieldnorm will be subjected to compression as if it was coming
|
||||
/// from an array-backed fieldnorm reader.
|
||||
pub fn constant(num_docs: u32, fieldnorm: u32) -> FieldNormReader {
|
||||
let fieldnorm_id = fieldnorm_to_id(fieldnorm);
|
||||
let fieldnorm = id_to_fieldnorm(fieldnorm_id);
|
||||
ReaderImplEnum::Const {
|
||||
num_docs,
|
||||
fieldnorm_id,
|
||||
fieldnorm,
|
||||
}
|
||||
.into()
|
||||
}
|
||||
|
||||
/// Opens a field norm reader given its file.
|
||||
pub fn open(fieldnorm_file: FileSlice) -> crate::Result<Self> {
|
||||
let data = fieldnorm_file.read_bytes()?;
|
||||
Ok(FieldNormReader { data })
|
||||
Ok(FieldNormReader::new(data))
|
||||
}
|
||||
|
||||
fn new(data: OwnedBytes) -> Self {
|
||||
ReaderImplEnum::FromData(data).into()
|
||||
}
|
||||
|
||||
/// Returns the number of documents in this segment.
|
||||
pub fn num_docs(&self) -> u32 {
|
||||
self.data.len() as u32
|
||||
match &self.0 {
|
||||
ReaderImplEnum::FromData(data) => data.len() as u32,
|
||||
ReaderImplEnum::Const { num_docs, .. } => *num_docs,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `fieldnorm` associated to a doc id.
|
||||
@@ -87,14 +123,25 @@ impl FieldNormReader {
|
||||
/// The fieldnorm is effectively decoded from the
|
||||
/// `fieldnorm_id` by doing a simple table lookup.
|
||||
pub fn fieldnorm(&self, doc_id: DocId) -> u32 {
|
||||
let fieldnorm_id = self.fieldnorm_id(doc_id);
|
||||
id_to_fieldnorm(fieldnorm_id)
|
||||
match &self.0 {
|
||||
ReaderImplEnum::FromData(data) => {
|
||||
let fieldnorm_id = data.as_slice()[doc_id as usize];
|
||||
id_to_fieldnorm(fieldnorm_id)
|
||||
}
|
||||
ReaderImplEnum::Const { fieldnorm, .. } => *fieldnorm,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `fieldnorm_id` associated to a document.
|
||||
#[inline(always)]
|
||||
pub fn fieldnorm_id(&self, doc_id: DocId) -> u8 {
|
||||
self.data.as_slice()[doc_id as usize]
|
||||
match &self.0 {
|
||||
ReaderImplEnum::FromData(data) => {
|
||||
let fieldnorm_id = data.as_slice()[doc_id as usize];
|
||||
fieldnorm_id
|
||||
}
|
||||
ReaderImplEnum::Const { fieldnorm_id, .. } => *fieldnorm_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts a `fieldnorm_id` into a fieldnorm.
|
||||
@@ -118,9 +165,7 @@ impl FieldNormReader {
|
||||
.map(FieldNormReader::fieldnorm_to_id)
|
||||
.collect::<Vec<u8>>();
|
||||
let field_norms_data = OwnedBytes::new(field_norms_id);
|
||||
FieldNormReader {
|
||||
data: field_norms_data,
|
||||
}
|
||||
FieldNormReader::new(field_norms_data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,4 +184,20 @@ mod tests {
|
||||
assert_eq!(fieldnorm_reader.fieldnorm(3), 4);
|
||||
assert_eq!(fieldnorm_reader.fieldnorm(4), 983_064);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_const_fieldnorm_reader_small_fieldnorm_id() {
|
||||
let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 10u32);
|
||||
assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
|
||||
assert_eq!(fieldnorm_reader.fieldnorm(0u32), 10u32);
|
||||
assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 10u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_const_fieldnorm_reader_large_fieldnorm_id() {
|
||||
let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 300u32);
|
||||
assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
|
||||
assert_eq!(fieldnorm_reader.fieldnorm(0u32), 280u32);
|
||||
assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 72u8);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,45 +1,94 @@
|
||||
use rand::thread_rng;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::schema::*;
|
||||
use crate::Index;
|
||||
use crate::Searcher;
|
||||
use crate::{doc, schema::*};
|
||||
use rand::thread_rng;
|
||||
use rand::Rng;
|
||||
use std::collections::HashSet;
|
||||
|
||||
fn check_index_content(searcher: &Searcher, vals: &HashSet<u64>) {
|
||||
fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
|
||||
assert!(searcher.segment_readers().len() < 20);
|
||||
assert_eq!(searcher.num_docs() as usize, vals.len());
|
||||
for segment_reader in searcher.segment_readers() {
|
||||
let store_reader = segment_reader.get_store_reader()?;
|
||||
for doc_id in 0..segment_reader.max_doc() {
|
||||
let _doc = store_reader.get(doc_id)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_indexing() {
|
||||
fn test_functional_store() -> crate::Result<()> {
|
||||
env_logger::init();
|
||||
let mut schema_builder = Schema::builder();
|
||||
|
||||
let id_field = schema_builder.add_u64_field("id", INDEXED | STORED);
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let index = Index::create_in_ram(schema);
|
||||
let reader = index.reader()?;
|
||||
|
||||
let mut rng = thread_rng();
|
||||
|
||||
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
|
||||
|
||||
let mut doc_set: Vec<u64> = Vec::new();
|
||||
|
||||
let mut doc_id = 0u64;
|
||||
for iteration in 0.. {
|
||||
let num_docs: usize = rng.gen_range(0..4);
|
||||
if doc_set.len() >= 1 {
|
||||
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
|
||||
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
|
||||
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
|
||||
}
|
||||
for _ in 0..num_docs {
|
||||
doc_set.push(doc_id);
|
||||
index_writer.add_document(doc!(id_field=>doc_id));
|
||||
doc_id += 1;
|
||||
}
|
||||
index_writer.commit()?;
|
||||
reader.reload()?;
|
||||
let searcher = reader.searcher();
|
||||
println!("#{} - {}", iteration, searcher.segment_readers().len());
|
||||
check_index_content(&searcher, &doc_set)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_functional_indexing() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
|
||||
let id_field = schema_builder.add_u64_field("id", INDEXED);
|
||||
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let index = Index::create_from_tempdir(schema).unwrap();
|
||||
let reader = index.reader().unwrap();
|
||||
let index = Index::create_from_tempdir(schema)?;
|
||||
let reader = index.reader()?;
|
||||
|
||||
let mut rng = thread_rng();
|
||||
|
||||
let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap();
|
||||
let mut index_writer = index.writer_with_num_threads(3, 120_000_000)?;
|
||||
|
||||
let mut committed_docs: HashSet<u64> = HashSet::new();
|
||||
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
|
||||
|
||||
for _ in 0..200 {
|
||||
let random_val = rng.gen_range(0, 20);
|
||||
let random_val = rng.gen_range(0..20);
|
||||
if random_val == 0 {
|
||||
index_writer.commit().expect("Commit failed");
|
||||
index_writer.commit()?;
|
||||
committed_docs.extend(&uncommitted_docs);
|
||||
uncommitted_docs.clear();
|
||||
reader.reload().unwrap();
|
||||
reader.reload()?;
|
||||
let searcher = reader.searcher();
|
||||
// check that everything is correct.
|
||||
check_index_content(&searcher, &committed_docs);
|
||||
check_index_content(
|
||||
&searcher,
|
||||
&committed_docs.iter().cloned().collect::<Vec<u64>>(),
|
||||
)?;
|
||||
} else {
|
||||
if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
|
||||
let doc_id_term = Term::from_field_u64(id_field, random_val);
|
||||
@@ -55,4 +104,5 @@ fn test_indexing() {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ impl DeleteQueue {
|
||||
return block;
|
||||
}
|
||||
let block = Arc::new(Block {
|
||||
operations: Arc::default(),
|
||||
operations: Arc::new([]),
|
||||
next: NextBlock::from(self.clone()),
|
||||
});
|
||||
wlock.last_block = Arc::downgrade(&block);
|
||||
@@ -108,7 +108,7 @@ impl DeleteQueue {
|
||||
let delete_operations = mem::replace(&mut self_wlock.writer, vec![]);
|
||||
|
||||
let new_block = Arc::new(Block {
|
||||
operations: Arc::new(delete_operations.into_boxed_slice()),
|
||||
operations: Arc::from(delete_operations.into_boxed_slice()),
|
||||
next: NextBlock::from(self.clone()),
|
||||
});
|
||||
|
||||
@@ -167,7 +167,7 @@ impl NextBlock {
|
||||
}
|
||||
|
||||
struct Block {
|
||||
operations: Arc<Box<[DeleteOperation]>>,
|
||||
operations: Arc<[DeleteOperation]>,
|
||||
next: NextBlock,
|
||||
}
|
||||
|
||||
|
||||
@@ -449,7 +449,7 @@ impl IndexWriter {
|
||||
}
|
||||
|
||||
/// Accessor to the merge policy.
|
||||
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
|
||||
pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
|
||||
self.segment_updater.get_merge_policy()
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
|
||||
const DEFAULT_MIN_MERGE_SIZE: usize = 8;
|
||||
const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
|
||||
|
||||
/// `LogMergePolicy` tries tries to merge segments that have a similar number of
|
||||
/// `LogMergePolicy` tries to merge segments that have a similar number of
|
||||
/// documents.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LogMergePolicy {
|
||||
|
||||
@@ -503,7 +503,6 @@ impl IndexMerger {
|
||||
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
|
||||
let mut delta_computer = DeltaComputer::new();
|
||||
|
||||
let mut field_term_streams = Vec::new();
|
||||
let mut max_term_ords: Vec<TermOrdinal> = Vec::new();
|
||||
|
||||
let field_readers: Vec<Arc<InvertedIndexReader>> = self
|
||||
@@ -512,9 +511,10 @@ impl IndexMerger {
|
||||
.map(|reader| reader.inverted_index(indexed_field))
|
||||
.collect::<crate::Result<Vec<_>>>()?;
|
||||
|
||||
let mut field_term_streams = Vec::new();
|
||||
for field_reader in &field_readers {
|
||||
let terms = field_reader.terms();
|
||||
field_term_streams.push(terms.stream());
|
||||
field_term_streams.push(terms.stream()?);
|
||||
max_term_ords.push(terms.num_terms() as u64);
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,15 @@ pub struct DeleteOperation {
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
impl Default for DeleteOperation {
|
||||
fn default() -> Self {
|
||||
DeleteOperation {
|
||||
opstamp: 0u64,
|
||||
term: Term::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Timestamped Add operation.
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
pub struct AddOperation {
|
||||
|
||||
@@ -154,7 +154,7 @@ pub(crate) struct InnerSegmentUpdater {
|
||||
|
||||
index: Index,
|
||||
segment_manager: SegmentManager,
|
||||
merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>,
|
||||
merge_policy: RwLock<Arc<dyn MergePolicy>>,
|
||||
killed: AtomicBool,
|
||||
stamper: Stamper,
|
||||
merge_operations: MergeOperationInventory,
|
||||
@@ -193,19 +193,19 @@ impl SegmentUpdater {
|
||||
merge_thread_pool,
|
||||
index,
|
||||
segment_manager,
|
||||
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
|
||||
merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
|
||||
killed: AtomicBool::new(false),
|
||||
stamper,
|
||||
merge_operations: Default::default(),
|
||||
})))
|
||||
}
|
||||
|
||||
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
|
||||
pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
|
||||
self.merge_policy.read().unwrap().clone()
|
||||
}
|
||||
|
||||
pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
|
||||
let arc_merge_policy = Arc::new(merge_policy);
|
||||
let arc_merge_policy = Arc::from(merge_policy);
|
||||
*self.merge_policy.write().unwrap() = arc_merge_policy;
|
||||
}
|
||||
|
||||
|
||||
@@ -174,7 +174,7 @@ use once_cell::sync::Lazy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Index format version.
|
||||
const INDEX_FORMAT_VERSION: u32 = 2;
|
||||
const INDEX_FORMAT_VERSION: u32 = 3;
|
||||
|
||||
/// Structure version for the index.
|
||||
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::io::{self, Write};
|
||||
pub struct PositionSerializer<W: io::Write> {
|
||||
bit_packer: BitPacker4x,
|
||||
write_stream: CountingWriter<W>,
|
||||
write_skiplist: W,
|
||||
write_skip_index: W,
|
||||
block: Vec<u32>,
|
||||
buffer: Vec<u8>,
|
||||
num_ints: u64,
|
||||
@@ -16,11 +16,11 @@ pub struct PositionSerializer<W: io::Write> {
|
||||
}
|
||||
|
||||
impl<W: io::Write> PositionSerializer<W> {
|
||||
pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer<W> {
|
||||
pub fn new(write_stream: W, write_skip_index: W) -> PositionSerializer<W> {
|
||||
PositionSerializer {
|
||||
bit_packer: BitPacker4x::new(),
|
||||
write_stream: CountingWriter::wrap(write_stream),
|
||||
write_skiplist,
|
||||
write_skip_index,
|
||||
block: Vec::with_capacity(128),
|
||||
buffer: vec![0u8; 128 * 4],
|
||||
num_ints: 0u64,
|
||||
@@ -52,7 +52,7 @@ impl<W: io::Write> PositionSerializer<W> {
|
||||
|
||||
fn flush_block(&mut self) -> io::Result<()> {
|
||||
let num_bits = self.bit_packer.num_bits(&self.block[..]);
|
||||
self.write_skiplist.write_all(&[num_bits])?;
|
||||
self.write_skip_index.write_all(&[num_bits])?;
|
||||
let written_len = self
|
||||
.bit_packer
|
||||
.compress(&self.block[..], &mut self.buffer, num_bits);
|
||||
@@ -70,10 +70,10 @@ impl<W: io::Write> PositionSerializer<W> {
|
||||
self.flush_block()?;
|
||||
}
|
||||
for &long_skip in &self.long_skips {
|
||||
long_skip.serialize(&mut self.write_skiplist)?;
|
||||
long_skip.serialize(&mut self.write_skip_index)?;
|
||||
}
|
||||
(self.long_skips.len() as u32).serialize(&mut self.write_skiplist)?;
|
||||
self.write_skiplist.flush()?;
|
||||
(self.long_skips.len() as u32).serialize(&mut self.write_skip_index)?;
|
||||
self.write_skip_index.flush()?;
|
||||
self.write_stream.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -469,7 +469,7 @@ mod tests {
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
let inverted_index = segment_reader.inverted_index(int_field).unwrap();
|
||||
let term = Term::from_field_u64(int_field, 0u64);
|
||||
let term_info = inverted_index.get_term_info(&term).unwrap();
|
||||
let term_info = inverted_index.get_term_info(&term).unwrap().unwrap();
|
||||
inverted_index
|
||||
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
@@ -513,7 +513,7 @@ mod tests {
|
||||
{
|
||||
let term = Term::from_field_u64(int_field, 0u64);
|
||||
let inverted_index = segment_reader.inverted_index(int_field)?;
|
||||
let term_info = inverted_index.get_term_info(&term).unwrap();
|
||||
let term_info = inverted_index.get_term_info(&term)?.unwrap();
|
||||
block_segments = inverted_index
|
||||
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?;
|
||||
}
|
||||
@@ -521,7 +521,7 @@ mod tests {
|
||||
{
|
||||
let term = Term::from_field_u64(int_field, 1u64);
|
||||
let inverted_index = segment_reader.inverted_index(int_field)?;
|
||||
let term_info = inverted_index.get_term_info(&term).unwrap();
|
||||
let term_info = inverted_index.get_term_info(&term)?.unwrap();
|
||||
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?;
|
||||
}
|
||||
assert_eq!(block_segments.docs(), &[1, 3, 5]);
|
||||
|
||||
@@ -15,18 +15,14 @@ mod stacker;
|
||||
mod term_info;
|
||||
|
||||
pub(crate) use self::block_search::BlockSearcher;
|
||||
|
||||
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
|
||||
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
|
||||
|
||||
pub use self::postings::Postings;
|
||||
pub(crate) use self::skip::{BlockInfo, SkipReader};
|
||||
pub use self::term_info::TermInfo;
|
||||
|
||||
pub use self::block_segment_postings::BlockSegmentPostings;
|
||||
pub use self::postings::Postings;
|
||||
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
|
||||
pub use self::segment_postings::SegmentPostings;
|
||||
|
||||
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
|
||||
pub(crate) use self::skip::{BlockInfo, SkipReader};
|
||||
pub(crate) use self::stacker::compute_table_size;
|
||||
pub use self::term_info::TermInfo;
|
||||
|
||||
pub(crate) type UnorderedTermId = u64;
|
||||
|
||||
@@ -51,17 +47,14 @@ pub mod tests {
|
||||
use crate::indexer::SegmentWriter;
|
||||
use crate::merge_policy::NoMergePolicy;
|
||||
use crate::query::Scorer;
|
||||
use crate::schema::{Document, Schema, Term, INDEXED, STRING, TEXT};
|
||||
use crate::schema::{Field, TextOptions};
|
||||
use crate::schema::{IndexRecordOption, TextFieldIndexing};
|
||||
use crate::schema::{Schema, Term, INDEXED, TEXT};
|
||||
use crate::tokenizer::{SimpleTokenizer, MAX_TOKEN_LEN};
|
||||
use crate::DocId;
|
||||
use crate::HasLen;
|
||||
use crate::Score;
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use std::iter;
|
||||
use std::{iter, mem};
|
||||
|
||||
#[test]
|
||||
pub fn test_position_write() -> crate::Result<()> {
|
||||
@@ -78,6 +71,7 @@ pub mod tests {
|
||||
field_serializer.write_doc(doc_id, 4, &delta_positions)?;
|
||||
}
|
||||
field_serializer.close_term()?;
|
||||
mem::drop(field_serializer);
|
||||
posting_serializer.close()?;
|
||||
let read = segment.open_read(SegmentComponent::POSITIONS)?;
|
||||
assert!(read.len() <= 140);
|
||||
@@ -186,7 +180,7 @@ pub mod tests {
|
||||
let inverted_index = segment_reader.inverted_index(text_field)?;
|
||||
assert_eq!(inverted_index.terms().num_terms(), 1);
|
||||
let mut bytes = vec![];
|
||||
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
|
||||
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
|
||||
assert_eq!(&bytes, b"hello");
|
||||
}
|
||||
{
|
||||
@@ -198,7 +192,7 @@ pub mod tests {
|
||||
let inverted_index = segment_reader.inverted_index(text_field)?;
|
||||
assert_eq!(inverted_index.terms().num_terms(), 1);
|
||||
let mut bytes = vec![];
|
||||
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
|
||||
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
|
||||
assert_eq!(&bytes[..], ok_token_text.as_bytes());
|
||||
}
|
||||
Ok(())
|
||||
@@ -491,53 +485,6 @@ pub mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub static TERM_A: Lazy<Term> = Lazy::new(|| {
|
||||
let field = Field::from_field_id(0);
|
||||
Term::from_field_text(field, "a")
|
||||
});
|
||||
pub static TERM_B: Lazy<Term> = Lazy::new(|| {
|
||||
let field = Field::from_field_id(0);
|
||||
Term::from_field_text(field, "b")
|
||||
});
|
||||
pub static TERM_C: Lazy<Term> = Lazy::new(|| {
|
||||
let field = Field::from_field_id(0);
|
||||
Term::from_field_text(field, "c")
|
||||
});
|
||||
pub static TERM_D: Lazy<Term> = Lazy::new(|| {
|
||||
let field = Field::from_field_id(0);
|
||||
Term::from_field_text(field, "d")
|
||||
});
|
||||
|
||||
pub static INDEX: Lazy<Index> = Lazy::new(|| {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", STRING);
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
|
||||
|
||||
let index = Index::create_in_ram(schema);
|
||||
let posting_list_size = 1_000_000;
|
||||
{
|
||||
let mut index_writer = index.writer_for_tests().unwrap();
|
||||
for _ in 0..posting_list_size {
|
||||
let mut doc = Document::default();
|
||||
if rng.gen_bool(1f64 / 15f64) {
|
||||
doc.add_text(text_field, "a");
|
||||
}
|
||||
if rng.gen_bool(1f64 / 10f64) {
|
||||
doc.add_text(text_field, "b");
|
||||
}
|
||||
if rng.gen_bool(1f64 / 5f64) {
|
||||
doc.add_text(text_field, "c");
|
||||
}
|
||||
doc.add_text(text_field, "d");
|
||||
index_writer.add_document(doc);
|
||||
}
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
index
|
||||
});
|
||||
|
||||
/// Wraps a given docset, and forward alls call but the
|
||||
/// `.skip_next(...)`. This is useful to test that a specialized
|
||||
/// implementation of `.skip_next(...)` is consistent
|
||||
@@ -602,15 +549,65 @@ pub mod tests {
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench {
|
||||
|
||||
use super::tests::*;
|
||||
use crate::docset::TERMINATED;
|
||||
use crate::query::Intersection;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::schema::{Document, Field, Schema, Term, STRING};
|
||||
use crate::tests;
|
||||
use crate::DocSet;
|
||||
use crate::Index;
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use test::{self, Bencher};
|
||||
|
||||
pub static TERM_A: Lazy<Term> = Lazy::new(|| {
|
||||
let field = Field::from_field_id(0);
|
||||
Term::from_field_text(field, "a")
|
||||
});
|
||||
pub static TERM_B: Lazy<Term> = Lazy::new(|| {
|
||||
let field = Field::from_field_id(0);
|
||||
Term::from_field_text(field, "b")
|
||||
});
|
||||
pub static TERM_C: Lazy<Term> = Lazy::new(|| {
|
||||
let field = Field::from_field_id(0);
|
||||
Term::from_field_text(field, "c")
|
||||
});
|
||||
pub static TERM_D: Lazy<Term> = Lazy::new(|| {
|
||||
let field = Field::from_field_id(0);
|
||||
Term::from_field_text(field, "d")
|
||||
});
|
||||
|
||||
pub static INDEX: Lazy<Index> = Lazy::new(|| {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", STRING);
|
||||
let schema = schema_builder.build();
|
||||
|
||||
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
|
||||
|
||||
let index = Index::create_in_ram(schema);
|
||||
let posting_list_size = 1_000_000;
|
||||
{
|
||||
let mut index_writer = index.writer_for_tests().unwrap();
|
||||
for _ in 0..posting_list_size {
|
||||
let mut doc = Document::default();
|
||||
if rng.gen_bool(1f64 / 15f64) {
|
||||
doc.add_text(text_field, "a");
|
||||
}
|
||||
if rng.gen_bool(1f64 / 10f64) {
|
||||
doc.add_text(text_field, "b");
|
||||
}
|
||||
if rng.gen_bool(1f64 / 5f64) {
|
||||
doc.add_text(text_field, "c");
|
||||
}
|
||||
doc.add_text(text_field, "d");
|
||||
index_writer.add_document(doc);
|
||||
}
|
||||
assert!(index_writer.commit().is_ok());
|
||||
}
|
||||
index
|
||||
});
|
||||
|
||||
#[bench]
|
||||
fn bench_segment_postings(b: &mut Bencher) {
|
||||
let reader = INDEX.reader().unwrap();
|
||||
@@ -620,7 +617,9 @@ mod bench {
|
||||
b.iter(|| {
|
||||
let mut segment_postings = segment_reader
|
||||
.inverted_index(TERM_A.field())
|
||||
.read_postings(&*TERM_A, IndexRecordOption::Basic)?
|
||||
.unwrap()
|
||||
.read_postings(&*TERM_A, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
while segment_postings.advance() != TERMINATED {}
|
||||
});
|
||||
@@ -634,21 +633,25 @@ mod bench {
|
||||
b.iter(|| {
|
||||
let segment_postings_a = segment_reader
|
||||
.inverted_index(TERM_A.field())
|
||||
.unwrap()
|
||||
.read_postings(&*TERM_A, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let segment_postings_b = segment_reader
|
||||
.inverted_index(TERM_B.field())
|
||||
.unwrap()
|
||||
.read_postings(&*TERM_B, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let segment_postings_c = segment_reader
|
||||
.inverted_index(TERM_C.field())
|
||||
.unwrap()
|
||||
.read_postings(&*TERM_C, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let segment_postings_d = segment_reader
|
||||
.inverted_index(TERM_D.field())
|
||||
.unwrap()
|
||||
.read_postings(&*TERM_D, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
@@ -670,6 +673,7 @@ mod bench {
|
||||
|
||||
let mut segment_postings = segment_reader
|
||||
.inverted_index(TERM_A.field())
|
||||
.unwrap()
|
||||
.read_postings(&*TERM_A, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
@@ -687,7 +691,9 @@ mod bench {
|
||||
b.iter(|| {
|
||||
let mut segment_postings = segment_reader
|
||||
.inverted_index(TERM_A.field())
|
||||
.unwrap()
|
||||
.read_postings(&*TERM_A, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
for doc in &existing_docs {
|
||||
if segment_postings.seek(*doc) == TERMINATED {
|
||||
@@ -726,7 +732,9 @@ mod bench {
|
||||
let n: u32 = test::black_box(17);
|
||||
let mut segment_postings = segment_reader
|
||||
.inverted_index(TERM_A.field())
|
||||
.unwrap()
|
||||
.read_postings(&*TERM_A, IndexRecordOption::Basic)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let mut s = 0u32;
|
||||
while segment_postings.doc() != TERMINATED {
|
||||
|
||||
@@ -177,14 +177,16 @@ impl<'a> FieldSerializer<'a> {
|
||||
}
|
||||
|
||||
fn current_term_info(&self) -> TermInfo {
|
||||
let positions_idx = self
|
||||
.positions_serializer_opt
|
||||
.as_ref()
|
||||
.map(PositionSerializer::positions_idx)
|
||||
.unwrap_or(0u64);
|
||||
let positions_idx =
|
||||
if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() {
|
||||
positions_serializer.positions_idx()
|
||||
} else {
|
||||
0u64
|
||||
};
|
||||
TermInfo {
|
||||
doc_freq: 0,
|
||||
postings_offset: self.postings_serializer.addr(),
|
||||
postings_start_offset: self.postings_serializer.addr(),
|
||||
postings_stop_offset: 0u64,
|
||||
positions_idx,
|
||||
}
|
||||
}
|
||||
@@ -238,10 +240,11 @@ impl<'a> FieldSerializer<'a> {
|
||||
/// using `VInt` encoding.
|
||||
pub fn close_term(&mut self) -> io::Result<()> {
|
||||
if self.term_open {
|
||||
self.term_dictionary_builder
|
||||
.insert_value(&self.current_term_info)?;
|
||||
self.postings_serializer
|
||||
.close_term(self.current_term_info.doc_freq)?;
|
||||
self.current_term_info.postings_stop_offset = self.postings_serializer.addr();
|
||||
self.term_dictionary_builder
|
||||
.insert_value(&self.current_term_info)?;
|
||||
self.term_open = false;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -1,32 +1,46 @@
|
||||
use crate::common::{read_u32_vint_no_advance, serialize_vint_u32, BinarySerializable};
|
||||
use std::convert::TryInto;
|
||||
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
|
||||
use crate::query::BM25Weight;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::{DocId, Score, TERMINATED};
|
||||
|
||||
#[inline(always)]
|
||||
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
|
||||
max_tf.min(u8::MAX as u32) as u8
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
|
||||
if max_tf_code == u8::MAX {
|
||||
u32::MAX
|
||||
} else {
|
||||
max_tf_code as u32
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn read_u32(data: &[u8]) -> u32 {
|
||||
u32::from_le_bytes(data[..4].try_into().unwrap())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn write_u32(val: u32, buf: &mut Vec<u8>) {
|
||||
buf.extend_from_slice(&val.to_le_bytes());
|
||||
}
|
||||
|
||||
pub struct SkipSerializer {
|
||||
buffer: Vec<u8>,
|
||||
prev_doc: DocId,
|
||||
}
|
||||
|
||||
impl SkipSerializer {
|
||||
pub fn new() -> SkipSerializer {
|
||||
SkipSerializer {
|
||||
buffer: Vec::new(),
|
||||
prev_doc: 0u32,
|
||||
}
|
||||
SkipSerializer { buffer: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
|
||||
assert!(
|
||||
last_doc > self.prev_doc,
|
||||
"write_doc(...) called with non-increasing doc ids. \
|
||||
Did you forget to call clear maybe?"
|
||||
);
|
||||
let delta_doc = last_doc - self.prev_doc;
|
||||
self.prev_doc = last_doc;
|
||||
delta_doc.serialize(&mut self.buffer).unwrap();
|
||||
write_u32(last_doc, &mut self.buffer);
|
||||
self.buffer.push(doc_num_bits);
|
||||
}
|
||||
|
||||
@@ -35,16 +49,13 @@ impl SkipSerializer {
|
||||
}
|
||||
|
||||
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
|
||||
tf_sum
|
||||
.serialize(&mut self.buffer)
|
||||
.expect("Should never fail");
|
||||
write_u32(tf_sum, &mut self.buffer);
|
||||
}
|
||||
|
||||
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
|
||||
self.buffer.push(fieldnorm_id);
|
||||
let mut buf = [0u8; 8];
|
||||
let bytes = serialize_vint_u32(term_freq, &mut buf);
|
||||
self.buffer.extend_from_slice(bytes);
|
||||
let block_wand_tf = encode_block_wand_max_tf(term_freq);
|
||||
self.buffer
|
||||
.extend_from_slice(&[fieldnorm_id, block_wand_tf]);
|
||||
}
|
||||
|
||||
pub fn data(&self) -> &[u8] {
|
||||
@@ -52,7 +63,6 @@ impl SkipSerializer {
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.prev_doc = 0u32;
|
||||
self.buffer.clear();
|
||||
}
|
||||
}
|
||||
@@ -159,18 +169,13 @@ impl SkipReader {
|
||||
}
|
||||
|
||||
fn read_block_info(&mut self) {
|
||||
let doc_delta = {
|
||||
let bytes = self.owned_read.as_slice();
|
||||
let mut buf = [0; 4];
|
||||
buf.copy_from_slice(&bytes[..4]);
|
||||
u32::from_le_bytes(buf)
|
||||
};
|
||||
self.last_doc_in_block += doc_delta as DocId;
|
||||
let doc_num_bits = self.owned_read.as_slice()[4];
|
||||
|
||||
let bytes = self.owned_read.as_slice();
|
||||
let advance_len: usize;
|
||||
self.last_doc_in_block = read_u32(bytes);
|
||||
let doc_num_bits = bytes[4];
|
||||
match self.skip_info {
|
||||
IndexRecordOption::Basic => {
|
||||
self.owned_read.advance(5);
|
||||
advance_len = 5;
|
||||
self.block_info = BlockInfo::BitPacked {
|
||||
doc_num_bits,
|
||||
tf_num_bits: 0,
|
||||
@@ -180,11 +185,10 @@ impl SkipReader {
|
||||
};
|
||||
}
|
||||
IndexRecordOption::WithFreqs => {
|
||||
let bytes = self.owned_read.as_slice();
|
||||
let tf_num_bits = bytes[5];
|
||||
let block_wand_fieldnorm_id = bytes[6];
|
||||
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[7..]);
|
||||
self.owned_read.advance(7 + num_bytes);
|
||||
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
|
||||
advance_len = 8;
|
||||
self.block_info = BlockInfo::BitPacked {
|
||||
doc_num_bits,
|
||||
tf_num_bits,
|
||||
@@ -194,16 +198,11 @@ impl SkipReader {
|
||||
};
|
||||
}
|
||||
IndexRecordOption::WithFreqsAndPositions => {
|
||||
let bytes = self.owned_read.as_slice();
|
||||
let tf_num_bits = bytes[5];
|
||||
let tf_sum = {
|
||||
let mut buf = [0; 4];
|
||||
buf.copy_from_slice(&bytes[6..10]);
|
||||
u32::from_le_bytes(buf)
|
||||
};
|
||||
let tf_sum = read_u32(&bytes[6..10]);
|
||||
let block_wand_fieldnorm_id = bytes[10];
|
||||
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[11..]);
|
||||
self.owned_read.advance(11 + num_bytes);
|
||||
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
|
||||
advance_len = 12;
|
||||
self.block_info = BlockInfo::BitPacked {
|
||||
doc_num_bits,
|
||||
tf_num_bits,
|
||||
@@ -213,6 +212,7 @@ impl SkipReader {
|
||||
};
|
||||
}
|
||||
}
|
||||
self.owned_read.advance(advance_len);
|
||||
}
|
||||
|
||||
pub fn block_info(&self) -> BlockInfo {
|
||||
@@ -274,6 +274,24 @@ mod tests {
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
|
||||
|
||||
#[test]
|
||||
fn test_encode_block_wand_max_tf() {
|
||||
for tf in 0..255 {
|
||||
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
|
||||
}
|
||||
for &tf in &[255, 256, 1_000_000, u32::MAX] {
|
||||
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_block_wand_max_tf() {
|
||||
for tf in 0..255 {
|
||||
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
|
||||
}
|
||||
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skip_with_freq() {
|
||||
let buf = {
|
||||
|
||||
@@ -7,35 +7,50 @@ use std::io;
|
||||
pub struct TermInfo {
|
||||
/// Number of documents in the segment containing the term
|
||||
pub doc_freq: u32,
|
||||
/// Start offset within the postings (`.idx`) file.
|
||||
pub postings_offset: u64,
|
||||
/// Start offset of the posting list within the postings (`.idx`) file.
|
||||
pub postings_start_offset: u64,
|
||||
/// Stop offset of the posting list within the postings (`.idx`) file.
|
||||
/// The byte range is `[start_offset..stop_offset)`.
|
||||
pub postings_stop_offset: u64,
|
||||
/// Start offset of the first block within the position (`.pos`) file.
|
||||
pub positions_idx: u64,
|
||||
}
|
||||
|
||||
impl TermInfo {
|
||||
pub(crate) fn posting_num_bytes(&self) -> u32 {
|
||||
let num_bytes = self.postings_stop_offset - self.postings_start_offset;
|
||||
assert!(num_bytes <= std::u32::MAX as u64);
|
||||
num_bytes as u32
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedSize for TermInfo {
|
||||
/// Size required for the binary serialization of a `TermInfo` object.
|
||||
/// This is large, but in practise, `TermInfo` are encoded in blocks and
|
||||
/// only the first `TermInfo` of a block is serialized uncompressed.
|
||||
/// The subsequent `TermInfo` are delta encoded and bitpacked.
|
||||
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
|
||||
const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
|
||||
}
|
||||
|
||||
impl BinarySerializable for TermInfo {
|
||||
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||
self.doc_freq.serialize(writer)?;
|
||||
self.postings_offset.serialize(writer)?;
|
||||
self.postings_start_offset.serialize(writer)?;
|
||||
self.posting_num_bytes().serialize(writer)?;
|
||||
self.positions_idx.serialize(writer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
|
||||
let doc_freq = u32::deserialize(reader)?;
|
||||
let postings_offset = u64::deserialize(reader)?;
|
||||
let postings_start_offset = u64::deserialize(reader)?;
|
||||
let postings_num_bytes = u32::deserialize(reader)?;
|
||||
let postings_stop_offset = postings_start_offset + u64::from(postings_num_bytes);
|
||||
let positions_idx = u64::deserialize(reader)?;
|
||||
Ok(TermInfo {
|
||||
doc_freq,
|
||||
postings_offset,
|
||||
postings_start_offset,
|
||||
postings_stop_offset,
|
||||
positions_idx,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::schema::{Field, IndexRecordOption};
|
||||
use crate::termdict::{TermDictionary, TermStreamer};
|
||||
use crate::TantivyError;
|
||||
use crate::{DocId, Score};
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use tantivy_fst::Automaton;
|
||||
|
||||
@@ -19,6 +20,7 @@ pub struct AutomatonWeight<A> {
|
||||
impl<A> AutomatonWeight<A>
|
||||
where
|
||||
A: Automaton + Send + Sync + 'static,
|
||||
A::State: Clone,
|
||||
{
|
||||
/// Create a new AutomationWeight
|
||||
pub fn new<IntoArcA: Into<Arc<A>>>(field: Field, automaton: IntoArcA) -> AutomatonWeight<A> {
|
||||
@@ -28,7 +30,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> {
|
||||
fn automaton_stream<'a>(
|
||||
&'a self,
|
||||
term_dict: &'a TermDictionary,
|
||||
) -> io::Result<TermStreamer<'a, &'a A>> {
|
||||
let automaton: &A = &*self.automaton;
|
||||
let term_stream_builder = term_dict.search(automaton);
|
||||
term_stream_builder.into_stream()
|
||||
@@ -38,13 +43,14 @@ where
|
||||
impl<A> Weight for AutomatonWeight<A>
|
||||
where
|
||||
A: Automaton + Send + Sync + 'static,
|
||||
A::State: Clone,
|
||||
{
|
||||
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
|
||||
let max_doc = reader.max_doc();
|
||||
let mut doc_bitset = BitSet::with_max_value(max_doc);
|
||||
let inverted_index = reader.inverted_index(self.field)?;
|
||||
let term_dict = inverted_index.terms();
|
||||
let mut term_stream = self.automaton_stream(term_dict);
|
||||
let mut term_stream = self.automaton_stream(term_dict)?;
|
||||
while term_stream.advance() {
|
||||
let term_info = term_stream.value();
|
||||
let mut block_segment_postings = inverted_index
|
||||
@@ -98,6 +104,7 @@ mod tests {
|
||||
index
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum State {
|
||||
Start,
|
||||
NotMatching,
|
||||
|
||||
@@ -106,7 +106,7 @@ impl BM25Weight {
|
||||
BM25Weight::new(idf_explain, avg_fieldnorm)
|
||||
}
|
||||
|
||||
fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
|
||||
pub(crate) fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
|
||||
let weight = idf_explain.value() * (1.0 + K1);
|
||||
BM25Weight {
|
||||
idf_explain,
|
||||
|
||||
@@ -268,7 +268,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn nearly_equals(left: Score, right: Score) -> bool {
|
||||
(left - right).abs() < 0.000001 * (left + right).abs()
|
||||
(left - right).abs() < 0.0001 * (left + right).abs()
|
||||
}
|
||||
|
||||
fn compute_checkpoints_for_each_pruning(
|
||||
@@ -424,9 +424,116 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fn_reproduce_proptest() {
|
||||
let postings_lists = &[
|
||||
vec![
|
||||
(0, 1),
|
||||
(1, 1),
|
||||
(2, 1),
|
||||
(3, 1),
|
||||
(4, 1),
|
||||
(6, 1),
|
||||
(7, 7),
|
||||
(8, 1),
|
||||
(10, 1),
|
||||
(12, 1),
|
||||
(13, 1),
|
||||
(14, 1),
|
||||
(15, 1),
|
||||
(16, 1),
|
||||
(19, 1),
|
||||
(20, 1),
|
||||
(21, 1),
|
||||
(22, 1),
|
||||
(24, 1),
|
||||
(25, 1),
|
||||
(26, 1),
|
||||
(28, 1),
|
||||
(30, 1),
|
||||
(31, 1),
|
||||
(33, 1),
|
||||
(34, 1),
|
||||
(35, 1),
|
||||
(36, 95),
|
||||
(37, 1),
|
||||
(39, 1),
|
||||
(41, 1),
|
||||
(44, 1),
|
||||
(46, 1),
|
||||
],
|
||||
vec![
|
||||
(0, 5),
|
||||
(2, 1),
|
||||
(4, 1),
|
||||
(5, 84),
|
||||
(6, 47),
|
||||
(7, 26),
|
||||
(8, 50),
|
||||
(9, 34),
|
||||
(11, 73),
|
||||
(12, 11),
|
||||
(13, 51),
|
||||
(14, 45),
|
||||
(15, 18),
|
||||
(18, 60),
|
||||
(19, 80),
|
||||
(20, 63),
|
||||
(23, 79),
|
||||
(24, 69),
|
||||
(26, 35),
|
||||
(28, 82),
|
||||
(29, 19),
|
||||
(30, 2),
|
||||
(31, 7),
|
||||
(33, 40),
|
||||
(34, 1),
|
||||
(35, 33),
|
||||
(36, 27),
|
||||
(37, 24),
|
||||
(38, 65),
|
||||
(39, 32),
|
||||
(40, 85),
|
||||
(41, 1),
|
||||
(42, 69),
|
||||
(43, 11),
|
||||
(45, 45),
|
||||
(47, 97),
|
||||
],
|
||||
vec![
|
||||
(2, 1),
|
||||
(4, 1),
|
||||
(7, 94),
|
||||
(8, 1),
|
||||
(9, 1),
|
||||
(10, 1),
|
||||
(12, 1),
|
||||
(15, 1),
|
||||
(22, 1),
|
||||
(23, 1),
|
||||
(26, 1),
|
||||
(27, 1),
|
||||
(32, 1),
|
||||
(33, 1),
|
||||
(34, 1),
|
||||
(36, 96),
|
||||
(39, 1),
|
||||
(41, 1),
|
||||
],
|
||||
];
|
||||
let fieldnorms = &[
|
||||
685, 239, 780, 564, 664, 827, 5, 56, 930, 887, 263, 665, 167, 127, 120, 919, 292, 92,
|
||||
489, 734, 814, 724, 700, 304, 128, 779, 311, 877, 774, 15, 866, 368, 894, 371, 982,
|
||||
502, 507, 669, 680, 76, 594, 626, 578, 331, 170, 639, 665, 186,
|
||||
][..];
|
||||
test_block_wand_aux(postings_lists, fieldnorms);
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(500))]
|
||||
#[ignore]
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_block_wand_three_term_scorers((posting_lists, fieldnorms) in gen_term_scorers(3)) {
|
||||
test_block_wand_aux(&posting_lists[..], &fieldnorms[..]);
|
||||
}
|
||||
|
||||
@@ -310,7 +310,7 @@ mod tests {
|
||||
));
|
||||
let query = BooleanQuery::from(vec![(Occur::Should, term_a), (Occur::Should, term_b)]);
|
||||
let explanation = query.explain(&searcher, DocAddress(0, 0u32))?;
|
||||
assert_nearly_equals!(explanation.value(), 0.6931472f32);
|
||||
assert_nearly_equals!(explanation.value(), 0.6931472);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::schema::{Field, IndexRecordOption, Term};
|
||||
use crate::termdict::{TermDictionary, TermStreamer};
|
||||
use crate::{DocId, Score};
|
||||
use std::collections::Bound;
|
||||
use std::io;
|
||||
use std::ops::Range;
|
||||
|
||||
fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
|
||||
@@ -274,7 +275,7 @@ pub struct RangeWeight {
|
||||
}
|
||||
|
||||
impl RangeWeight {
|
||||
fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> TermStreamer<'a> {
|
||||
fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> io::Result<TermStreamer<'a>> {
|
||||
use std::collections::Bound::*;
|
||||
let mut term_stream_builder = term_dict.range();
|
||||
term_stream_builder = match self.left_bound {
|
||||
@@ -298,7 +299,7 @@ impl Weight for RangeWeight {
|
||||
|
||||
let inverted_index = reader.inverted_index(self.field)?;
|
||||
let term_dict = inverted_index.terms();
|
||||
let mut term_range = self.term_range(term_dict);
|
||||
let mut term_range = self.term_range(term_dict)?;
|
||||
while term_range.advance() {
|
||||
let term_info = term_range.value();
|
||||
let mut block_segment_postings = inverted_index
|
||||
|
||||
@@ -12,7 +12,7 @@ use std::marker::PhantomData;
|
||||
/// This is useful for queries like `+somethingrequired somethingoptional`.
|
||||
///
|
||||
/// Note that `somethingoptional` has no impact on the `DocSet`.
|
||||
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner> {
|
||||
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner: ScoreCombiner> {
|
||||
req_scorer: TReqScorer,
|
||||
opt_scorer: TOptScorer,
|
||||
score_cache: Option<Score>,
|
||||
@@ -23,6 +23,7 @@ impl<TReqScorer, TOptScorer, TScoreCombiner>
|
||||
RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner>
|
||||
where
|
||||
TOptScorer: DocSet,
|
||||
TScoreCombiner: ScoreCombiner,
|
||||
{
|
||||
/// Creates a new `RequiredOptionalScorer`.
|
||||
pub fn new(
|
||||
@@ -43,6 +44,7 @@ impl<TReqScorer, TOptScorer, TScoreCombiner> DocSet
|
||||
where
|
||||
TReqScorer: DocSet,
|
||||
TOptScorer: DocSet,
|
||||
TScoreCombiner: ScoreCombiner,
|
||||
{
|
||||
fn advance(&mut self) -> DocId {
|
||||
self.score_cache = None;
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::Score;
|
||||
|
||||
/// The `ScoreCombiner` trait defines how to compute
|
||||
/// an overall score given a list of scores.
|
||||
pub trait ScoreCombiner: Default + Clone + Copy + 'static {
|
||||
pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
|
||||
/// Aggregates the score combiner with the given scorer.
|
||||
///
|
||||
/// The `ScoreCombiner` may decide to call `.scorer.score()`
|
||||
|
||||
@@ -197,7 +197,7 @@ mod tests {
|
||||
let searcher = index.reader()?.searcher();
|
||||
{
|
||||
let explanation = term_query.explain(&searcher, DocAddress(0u32, 1u32))?;
|
||||
assert_nearly_equals!(explanation.value(), 0.6931472f32);
|
||||
assert_nearly_equals!(explanation.value(), 0.6931472);
|
||||
}
|
||||
{
|
||||
let explanation_err = term_query.explain(&searcher, DocAddress(0u32, 0u32));
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use super::term_weight::TermWeight;
|
||||
use crate::query::bm25::BM25Weight;
|
||||
use crate::query::Query;
|
||||
use crate::query::Weight;
|
||||
use crate::query::{Explanation, Query};
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::Searcher;
|
||||
use crate::Term;
|
||||
@@ -93,7 +93,20 @@ impl TermQuery {
|
||||
scoring_enabled: bool,
|
||||
) -> crate::Result<TermWeight> {
|
||||
let term = self.term.clone();
|
||||
let bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
|
||||
let field_entry = searcher.schema().get_field_entry(term.field());
|
||||
if !field_entry.is_indexed() {
|
||||
return Err(crate::TantivyError::SchemaError(format!(
|
||||
"Field {:?} is not indexed",
|
||||
field_entry.name()
|
||||
)));
|
||||
}
|
||||
let bm25_weight;
|
||||
if scoring_enabled {
|
||||
bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
|
||||
} else {
|
||||
bm25_weight =
|
||||
BM25Weight::new(Explanation::new("<no score>".to_string(), 1.0f32), 1.0f32);
|
||||
}
|
||||
let index_record_option = if scoring_enabled {
|
||||
self.index_record_option
|
||||
} else {
|
||||
@@ -103,6 +116,7 @@ impl TermQuery {
|
||||
self.term.clone(),
|
||||
index_record_option,
|
||||
bm25_weight,
|
||||
scoring_enabled,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -302,7 +302,7 @@ mod tests {
|
||||
let mut rng = rand::thread_rng();
|
||||
writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
for _ in 0..3_000 {
|
||||
let term_freq = rng.gen_range(1, 10000);
|
||||
let term_freq = rng.gen_range(1..10000);
|
||||
let words: Vec<&str> = std::iter::repeat("bbbb").take(term_freq).collect();
|
||||
let text = words.join(" ");
|
||||
writer.add_document(doc!(text_field=>text));
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::term_scorer::TermScorer;
|
||||
use crate::core::SegmentReader;
|
||||
use crate::docset::DocSet;
|
||||
use crate::fieldnorm::FieldNormReader;
|
||||
use crate::postings::SegmentPostings;
|
||||
use crate::query::bm25::BM25Weight;
|
||||
use crate::query::explanation::does_not_match;
|
||||
@@ -15,6 +16,7 @@ pub struct TermWeight {
|
||||
term: Term,
|
||||
index_record_option: IndexRecordOption,
|
||||
similarity_weight: BM25Weight,
|
||||
scoring_enabled: bool,
|
||||
}
|
||||
|
||||
impl Weight for TermWeight {
|
||||
@@ -43,7 +45,7 @@ impl Weight for TermWeight {
|
||||
} else {
|
||||
let field = self.term.field();
|
||||
let inv_index = reader.inverted_index(field)?;
|
||||
let term_info = inv_index.get_term_info(&self.term);
|
||||
let term_info = inv_index.get_term_info(&self.term)?;
|
||||
Ok(term_info.map(|term_info| term_info.doc_freq).unwrap_or(0))
|
||||
}
|
||||
}
|
||||
@@ -87,11 +89,13 @@ impl TermWeight {
|
||||
term: Term,
|
||||
index_record_option: IndexRecordOption,
|
||||
similarity_weight: BM25Weight,
|
||||
scoring_enabled: bool,
|
||||
) -> TermWeight {
|
||||
TermWeight {
|
||||
term,
|
||||
index_record_option,
|
||||
similarity_weight,
|
||||
scoring_enabled,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,7 +106,11 @@ impl TermWeight {
|
||||
) -> crate::Result<TermScorer> {
|
||||
let field = self.term.field();
|
||||
let inverted_index = reader.inverted_index(field)?;
|
||||
let fieldnorm_reader = reader.get_fieldnorms_reader(field)?;
|
||||
let fieldnorm_reader = if self.scoring_enabled {
|
||||
reader.get_fieldnorms_reader(field)?
|
||||
} else {
|
||||
FieldNormReader::constant(reader.max_doc(), 1)
|
||||
};
|
||||
let similarity_weight = self.similarity_weight.boost_by(boost);
|
||||
let postings_opt: Option<SegmentPostings> =
|
||||
inverted_index.read_postings(&self.term, self.index_record_option)?;
|
||||
|
||||
@@ -3,9 +3,9 @@ mod pool;
|
||||
pub use self::pool::LeasedItem;
|
||||
use self::pool::Pool;
|
||||
use crate::core::Segment;
|
||||
use crate::directory::Directory;
|
||||
use crate::directory::WatchHandle;
|
||||
use crate::directory::META_LOCK;
|
||||
use crate::directory::{Directory, WatchCallback};
|
||||
use crate::Index;
|
||||
use crate::Searcher;
|
||||
use crate::SegmentReader;
|
||||
@@ -88,7 +88,7 @@ impl IndexReaderBuilder {
|
||||
let watch_handle = inner_reader_arc
|
||||
.index
|
||||
.directory()
|
||||
.watch(Box::new(callback))?;
|
||||
.watch(WatchCallback::new(callback))?;
|
||||
watch_handle_opt = Some(watch_handle);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -233,6 +233,7 @@ mod tests {
|
||||
assert_eq!(Facet::root(), Facet::from("/"));
|
||||
assert_eq!(format!("{}", Facet::root()), "/");
|
||||
assert!(Facet::root().is_root());
|
||||
assert_eq!(Facet::root().encoded_str(), "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::schema::Value;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
/// Internal representation of a document used for JSON
|
||||
@@ -8,5 +8,5 @@ use std::collections::BTreeMap;
|
||||
/// A `NamedFieldDocument` is a simple representation of a document
|
||||
/// as a `BTreeMap<String, Vec<Value>>`.
|
||||
///
|
||||
#[derive(Serialize)]
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct NamedFieldDocument(pub BTreeMap<String, Vec<Value>>);
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::io::{self, Read, Write};
|
||||
/// Name of the compression scheme used in the doc store.
|
||||
///
|
||||
/// This name is appended to the version string of tantivy.
|
||||
pub const COMPRESSION: &'static str = "lz4";
|
||||
pub const COMPRESSION: &str = "lz4";
|
||||
|
||||
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
|
||||
compressed.clear();
|
||||
|
||||
168
src/store/index/block.rs
Normal file
168
src/store/index/block.rs
Normal file
@@ -0,0 +1,168 @@
|
||||
use crate::common::VInt;
|
||||
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
|
||||
use crate::DocId;
|
||||
use std::io;
|
||||
|
||||
/// Represents a block of checkpoints.
|
||||
///
|
||||
/// The DocStore index checkpoints are organized into block
|
||||
/// for code-readability and compression purpose.
|
||||
///
|
||||
/// A block can be of any size.
|
||||
pub struct CheckpointBlock {
|
||||
pub checkpoints: Vec<Checkpoint>,
|
||||
}
|
||||
|
||||
impl Default for CheckpointBlock {
|
||||
fn default() -> CheckpointBlock {
|
||||
CheckpointBlock {
|
||||
checkpoints: Vec::with_capacity(2 * CHECKPOINT_PERIOD),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckpointBlock {
|
||||
/// If non-empty returns [start_doc, end_doc)
|
||||
/// for the overall block.
|
||||
pub fn doc_interval(&self) -> Option<(DocId, DocId)> {
|
||||
let start_doc_opt = self
|
||||
.checkpoints
|
||||
.first()
|
||||
.cloned()
|
||||
.map(|checkpoint| checkpoint.start_doc);
|
||||
let end_doc_opt = self
|
||||
.checkpoints
|
||||
.last()
|
||||
.cloned()
|
||||
.map(|checkpoint| checkpoint.end_doc);
|
||||
match (start_doc_opt, end_doc_opt) {
|
||||
(Some(start_doc), Some(end_doc)) => Some((start_doc, end_doc)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Adding another checkpoint in the block.
|
||||
pub fn push(&mut self, checkpoint: Checkpoint) {
|
||||
if let Some(prev_checkpoint) = self.checkpoints.last() {
|
||||
assert!(checkpoint.follows(prev_checkpoint));
|
||||
}
|
||||
self.checkpoints.push(checkpoint);
|
||||
}
|
||||
|
||||
/// Returns the number of checkpoints in the block.
|
||||
pub fn len(&self) -> usize {
|
||||
self.checkpoints.len()
|
||||
}
|
||||
|
||||
pub fn get(&self, idx: usize) -> Checkpoint {
|
||||
self.checkpoints[idx]
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.checkpoints.clear();
|
||||
}
|
||||
|
||||
pub fn serialize(&mut self, buffer: &mut Vec<u8>) {
|
||||
VInt(self.checkpoints.len() as u64).serialize_into_vec(buffer);
|
||||
if self.checkpoints.is_empty() {
|
||||
return;
|
||||
}
|
||||
VInt(self.checkpoints[0].start_doc as u64).serialize_into_vec(buffer);
|
||||
VInt(self.checkpoints[0].start_offset as u64).serialize_into_vec(buffer);
|
||||
for checkpoint in &self.checkpoints {
|
||||
let delta_doc = checkpoint.end_doc - checkpoint.start_doc;
|
||||
VInt(delta_doc as u64).serialize_into_vec(buffer);
|
||||
VInt(checkpoint.end_offset - checkpoint.start_offset).serialize_into_vec(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deserialize(&mut self, data: &mut &[u8]) -> io::Result<()> {
|
||||
if data.is_empty() {
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""));
|
||||
}
|
||||
self.checkpoints.clear();
|
||||
let len = VInt::deserialize_u64(data)? as usize;
|
||||
if len == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let mut doc = VInt::deserialize_u64(data)? as DocId;
|
||||
let mut start_offset = VInt::deserialize_u64(data)?;
|
||||
for _ in 0..len {
|
||||
let num_docs = VInt::deserialize_u64(data)? as DocId;
|
||||
let block_num_bytes = VInt::deserialize_u64(data)?;
|
||||
self.checkpoints.push(Checkpoint {
|
||||
start_doc: doc,
|
||||
end_doc: doc + num_docs,
|
||||
start_offset,
|
||||
end_offset: start_offset + block_num_bytes,
|
||||
});
|
||||
doc += num_docs;
|
||||
start_offset += block_num_bytes;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::store::index::block::CheckpointBlock;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
use std::io;
|
||||
|
||||
fn test_aux_ser_deser(checkpoints: &[Checkpoint]) -> io::Result<()> {
|
||||
let mut block = CheckpointBlock::default();
|
||||
for &checkpoint in checkpoints {
|
||||
block.push(checkpoint);
|
||||
}
|
||||
let mut buffer = Vec::new();
|
||||
block.serialize(&mut buffer);
|
||||
let mut block_deser = CheckpointBlock::default();
|
||||
let checkpoint = Checkpoint {
|
||||
start_doc: 0,
|
||||
end_doc: 1,
|
||||
start_offset: 2,
|
||||
end_offset: 3,
|
||||
};
|
||||
block_deser.push(checkpoint); // < check that value is erased before deser
|
||||
let mut data = &buffer[..];
|
||||
block_deser.deserialize(&mut data)?;
|
||||
assert!(data.is_empty());
|
||||
assert_eq!(checkpoints, &block_deser.checkpoints[..]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_serialize_empty() -> io::Result<()> {
|
||||
test_aux_ser_deser(&[])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_serialize_simple() -> io::Result<()> {
|
||||
let checkpoints = vec![Checkpoint {
|
||||
start_doc: 10,
|
||||
end_doc: 12,
|
||||
start_offset: 100,
|
||||
end_offset: 120,
|
||||
}];
|
||||
test_aux_ser_deser(&checkpoints)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_serialize() -> io::Result<()> {
|
||||
let offsets: Vec<u64> = (0..11).map(|i| i * i * i).collect();
|
||||
let mut checkpoints = vec![];
|
||||
let mut start_doc = 0;
|
||||
for i in 0..10 {
|
||||
let end_doc = (i * i) as DocId;
|
||||
checkpoints.push(Checkpoint {
|
||||
start_doc,
|
||||
end_doc,
|
||||
start_offset: offsets[i],
|
||||
end_offset: offsets[i + 1],
|
||||
});
|
||||
start_doc = end_doc;
|
||||
}
|
||||
test_aux_ser_deser(&checkpoints)
|
||||
}
|
||||
}
|
||||
274
src/store/index/mod.rs
Normal file
274
src/store/index/mod.rs
Normal file
@@ -0,0 +1,274 @@
|
||||
const CHECKPOINT_PERIOD: usize = 2;
|
||||
|
||||
use std::fmt;
|
||||
mod block;
|
||||
mod skip_index;
|
||||
mod skip_index_builder;
|
||||
|
||||
use crate::DocId;
|
||||
|
||||
pub use self::skip_index::SkipIndex;
|
||||
pub use self::skip_index_builder::SkipIndexBuilder;
|
||||
|
||||
/// A checkpoint contains meta-information about
|
||||
/// a block. Either a block of documents, or another block
|
||||
/// of checkpoints.
|
||||
///
|
||||
/// All of the intervals here defined are semi-open.
|
||||
/// The checkpoint describes that the block within the bytes
|
||||
/// `[start_offset..end_offset)` spans over the docs
|
||||
/// `[start_doc..end_doc)`.
|
||||
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||
pub struct Checkpoint {
|
||||
pub start_doc: DocId,
|
||||
pub end_doc: DocId,
|
||||
pub start_offset: u64,
|
||||
pub end_offset: u64,
|
||||
}
|
||||
|
||||
impl Checkpoint {
|
||||
pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
|
||||
(self.start_doc == other.end_doc) &&
|
||||
(self.start_offset == other.end_offset)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Checkpoint {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"(doc=[{}..{}), bytes=[{}..{}))",
|
||||
self.start_doc, self.end_doc, self.start_offset, self.end_offset
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use std::{io, iter};
|
||||
|
||||
use futures::executor::block_on;
|
||||
use proptest::strategy::{BoxedStrategy, Strategy};
|
||||
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::schema::{SchemaBuilder, STORED, STRING};
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::{DocAddress, DocId, Index, Term};
|
||||
|
||||
use super::{SkipIndex, SkipIndexBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_skip_index_empty() -> io::Result<()> {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
|
||||
skip_index_builder.write(&mut output)?;
|
||||
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
|
||||
let mut skip_cursor = skip_index.checkpoints();
|
||||
assert!(skip_cursor.next().is_none());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skip_index_single_el() -> io::Result<()> {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
|
||||
let checkpoint = Checkpoint {
|
||||
start_doc: 0,
|
||||
end_doc: 2,
|
||||
start_offset: 0,
|
||||
end_offset: 3,
|
||||
};
|
||||
skip_index_builder.insert(checkpoint);
|
||||
skip_index_builder.write(&mut output)?;
|
||||
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
|
||||
let mut skip_cursor = skip_index.checkpoints();
|
||||
assert_eq!(skip_cursor.next(), Some(checkpoint));
|
||||
assert_eq!(skip_cursor.next(), None);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skip_index() -> io::Result<()> {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let checkpoints = vec![
|
||||
Checkpoint {
|
||||
start_doc: 0,
|
||||
end_doc: 3,
|
||||
start_offset: 4,
|
||||
end_offset: 9,
|
||||
},
|
||||
Checkpoint {
|
||||
start_doc: 3,
|
||||
end_doc: 4,
|
||||
start_offset: 9,
|
||||
end_offset: 25,
|
||||
},
|
||||
Checkpoint {
|
||||
start_doc: 4,
|
||||
end_doc: 6,
|
||||
start_offset: 25,
|
||||
end_offset: 49,
|
||||
},
|
||||
Checkpoint {
|
||||
start_doc: 6,
|
||||
end_doc: 8,
|
||||
start_offset: 49,
|
||||
end_offset: 81,
|
||||
},
|
||||
Checkpoint {
|
||||
start_doc: 8,
|
||||
end_doc: 10,
|
||||
start_offset: 81,
|
||||
end_offset: 100,
|
||||
},
|
||||
];
|
||||
|
||||
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
|
||||
for &checkpoint in &checkpoints {
|
||||
skip_index_builder.insert(checkpoint);
|
||||
}
|
||||
skip_index_builder.write(&mut output)?;
|
||||
|
||||
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
|
||||
assert_eq!(
|
||||
&skip_index.checkpoints().collect::<Vec<_>>()[..],
|
||||
&checkpoints[..]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn offset_test(doc: DocId) -> u64 {
|
||||
(doc as u64) * (doc as u64)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> {
|
||||
let mut schema_builder = SchemaBuilder::default();
|
||||
let text = schema_builder.add_text_field("text", STORED | STRING);
|
||||
let body = schema_builder.add_text_field("body", STORED);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz")
|
||||
.take(1_000)
|
||||
.collect();
|
||||
for _ in 0..20 {
|
||||
index_writer.add_document(doc!(body=>long_text.clone()));
|
||||
}
|
||||
index_writer.commit()?;
|
||||
index_writer.add_document(doc!(text=>"testb"));
|
||||
for _ in 0..10 {
|
||||
index_writer.add_document(doc!(text=>"testd", body=>long_text.clone()));
|
||||
}
|
||||
index_writer.commit()?;
|
||||
index_writer.delete_term(Term::from_field_text(text, "testb"));
|
||||
index_writer.commit()?;
|
||||
let segment_ids = index.searchable_segment_ids()?;
|
||||
block_on(index_writer.merge(&segment_ids))?;
|
||||
let reader = index.reader()?;
|
||||
let searcher = reader.searcher();
|
||||
assert_eq!(searcher.num_docs(), 30);
|
||||
for i in 0..searcher.num_docs() as u32 {
|
||||
let _doc = searcher.doc(DocAddress(0u32, i))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skip_index_long() -> io::Result<()> {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let checkpoints: Vec<Checkpoint> = (0..1000)
|
||||
.map(|i| Checkpoint {
|
||||
start_doc: i,
|
||||
end_doc: i + 1,
|
||||
start_offset: offset_test(i),
|
||||
end_offset: offset_test(i + 1),
|
||||
})
|
||||
.collect();
|
||||
let mut skip_index_builder = SkipIndexBuilder::new();
|
||||
for checkpoint in &checkpoints {
|
||||
skip_index_builder.insert(*checkpoint);
|
||||
}
|
||||
skip_index_builder.write(&mut output)?;
|
||||
assert_eq!(output.len(), 4035);
|
||||
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
|
||||
.checkpoints()
|
||||
.collect();
|
||||
assert_eq!(&resulting_checkpoints, &checkpoints);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
|
||||
let mut prev = 0u64;
|
||||
for val in vals.iter_mut() {
|
||||
let new_val = *val + prev;
|
||||
prev = new_val;
|
||||
*val = new_val;
|
||||
}
|
||||
vals
|
||||
}
|
||||
|
||||
// Generates a sequence of n valid checkpoints, with n < max_len.
|
||||
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
|
||||
(1..max_len)
|
||||
.prop_flat_map(move |len: usize| {
|
||||
(
|
||||
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),
|
||||
proptest::collection::vec(1u64..26u64, len as usize).prop_map(integrate_delta),
|
||||
)
|
||||
.prop_map(|(docs, offsets)| {
|
||||
(0..docs.len() - 1)
|
||||
.map(move |i| Checkpoint {
|
||||
start_doc: docs[i] as DocId,
|
||||
end_doc: docs[i + 1] as DocId,
|
||||
start_offset: offsets[i],
|
||||
end_offset: offsets[i + 1],
|
||||
})
|
||||
.collect::<Vec<Checkpoint>>()
|
||||
})
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn seek_manual<I: Iterator<Item = Checkpoint>>(
|
||||
checkpoints: I,
|
||||
target: DocId,
|
||||
) -> Option<Checkpoint> {
|
||||
checkpoints
|
||||
.into_iter()
|
||||
.filter(|checkpoint| checkpoint.end_doc > target)
|
||||
.next()
|
||||
}
|
||||
|
||||
fn test_skip_index_aux(skip_index: SkipIndex, checkpoints: &[Checkpoint]) {
|
||||
if let Some(last_checkpoint) = checkpoints.last() {
|
||||
for doc in 0u32..last_checkpoint.end_doc {
|
||||
let expected = seek_manual(skip_index.checkpoints(), doc);
|
||||
assert_eq!(expected, skip_index.seek(doc), "Doc {}", doc);
|
||||
}
|
||||
assert!(skip_index.seek(last_checkpoint.end_doc).is_none());
|
||||
}
|
||||
}
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(20))]
|
||||
#[test]
|
||||
fn test_proptest_skip(checkpoints in monotonic_checkpoints(100)) {
|
||||
let mut skip_index_builder = SkipIndexBuilder::new();
|
||||
for checkpoint in checkpoints.iter().cloned() {
|
||||
skip_index_builder.insert(checkpoint);
|
||||
}
|
||||
let mut buffer = Vec::new();
|
||||
skip_index_builder.write(&mut buffer).unwrap();
|
||||
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
|
||||
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
|
||||
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
|
||||
test_skip_index_aux(skip_index, &checkpoints[..]);
|
||||
}
|
||||
}
|
||||
}
|
||||
110
src/store/index/skip_index.rs
Normal file
110
src/store/index/skip_index.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::store::index::block::CheckpointBlock;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
|
||||
pub struct LayerCursor<'a> {
|
||||
remaining: &'a [u8],
|
||||
block: CheckpointBlock,
|
||||
cursor: usize,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for LayerCursor<'a> {
|
||||
type Item = Checkpoint;
|
||||
|
||||
fn next(&mut self) -> Option<Checkpoint> {
|
||||
if self.cursor == self.block.len() {
|
||||
if self.remaining.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
|
||||
if block_mut.deserialize(remaining_mut).is_err() {
|
||||
return None;
|
||||
}
|
||||
self.cursor = 0;
|
||||
}
|
||||
let res = Some(self.block.get(self.cursor));
|
||||
self.cursor += 1;
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
struct Layer {
|
||||
data: OwnedBytes,
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
fn cursor<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||
self.cursor_at_offset(0u64)
|
||||
}
|
||||
|
||||
fn cursor_at_offset<'a>(&'a self, start_offset: u64) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||
let data = &self.data.as_slice();
|
||||
LayerCursor {
|
||||
remaining: &data[start_offset as usize..],
|
||||
block: CheckpointBlock::default(),
|
||||
cursor: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
|
||||
self.cursor_at_offset(offset)
|
||||
.find(|checkpoint| checkpoint.end_doc > target)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SkipIndex {
|
||||
layers: Vec<Layer>,
|
||||
}
|
||||
|
||||
impl SkipIndex {
|
||||
pub fn open(mut data: OwnedBytes) -> SkipIndex {
|
||||
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|el| el.0)
|
||||
.collect();
|
||||
let mut start_offset = 0;
|
||||
let mut layers = Vec::new();
|
||||
for end_offset in offsets {
|
||||
let layer = Layer {
|
||||
data: data.slice(start_offset as usize, end_offset as usize),
|
||||
};
|
||||
layers.push(layer);
|
||||
start_offset = end_offset;
|
||||
}
|
||||
SkipIndex { layers }
|
||||
}
|
||||
|
||||
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||
self.layers
|
||||
.last()
|
||||
.into_iter()
|
||||
.flat_map(|layer| layer.cursor())
|
||||
}
|
||||
|
||||
pub fn seek(&self, target: DocId) -> Option<Checkpoint> {
|
||||
let first_layer_len = self
|
||||
.layers
|
||||
.first()
|
||||
.map(|layer| layer.data.len() as u64)
|
||||
.unwrap_or(0u64);
|
||||
let mut cur_checkpoint = Checkpoint {
|
||||
start_doc: 0u32,
|
||||
end_doc: 1u32,
|
||||
start_offset: 0u64,
|
||||
end_offset: first_layer_len,
|
||||
};
|
||||
for layer in &self.layers {
|
||||
if let Some(checkpoint) =
|
||||
layer.seek_start_at_offset(target, cur_checkpoint.start_offset)
|
||||
{
|
||||
cur_checkpoint = checkpoint;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
Some(cur_checkpoint)
|
||||
}
|
||||
}
|
||||
117
src/store/index/skip_index_builder.rs
Normal file
117
src/store/index/skip_index_builder.rs
Normal file
@@ -0,0 +1,117 @@
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use crate::store::index::block::CheckpointBlock;
|
||||
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
// Each skip contains iterator over pairs (last doc in block, offset to start of block).
|
||||
|
||||
struct LayerBuilder {
|
||||
buffer: Vec<u8>,
|
||||
pub block: CheckpointBlock,
|
||||
}
|
||||
|
||||
impl LayerBuilder {
|
||||
fn finish(self) -> Vec<u8> {
|
||||
self.buffer
|
||||
}
|
||||
|
||||
fn new() -> LayerBuilder {
|
||||
LayerBuilder {
|
||||
buffer: Vec::new(),
|
||||
block: CheckpointBlock::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the block, and return a checkpoint representing
|
||||
/// the entire block.
|
||||
///
|
||||
/// If the block was empty to begin with, simply return None.
|
||||
fn flush_block(&mut self) -> Option<Checkpoint> {
|
||||
if let Some((start_doc, end_doc)) = self.block.doc_interval() {
|
||||
let start_offset = self.buffer.len() as u64;
|
||||
self.block.serialize(&mut self.buffer);
|
||||
let end_offset = self.buffer.len() as u64;
|
||||
self.block.clear();
|
||||
Some(Checkpoint {
|
||||
start_doc,
|
||||
end_doc,
|
||||
start_offset,
|
||||
end_offset,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&mut self, checkpoint: Checkpoint) {
|
||||
self.block.push(checkpoint);
|
||||
}
|
||||
|
||||
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
|
||||
self.push(checkpoint);
|
||||
let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD;
|
||||
if emit_skip_info {
|
||||
self.flush_block()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SkipIndexBuilder {
|
||||
layers: Vec<LayerBuilder>,
|
||||
}
|
||||
|
||||
impl SkipIndexBuilder {
|
||||
pub fn new() -> SkipIndexBuilder {
|
||||
SkipIndexBuilder { layers: Vec::new() }
|
||||
}
|
||||
|
||||
fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder {
|
||||
if layer_id == self.layers.len() {
|
||||
let layer_builder = LayerBuilder::new();
|
||||
self.layers.push(layer_builder);
|
||||
}
|
||||
&mut self.layers[layer_id]
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, checkpoint: Checkpoint) {
|
||||
let mut skip_pointer = Some(checkpoint);
|
||||
for layer_id in 0.. {
|
||||
if let Some(checkpoint) = skip_pointer {
|
||||
skip_pointer = self.get_layer(layer_id).insert(checkpoint);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
|
||||
let mut last_pointer = None;
|
||||
for skip_layer in self.layers.iter_mut() {
|
||||
if let Some(checkpoint) = last_pointer {
|
||||
skip_layer.push(checkpoint);
|
||||
}
|
||||
last_pointer = skip_layer.flush_block();
|
||||
}
|
||||
let layer_buffers: Vec<Vec<u8>> = self
|
||||
.layers
|
||||
.into_iter()
|
||||
.rev()
|
||||
.map(|layer| layer.finish())
|
||||
.collect();
|
||||
|
||||
let mut layer_offset = 0;
|
||||
let mut layer_sizes = Vec::new();
|
||||
for layer_buffer in &layer_buffers {
|
||||
layer_offset += layer_buffer.len() as u64;
|
||||
layer_sizes.push(VInt(layer_offset));
|
||||
}
|
||||
layer_sizes.serialize(output)?;
|
||||
for layer_buffer in layer_buffers {
|
||||
output.write_all(&layer_buffer[..])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -33,8 +33,8 @@ and should rely on either
|
||||
|
||||
!*/
|
||||
|
||||
mod index;
|
||||
mod reader;
|
||||
mod skiplist;
|
||||
mod writer;
|
||||
pub use self::reader::StoreReader;
|
||||
pub use self::writer::StoreWriter;
|
||||
|
||||
@@ -1,69 +1,91 @@
|
||||
use super::decompress;
|
||||
use super::skiplist::SkipList;
|
||||
use super::index::SkipIndex;
|
||||
use crate::common::VInt;
|
||||
use crate::common::{BinarySerializable, HasLen};
|
||||
use crate::directory::{FileSlice, OwnedBytes};
|
||||
use crate::schema::Document;
|
||||
use crate::space_usage::StoreSpaceUsage;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
use std::cell::RefCell;
|
||||
use lru::LruCache;
|
||||
use std::io;
|
||||
use std::mem::size_of;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
const LRU_CACHE_CAPACITY: usize = 100;
|
||||
|
||||
type Block = Arc<Vec<u8>>;
|
||||
|
||||
type BlockCache = Arc<Mutex<LruCache<u64, Block>>>;
|
||||
|
||||
/// Reads document off tantivy's [`Store`](./index.html)
|
||||
#[derive(Clone)]
|
||||
pub struct StoreReader {
|
||||
data: FileSlice,
|
||||
offset_index_file: OwnedBytes,
|
||||
current_block_offset: RefCell<usize>,
|
||||
current_block: RefCell<Vec<u8>>,
|
||||
max_doc: DocId,
|
||||
cache: BlockCache,
|
||||
cache_hits: Arc<AtomicUsize>,
|
||||
cache_misses: Arc<AtomicUsize>,
|
||||
skip_index: Arc<SkipIndex>,
|
||||
space_usage: StoreSpaceUsage,
|
||||
}
|
||||
|
||||
impl StoreReader {
|
||||
/// Opens a store reader
|
||||
// TODO rename open
|
||||
pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
|
||||
let (data_file, offset_index_file, max_doc) = split_file(store_file)?;
|
||||
let (data_file, offset_index_file) = split_file(store_file)?;
|
||||
let index_data = offset_index_file.read_bytes()?;
|
||||
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
|
||||
let skip_index = SkipIndex::open(index_data);
|
||||
Ok(StoreReader {
|
||||
data: data_file,
|
||||
offset_index_file: offset_index_file.read_bytes()?,
|
||||
current_block_offset: RefCell::new(usize::max_value()),
|
||||
current_block: RefCell::new(Vec::new()),
|
||||
max_doc,
|
||||
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
|
||||
cache_hits: Default::default(),
|
||||
cache_misses: Default::default(),
|
||||
skip_index: Arc::new(skip_index),
|
||||
space_usage,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn block_index(&self) -> SkipList<'_, u64> {
|
||||
SkipList::from(self.offset_index_file.as_slice())
|
||||
pub(crate) fn block_checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||
self.skip_index.checkpoints()
|
||||
}
|
||||
|
||||
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
|
||||
self.block_index()
|
||||
.seek(u64::from(doc_id) + 1)
|
||||
.map(|(doc, offset)| (doc as DocId, offset))
|
||||
.unwrap_or((0u32, 0u64))
|
||||
fn block_checkpoint(&self, doc_id: DocId) -> Option<Checkpoint> {
|
||||
self.skip_index.seek(doc_id)
|
||||
}
|
||||
|
||||
pub(crate) fn block_data(&self) -> io::Result<OwnedBytes> {
|
||||
self.data.read_bytes()
|
||||
}
|
||||
|
||||
fn compressed_block(&self, addr: usize) -> io::Result<OwnedBytes> {
|
||||
let (block_len_bytes, block_body) = self.data.slice_from(addr).split(4);
|
||||
let block_len = u32::deserialize(&mut block_len_bytes.read_bytes()?)?;
|
||||
block_body.slice_to(block_len as usize).read_bytes()
|
||||
fn compressed_block(&self, checkpoint: &Checkpoint) -> io::Result<OwnedBytes> {
|
||||
self.data
|
||||
.slice(
|
||||
checkpoint.start_offset as usize,
|
||||
checkpoint.end_offset as usize,
|
||||
)
|
||||
.read_bytes()
|
||||
}
|
||||
|
||||
fn read_block(&self, block_offset: usize) -> io::Result<()> {
|
||||
if block_offset != *self.current_block_offset.borrow() {
|
||||
let mut current_block_mut = self.current_block.borrow_mut();
|
||||
current_block_mut.clear();
|
||||
let compressed_block = self.compressed_block(block_offset)?;
|
||||
decompress(compressed_block.as_slice(), &mut current_block_mut)?;
|
||||
*self.current_block_offset.borrow_mut() = block_offset;
|
||||
fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> {
|
||||
if let Some(block) = self.cache.lock().unwrap().get(&checkpoint.start_offset) {
|
||||
self.cache_hits.fetch_add(1, Ordering::SeqCst);
|
||||
return Ok(block.clone());
|
||||
}
|
||||
Ok(())
|
||||
|
||||
self.cache_misses.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let compressed_block = self.compressed_block(checkpoint)?;
|
||||
let mut decompressed_block = vec![];
|
||||
decompress(compressed_block.as_slice(), &mut decompressed_block)?;
|
||||
|
||||
let block = Arc::new(decompressed_block);
|
||||
self.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.put(checkpoint.start_offset, block.clone());
|
||||
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
/// Reads a given document.
|
||||
@@ -74,14 +96,15 @@ impl StoreReader {
|
||||
/// It should not be called to score documents
|
||||
/// for instance.
|
||||
pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
|
||||
let (first_doc_id, block_offset) = self.block_offset(doc_id);
|
||||
self.read_block(block_offset as usize)?;
|
||||
let current_block_mut = self.current_block.borrow_mut();
|
||||
let mut cursor = ¤t_block_mut[..];
|
||||
for _ in first_doc_id..doc_id {
|
||||
let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| {
|
||||
crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
|
||||
})?;
|
||||
let mut cursor = &self.read_block(&checkpoint)?[..];
|
||||
for _ in checkpoint.start_doc..doc_id {
|
||||
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
|
||||
cursor = &cursor[doc_length..];
|
||||
}
|
||||
|
||||
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
|
||||
cursor = &cursor[..doc_length];
|
||||
Ok(Document::deserialize(&mut cursor)?)
|
||||
@@ -89,21 +112,93 @@ impl StoreReader {
|
||||
|
||||
/// Summarize total space usage of this store reader.
|
||||
pub fn space_usage(&self) -> StoreSpaceUsage {
|
||||
StoreSpaceUsage::new(self.data.len(), self.offset_index_file.len())
|
||||
self.space_usage.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> {
|
||||
let data_len = data.len();
|
||||
let footer_offset = data_len - size_of::<u64>() - size_of::<u32>();
|
||||
let serialized_offset: OwnedBytes = data.slice(footer_offset, data_len).read_bytes()?;
|
||||
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> {
|
||||
let (data, footer_len_bytes) = data.split_from_end(size_of::<u64>());
|
||||
let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;
|
||||
let mut serialized_offset_buf = serialized_offset.as_slice();
|
||||
let offset = u64::deserialize(&mut serialized_offset_buf)?;
|
||||
let offset = offset as usize;
|
||||
let max_doc = u32::deserialize(&mut serialized_offset_buf)?;
|
||||
Ok((
|
||||
data.slice(0, offset),
|
||||
data.slice(offset, footer_offset),
|
||||
max_doc,
|
||||
))
|
||||
let offset = u64::deserialize(&mut serialized_offset_buf)? as usize;
|
||||
Ok(data.split(offset))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::schema::Document;
|
||||
use crate::schema::Field;
|
||||
use crate::{directory::RAMDirectory, store::tests::write_lorem_ipsum_store, Directory};
|
||||
use std::path::Path;
|
||||
|
||||
fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
|
||||
doc.get_first(*field).and_then(|f| f.text())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_store_lru_cache() -> crate::Result<()> {
|
||||
let directory = RAMDirectory::create();
|
||||
let path = Path::new("store");
|
||||
let writer = directory.open_write(path)?;
|
||||
let schema = write_lorem_ipsum_store(writer, 500);
|
||||
let title = schema.get_field("title").unwrap();
|
||||
let store_file = directory.open_read(path)?;
|
||||
let store = StoreReader::open(store_file)?;
|
||||
|
||||
assert_eq!(store.cache.lock().unwrap().len(), 0);
|
||||
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 0);
|
||||
|
||||
let doc = store.get(0)?;
|
||||
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
|
||||
|
||||
assert_eq!(store.cache.lock().unwrap().len(), 1);
|
||||
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(
|
||||
store
|
||||
.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.peek_lru()
|
||||
.map(|(&k, _)| k as usize),
|
||||
Some(0)
|
||||
);
|
||||
|
||||
let doc = store.get(499)?;
|
||||
assert_eq!(get_text_field(&doc, &title), Some("Doc 499"));
|
||||
|
||||
assert_eq!(store.cache.lock().unwrap().len(), 2);
|
||||
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
|
||||
|
||||
assert_eq!(
|
||||
store
|
||||
.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.peek_lru()
|
||||
.map(|(&k, _)| k as usize),
|
||||
Some(0)
|
||||
);
|
||||
|
||||
let doc = store.get(0)?;
|
||||
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
|
||||
|
||||
assert_eq!(store.cache.lock().unwrap().len(), 2);
|
||||
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
|
||||
assert_eq!(
|
||||
store
|
||||
.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.peek_lru()
|
||||
.map(|(&k, _)| k as usize),
|
||||
Some(18806)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,168 +0,0 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
mod skiplist;
|
||||
mod skiplist_builder;
|
||||
|
||||
pub use self::skiplist::SkipList;
|
||||
pub use self::skiplist_builder::SkipListBuilder;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::{SkipList, SkipListBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_skiplist() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
|
||||
skip_list_builder.insert(2, &3).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next(), Some((2, 3)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist2() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist3() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
|
||||
skip_list_builder.insert(2, &()).unwrap();
|
||||
skip_list_builder.insert(3, &()).unwrap();
|
||||
skip_list_builder.insert(5, &()).unwrap();
|
||||
skip_list_builder.insert(7, &()).unwrap();
|
||||
skip_list_builder.insert(9, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (2, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (3, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (5, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (7, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (9, ()));
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist4() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
|
||||
skip_list_builder.insert(2, &()).unwrap();
|
||||
skip_list_builder.insert(3, &()).unwrap();
|
||||
skip_list_builder.insert(5, &()).unwrap();
|
||||
skip_list_builder.insert(7, &()).unwrap();
|
||||
skip_list_builder.insert(9, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (2, ()));
|
||||
skip_list.seek(5);
|
||||
assert_eq!(skip_list.next().unwrap(), (5, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (7, ()));
|
||||
assert_eq!(skip_list.next().unwrap(), (9, ()));
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist5() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
|
||||
skip_list_builder.insert(2, &()).unwrap();
|
||||
skip_list_builder.insert(3, &()).unwrap();
|
||||
skip_list_builder.insert(5, &()).unwrap();
|
||||
skip_list_builder.insert(6, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (2, ()));
|
||||
skip_list.seek(6);
|
||||
assert_eq!(skip_list.next().unwrap(), (6, ()));
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist6() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
|
||||
skip_list_builder.insert(2, &()).unwrap();
|
||||
skip_list_builder.insert(3, &()).unwrap();
|
||||
skip_list_builder.insert(5, &()).unwrap();
|
||||
skip_list_builder.insert(7, &()).unwrap();
|
||||
skip_list_builder.insert(9, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (2, ()));
|
||||
skip_list.seek(10);
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist7() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
|
||||
for i in 0..1000 {
|
||||
skip_list_builder.insert(i, &()).unwrap();
|
||||
}
|
||||
skip_list_builder.insert(1004, &()).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
|
||||
assert_eq!(skip_list.next().unwrap(), (0, ()));
|
||||
skip_list.seek(431);
|
||||
assert_eq!(skip_list.next().unwrap(), (431, ()));
|
||||
skip_list.seek(1003);
|
||||
assert_eq!(skip_list.next().unwrap(), (1004, ()));
|
||||
assert_eq!(skip_list.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist8() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(8);
|
||||
skip_list_builder.insert(2, &3).unwrap();
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
assert_eq!(output.len(), 11);
|
||||
assert_eq!(output[0], 1u8 + 128u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist9() {
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(4);
|
||||
for i in 0..4 * 4 * 4 {
|
||||
skip_list_builder.insert(i, &i).unwrap();
|
||||
}
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
assert_eq!(output.len(), 774);
|
||||
assert_eq!(output[0], 4u8 + 128u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist10() {
|
||||
// checking that void gets serialized to nothing.
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
|
||||
for i in 0..((4 * 4 * 4) - 1) {
|
||||
skip_list_builder.insert(i, &()).unwrap();
|
||||
}
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
assert_eq!(output.len(), 230);
|
||||
assert_eq!(output[0], 128u8 + 3u8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skiplist11() {
|
||||
// checking that void gets serialized to nothing.
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
|
||||
for i in 0..(4 * 4) {
|
||||
skip_list_builder.insert(i, &()).unwrap();
|
||||
}
|
||||
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
|
||||
assert_eq!(output.len(), 65);
|
||||
assert_eq!(output[0], 128u8 + 3u8);
|
||||
}
|
||||
}
|
||||
@@ -1,133 +0,0 @@
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use std::cmp::max;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
static EMPTY: [u8; 0] = [];
|
||||
|
||||
struct Layer<'a, T> {
|
||||
data: &'a [u8],
|
||||
cursor: &'a [u8],
|
||||
next_id: Option<u64>,
|
||||
_phantom_: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> {
|
||||
type Item = (u64, T);
|
||||
|
||||
fn next(&mut self) -> Option<(u64, T)> {
|
||||
if let Some(cur_id) = self.next_id {
|
||||
let cur_val = T::deserialize(&mut self.cursor).unwrap();
|
||||
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
|
||||
Some((cur_id, cur_val))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> {
|
||||
fn from(data: &'a [u8]) -> Layer<'a, T> {
|
||||
let mut cursor = data;
|
||||
let next_id = VInt::deserialize_u64(&mut cursor).ok();
|
||||
Layer {
|
||||
data,
|
||||
cursor,
|
||||
next_id,
|
||||
_phantom_: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> Layer<'a, T> {
|
||||
fn empty() -> Layer<'a, T> {
|
||||
Layer {
|
||||
data: &EMPTY,
|
||||
cursor: &EMPTY,
|
||||
next_id: None,
|
||||
_phantom_: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
fn seek_offset(&mut self, offset: usize) {
|
||||
self.cursor = &self.data[offset..];
|
||||
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
|
||||
}
|
||||
|
||||
// Returns the last element (key, val)
|
||||
// such that (key < doc_id)
|
||||
//
|
||||
// If there is no such element anymore,
|
||||
// returns None.
|
||||
//
|
||||
// If the element exists, it will be returned
|
||||
// at the next call to `.next()`.
|
||||
fn seek(&mut self, key: u64) -> Option<(u64, T)> {
|
||||
let mut result: Option<(u64, T)> = None;
|
||||
loop {
|
||||
if let Some(next_id) = self.next_id {
|
||||
if next_id < key {
|
||||
if let Some(v) = self.next() {
|
||||
result = Some(v);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SkipList<'a, T: BinarySerializable> {
|
||||
data_layer: Layer<'a, T>,
|
||||
skip_layers: Vec<Layer<'a, u64>>,
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> Iterator for SkipList<'a, T> {
|
||||
type Item = (u64, T);
|
||||
|
||||
fn next(&mut self) -> Option<(u64, T)> {
|
||||
self.data_layer.next()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> SkipList<'a, T> {
|
||||
pub fn seek(&mut self, key: u64) -> Option<(u64, T)> {
|
||||
let mut next_layer_skip: Option<(u64, u64)> = None;
|
||||
for skip_layer in &mut self.skip_layers {
|
||||
if let Some((_, offset)) = next_layer_skip {
|
||||
skip_layer.seek_offset(offset as usize);
|
||||
}
|
||||
next_layer_skip = skip_layer.seek(key);
|
||||
}
|
||||
if let Some((_, offset)) = next_layer_skip {
|
||||
self.data_layer.seek_offset(offset as usize);
|
||||
}
|
||||
self.data_layer.seek(key)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> {
|
||||
fn from(mut data: &'a [u8]) -> SkipList<'a, T> {
|
||||
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|el| el.0)
|
||||
.collect();
|
||||
let num_layers = offsets.len();
|
||||
let layers_data: &[u8] = data;
|
||||
let data_layer: Layer<'a, T> = if num_layers == 0 {
|
||||
Layer::empty()
|
||||
} else {
|
||||
let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize];
|
||||
Layer::from(first_layer_data)
|
||||
};
|
||||
let skip_layers = (0..max(1, num_layers) - 1)
|
||||
.map(|i| (offsets[i] as usize, offsets[i + 1] as usize))
|
||||
.map(|(start, stop)| Layer::from(&layers_data[start..stop]))
|
||||
.collect();
|
||||
SkipList {
|
||||
skip_layers,
|
||||
data_layer,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,98 +0,0 @@
|
||||
use crate::common::{is_power_of_2, BinarySerializable, VInt};
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
struct LayerBuilder<T: BinarySerializable> {
|
||||
period_mask: usize,
|
||||
buffer: Vec<u8>,
|
||||
len: usize,
|
||||
_phantom_: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T: BinarySerializable> LayerBuilder<T> {
|
||||
fn written_size(&self) -> usize {
|
||||
self.buffer.len()
|
||||
}
|
||||
|
||||
fn write(&self, output: &mut dyn Write) -> Result<(), io::Error> {
|
||||
output.write_all(&self.buffer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn with_period(period: usize) -> LayerBuilder<T> {
|
||||
assert!(is_power_of_2(period), "The period has to be a power of 2.");
|
||||
LayerBuilder {
|
||||
period_mask: (period - 1),
|
||||
buffer: Vec::new(),
|
||||
len: 0,
|
||||
_phantom_: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
fn insert(&mut self, key: u64, value: &T) -> io::Result<Option<(u64, u64)>> {
|
||||
self.len += 1;
|
||||
let offset = self.written_size() as u64;
|
||||
VInt(key).serialize_into_vec(&mut self.buffer);
|
||||
value.serialize(&mut self.buffer)?;
|
||||
let emit_skip_info = (self.period_mask & self.len) == 0;
|
||||
if emit_skip_info {
|
||||
Ok(Some((key, offset)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SkipListBuilder<T: BinarySerializable> {
|
||||
period: usize,
|
||||
data_layer: LayerBuilder<T>,
|
||||
skip_layers: Vec<LayerBuilder<u64>>,
|
||||
}
|
||||
|
||||
impl<T: BinarySerializable> SkipListBuilder<T> {
|
||||
pub fn new(period: usize) -> SkipListBuilder<T> {
|
||||
SkipListBuilder {
|
||||
period,
|
||||
data_layer: LayerBuilder::with_period(period),
|
||||
skip_layers: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder<u64> {
|
||||
if layer_id == self.skip_layers.len() {
|
||||
let layer_builder = LayerBuilder::with_period(self.period);
|
||||
self.skip_layers.push(layer_builder);
|
||||
}
|
||||
&mut self.skip_layers[layer_id]
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> {
|
||||
let mut skip_pointer = self.data_layer.insert(key, dest)?;
|
||||
for layer_id in 0.. {
|
||||
if let Some((skip_doc_id, skip_offset)) = skip_pointer {
|
||||
skip_pointer = self
|
||||
.get_skip_layer(layer_id)
|
||||
.insert(skip_doc_id, &skip_offset)?;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> {
|
||||
let mut size: u64 = self.data_layer.buffer.len() as u64;
|
||||
let mut layer_sizes = vec![VInt(size)];
|
||||
for layer in self.skip_layers.iter().rev() {
|
||||
size += layer.buffer.len() as u64;
|
||||
layer_sizes.push(VInt(size));
|
||||
}
|
||||
layer_sizes.serialize(output)?;
|
||||
self.data_layer.write(output)?;
|
||||
for layer in self.skip_layers.iter().rev() {
|
||||
layer.write(output)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
50
src/store/tests_store.rs
Normal file
50
src/store/tests_store.rs
Normal file
@@ -0,0 +1,50 @@
|
||||
use std::path::Path;
|
||||
|
||||
use crate::HasLen;
|
||||
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, RAMDirectory};
|
||||
use crate::fastfield::DeleteBitSet;
|
||||
|
||||
use super::{StoreReader, StoreWriter};
|
||||
|
||||
#[test]
|
||||
fn test_toto2() -> crate::Result<()> {
|
||||
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
|
||||
let path = Path::new("b6029ade1b954ea1acad15b432eaacb9.store");
|
||||
assert!(directory.validate_checksum(path)?);
|
||||
let store_file = directory.open_read(path)?;
|
||||
let store = StoreReader::open(store_file)?;
|
||||
let documents = store.documents();
|
||||
// for doc in documents {
|
||||
// println!("{:?}", doc);
|
||||
// }
|
||||
let doc= store.get(15_086)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_toto() -> crate::Result<()> {
|
||||
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
|
||||
assert!(directory.validate_checksum(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store"))?);
|
||||
let store_file = directory.open_read(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store.patched"))?;
|
||||
let store = StoreReader::open(store_file)?;
|
||||
let doc= store.get(53)?;
|
||||
println!("{:?}", doc);
|
||||
// let documents = store.documents();
|
||||
// let ram_directory = RAMDirectory::create();
|
||||
// let path = Path::new("store");
|
||||
|
||||
// let store_wrt = ram_directory.open_write(path)?;
|
||||
// let mut store_writer = StoreWriter::new(store_wrt);
|
||||
// for doc in &documents {
|
||||
// store_writer.store(doc)?;
|
||||
// }
|
||||
// store_writer.close()?;
|
||||
// let store_data = ram_directory.open_read(path)?;
|
||||
// let new_store = StoreReader::open(store_data)?;
|
||||
// for doc in 0..59 {
|
||||
// println!("{}", doc);
|
||||
// let doc = new_store.get(doc)?;
|
||||
// println!("{:?}", doc);
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,15 +1,16 @@
|
||||
use super::compress;
|
||||
use super::skiplist::SkipListBuilder;
|
||||
use super::index::SkipIndexBuilder;
|
||||
use super::StoreReader;
|
||||
use crate::common::CountingWriter;
|
||||
use crate::common::{BinarySerializable, VInt};
|
||||
use crate::directory::TerminatingWrite;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::schema::Document;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::DocId;
|
||||
use std::io::{self, Write};
|
||||
|
||||
const BLOCK_SIZE: usize = 16_384;
|
||||
const BLOCK_SIZE: usize = 30;
|
||||
|
||||
/// Write tantivy's [`Store`](./index.html)
|
||||
///
|
||||
@@ -21,7 +22,8 @@ const BLOCK_SIZE: usize = 16_384;
|
||||
///
|
||||
pub struct StoreWriter {
|
||||
doc: DocId,
|
||||
offset_index_writer: SkipListBuilder<u64>,
|
||||
first_doc_in_block: DocId,
|
||||
offset_index_writer: SkipIndexBuilder,
|
||||
writer: CountingWriter<WritePtr>,
|
||||
intermediary_buffer: Vec<u8>,
|
||||
current_block: Vec<u8>,
|
||||
@@ -35,7 +37,8 @@ impl StoreWriter {
|
||||
pub fn new(writer: WritePtr) -> StoreWriter {
|
||||
StoreWriter {
|
||||
doc: 0,
|
||||
offset_index_writer: SkipListBuilder::new(4),
|
||||
first_doc_in_block: 0,
|
||||
offset_index_writer: SkipIndexBuilder::new(),
|
||||
writer: CountingWriter::wrap(writer),
|
||||
intermediary_buffer: Vec::new(),
|
||||
current_block: Vec::new(),
|
||||
@@ -68,11 +71,10 @@ impl StoreWriter {
|
||||
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
|
||||
if !self.current_block.is_empty() {
|
||||
self.write_and_compress_block()?;
|
||||
self.offset_index_writer
|
||||
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
|
||||
}
|
||||
let doc_offset = self.doc;
|
||||
let start_offset = self.writer.written_bytes() as u64;
|
||||
assert_eq!(self.first_doc_in_block, self.doc);
|
||||
let doc_shift = self.doc;
|
||||
let start_shift = self.writer.written_bytes() as u64;
|
||||
|
||||
// just bulk write all of the block of the given reader.
|
||||
self.writer
|
||||
@@ -80,21 +82,36 @@ impl StoreWriter {
|
||||
|
||||
// concatenate the index of the `store_reader`, after translating
|
||||
// its start doc id and its start file offset.
|
||||
for (next_doc_id, block_addr) in store_reader.block_index() {
|
||||
self.doc = doc_offset + next_doc_id as u32;
|
||||
self.offset_index_writer
|
||||
.insert(u64::from(self.doc), &(start_offset + block_addr))?;
|
||||
for mut checkpoint in store_reader.block_checkpoints() {
|
||||
checkpoint.start_doc += doc_shift;
|
||||
checkpoint.end_doc += doc_shift;
|
||||
checkpoint.start_offset += start_shift;
|
||||
checkpoint.end_offset += start_shift;
|
||||
self.register_checkpoint(checkpoint);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
|
||||
self.offset_index_writer.insert(checkpoint);
|
||||
self.first_doc_in_block = checkpoint.end_doc;
|
||||
self.doc = checkpoint.end_doc;
|
||||
}
|
||||
|
||||
fn write_and_compress_block(&mut self) -> io::Result<()> {
|
||||
assert!(self.doc > 0);
|
||||
self.intermediary_buffer.clear();
|
||||
compress(&self.current_block[..], &mut self.intermediary_buffer)?;
|
||||
(self.intermediary_buffer.len() as u32).serialize(&mut self.writer)?;
|
||||
let start_offset = self.writer.written_bytes();
|
||||
self.writer.write_all(&self.intermediary_buffer)?;
|
||||
self.offset_index_writer
|
||||
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
|
||||
let end_offset = self.writer.written_bytes();
|
||||
let end_doc = self.doc;
|
||||
self.register_checkpoint(Checkpoint {
|
||||
start_doc: self.first_doc_in_block,
|
||||
end_doc,
|
||||
start_offset,
|
||||
end_offset,
|
||||
});
|
||||
self.current_block.clear();
|
||||
Ok(())
|
||||
}
|
||||
@@ -110,7 +127,6 @@ impl StoreWriter {
|
||||
let header_offset: u64 = self.writer.written_bytes() as u64;
|
||||
self.offset_index_writer.write(&mut self.writer)?;
|
||||
header_offset.serialize(&mut self.writer)?;
|
||||
self.doc.serialize(&mut self.writer)?;
|
||||
self.writer.terminate()
|
||||
}
|
||||
}
|
||||
|
||||
27
src/termdict/fst_termdict/mod.rs
Normal file
27
src/termdict/fst_termdict/mod.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
/*!
|
||||
The term dictionary main role is to associate the sorted [`Term`s](../struct.Term.html) to
|
||||
a [`TermInfo`](../postings/struct.TermInfo.html) struct that contains some meta-information
|
||||
about the term.
|
||||
|
||||
Internally, the term dictionary relies on the `fst` crate to store
|
||||
a sorted mapping that associate each term to its rank in the lexicographical order.
|
||||
For instance, in a dictionary containing the sorted terms "abba", "bjork", "blur" and "donovan",
|
||||
the `TermOrdinal` are respectively `0`, `1`, `2`, and `3`.
|
||||
|
||||
For `u64`-terms, tantivy explicitely uses a `BigEndian` representation to ensure that the
|
||||
lexicographical order matches the natural order of integers.
|
||||
|
||||
`i64`-terms are transformed to `u64` using a continuous mapping `val ⟶ val - i64::min_value()`
|
||||
and then treated as a `u64`.
|
||||
|
||||
`f64`-terms are transformed to `u64` using a mapping that preserve order, and are then treated
|
||||
as `u64`.
|
||||
|
||||
A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
|
||||
*/
|
||||
mod streamer;
|
||||
mod term_info_store;
|
||||
mod termdict;
|
||||
|
||||
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
|
||||
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::io;
|
||||
|
||||
use super::TermDictionary;
|
||||
use crate::postings::TermInfo;
|
||||
use crate::termdict::TermOrdinal;
|
||||
@@ -59,14 +61,14 @@ where
|
||||
|
||||
/// Creates the stream corresponding to the range
|
||||
/// of terms defined using the `TermStreamerBuilder`.
|
||||
pub fn into_stream(self) -> TermStreamer<'a, A> {
|
||||
TermStreamer {
|
||||
pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
|
||||
Ok(TermStreamer {
|
||||
fst_map: self.fst_map,
|
||||
stream: self.stream_builder.into_stream(),
|
||||
term_ord: 0u64,
|
||||
current_key: Vec::with_capacity(100),
|
||||
current_value: TermInfo::default(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,22 +55,32 @@ impl TermInfoBlockMeta {
|
||||
self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
|
||||
}
|
||||
|
||||
// Here inner_offset is the offset within the block, WITHOUT the first term_info.
|
||||
// In other word, term_info #1,#2,#3 gets inner_offset 0,1,2... While term_info #0
|
||||
// is encoded without bitpacking.
|
||||
fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo {
|
||||
assert!(inner_offset < BLOCK_LEN - 1);
|
||||
let num_bits = self.num_bits() as usize;
|
||||
let mut cursor = num_bits * inner_offset;
|
||||
|
||||
let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32;
|
||||
cursor += self.doc_freq_nbits as usize;
|
||||
let posting_start_addr = num_bits * inner_offset;
|
||||
// the stop offset is the start offset of the next term info.
|
||||
let posting_stop_addr = posting_start_addr + num_bits;
|
||||
let doc_freq_addr = posting_start_addr + self.postings_offset_nbits as usize;
|
||||
let positions_idx_addr = doc_freq_addr + self.doc_freq_nbits as usize;
|
||||
|
||||
let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits);
|
||||
cursor += self.postings_offset_nbits as usize;
|
||||
|
||||
let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits);
|
||||
let postings_start_offset = self.ref_term_info.postings_start_offset
|
||||
+ extract_bits(data, posting_start_addr, self.postings_offset_nbits);
|
||||
let postings_stop_offset = self.ref_term_info.postings_start_offset
|
||||
+ extract_bits(data, posting_stop_addr, self.postings_offset_nbits);
|
||||
let doc_freq = extract_bits(data, doc_freq_addr, self.doc_freq_nbits) as u32;
|
||||
let positions_idx = self.ref_term_info.positions_idx
|
||||
+ extract_bits(data, positions_idx_addr, self.positions_idx_nbits);
|
||||
|
||||
TermInfo {
|
||||
doc_freq,
|
||||
postings_offset: postings_offset + self.ref_term_info.postings_offset,
|
||||
positions_idx: positions_idx + self.ref_term_info.positions_idx,
|
||||
postings_start_offset,
|
||||
postings_stop_offset,
|
||||
positions_idx,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -152,16 +162,17 @@ fn bitpack_serialize<W: Write>(
|
||||
term_info_block_meta: &TermInfoBlockMeta,
|
||||
term_info: &TermInfo,
|
||||
) -> io::Result<()> {
|
||||
bit_packer.write(
|
||||
term_info.postings_start_offset,
|
||||
term_info_block_meta.postings_offset_nbits,
|
||||
write,
|
||||
)?;
|
||||
bit_packer.write(
|
||||
u64::from(term_info.doc_freq),
|
||||
term_info_block_meta.doc_freq_nbits,
|
||||
write,
|
||||
)?;
|
||||
bit_packer.write(
|
||||
term_info.postings_offset,
|
||||
term_info_block_meta.postings_offset_nbits,
|
||||
write,
|
||||
)?;
|
||||
|
||||
bit_packer.write(
|
||||
term_info.positions_idx,
|
||||
term_info_block_meta.positions_idx_nbits,
|
||||
@@ -181,23 +192,27 @@ impl TermInfoStoreWriter {
|
||||
}
|
||||
|
||||
fn flush_block(&mut self) -> io::Result<()> {
|
||||
if self.term_infos.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let mut bit_packer = BitPacker::new();
|
||||
let ref_term_info = self.term_infos[0].clone();
|
||||
|
||||
let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() {
|
||||
last_term_info
|
||||
} else {
|
||||
return Ok(());
|
||||
};
|
||||
let postings_stop_offset =
|
||||
last_term_info.postings_stop_offset - ref_term_info.postings_start_offset;
|
||||
for term_info in &mut self.term_infos[1..] {
|
||||
term_info.postings_offset -= ref_term_info.postings_offset;
|
||||
term_info.postings_start_offset -= ref_term_info.postings_start_offset;
|
||||
term_info.positions_idx -= ref_term_info.positions_idx;
|
||||
}
|
||||
|
||||
let mut max_doc_freq: u32 = 0u32;
|
||||
let mut max_postings_offset: u64 = 0u64;
|
||||
let mut max_positions_idx: u64 = 0u64;
|
||||
let max_postings_offset: u64 = postings_stop_offset;
|
||||
let max_positions_idx: u64 = last_term_info.positions_idx;
|
||||
|
||||
for term_info in &self.term_infos[1..] {
|
||||
max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq);
|
||||
max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset);
|
||||
max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx);
|
||||
}
|
||||
|
||||
let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq));
|
||||
@@ -222,6 +237,12 @@ impl TermInfoStoreWriter {
|
||||
)?;
|
||||
}
|
||||
|
||||
bit_packer.write(
|
||||
postings_stop_offset,
|
||||
term_info_block_meta.postings_offset_nbits,
|
||||
&mut self.buffer_term_infos,
|
||||
)?;
|
||||
|
||||
// Block need end up at the end of a byte.
|
||||
bit_packer.flush(&mut self.buffer_term_infos)?;
|
||||
self.term_infos.clear();
|
||||
@@ -230,6 +251,7 @@ impl TermInfoStoreWriter {
|
||||
}
|
||||
|
||||
pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> {
|
||||
assert!(term_info.postings_stop_offset >= term_info.postings_start_offset);
|
||||
self.num_terms += 1u64;
|
||||
self.term_infos.push(term_info.clone());
|
||||
if self.term_infos.len() >= BLOCK_LEN {
|
||||
@@ -289,10 +311,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_term_info_block_meta_serialization() {
|
||||
let term_info_block_meta = TermInfoBlockMeta {
|
||||
offset: 2009,
|
||||
offset: 2009u64,
|
||||
ref_term_info: TermInfo {
|
||||
doc_freq: 512,
|
||||
postings_offset: 51,
|
||||
postings_start_offset: 51,
|
||||
postings_stop_offset: 57u64,
|
||||
positions_idx: 3584,
|
||||
},
|
||||
doc_freq_nbits: 10,
|
||||
@@ -310,10 +333,12 @@ mod tests {
|
||||
fn test_pack() -> crate::Result<()> {
|
||||
let mut store_writer = TermInfoStoreWriter::new();
|
||||
let mut term_infos = vec![];
|
||||
let offset = |i| (i * 13 + i * i) as u64;
|
||||
for i in 0..1000 {
|
||||
let term_info = TermInfo {
|
||||
doc_freq: i as u32,
|
||||
postings_offset: (i / 10) as u64,
|
||||
postings_start_offset: offset(i),
|
||||
postings_stop_offset: offset(i + 1),
|
||||
positions_idx: (i * 7) as u64,
|
||||
};
|
||||
store_writer.write_term_info(&term_info)?;
|
||||
@@ -323,7 +348,12 @@ mod tests {
|
||||
store_writer.serialize(&mut buffer)?;
|
||||
let term_info_store = TermInfoStore::open(FileSlice::from(buffer))?;
|
||||
for i in 0..1000 {
|
||||
assert_eq!(term_info_store.get(i as u64), term_infos[i]);
|
||||
assert_eq!(
|
||||
term_info_store.get(i as u64),
|
||||
term_infos[i],
|
||||
"term info {}",
|
||||
i
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -80,7 +80,6 @@ where
|
||||
.serialize(&mut counting_writer)?;
|
||||
let footer_size = counting_writer.written_bytes();
|
||||
(footer_size as u64).serialize(&mut counting_writer)?;
|
||||
counting_writer.flush()?;
|
||||
}
|
||||
Ok(file)
|
||||
}
|
||||
@@ -139,8 +138,8 @@ impl TermDictionary {
|
||||
}
|
||||
|
||||
/// Returns the ordinal associated to a given term.
|
||||
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> Option<TermOrdinal> {
|
||||
self.fst_index.get(key)
|
||||
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
|
||||
Ok(self.fst_index.get(key))
|
||||
}
|
||||
|
||||
/// Returns the term associated to a given term ordinal.
|
||||
@@ -152,7 +151,7 @@ impl TermDictionary {
|
||||
///
|
||||
/// Regardless of whether the term is found or not,
|
||||
/// the buffer may be modified.
|
||||
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool {
|
||||
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
|
||||
bytes.clear();
|
||||
let fst = self.fst_index.as_fst();
|
||||
let mut node = fst.root();
|
||||
@@ -167,10 +166,10 @@ impl TermDictionary {
|
||||
let new_node_addr = transition.addr;
|
||||
node = fst.node(new_node_addr);
|
||||
} else {
|
||||
return false;
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
true
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Returns the number of terms in the dictionary.
|
||||
@@ -179,9 +178,10 @@ impl TermDictionary {
|
||||
}
|
||||
|
||||
/// Lookups the value corresponding to the key.
|
||||
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<TermInfo> {
|
||||
self.term_ord(key)
|
||||
.map(|term_ord| self.term_info_from_ord(term_ord))
|
||||
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
|
||||
Ok(self
|
||||
.term_ord(key)?
|
||||
.map(|term_ord| self.term_info_from_ord(term_ord)))
|
||||
}
|
||||
|
||||
/// Returns a range builder, to stream all of the terms
|
||||
@@ -191,7 +191,7 @@ impl TermDictionary {
|
||||
}
|
||||
|
||||
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
|
||||
pub fn stream(&self) -> TermStreamer<'_> {
|
||||
pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
|
||||
self.range().into_stream()
|
||||
}
|
||||
|
||||
@@ -60,12 +60,10 @@ impl<'a> TermMerger<'a> {
|
||||
|
||||
pub(crate) fn matching_segments<'b: 'a>(
|
||||
&'b self,
|
||||
) -> Box<dyn 'b + Iterator<Item = (usize, TermOrdinal)>> {
|
||||
Box::new(
|
||||
self.current_streamers
|
||||
.iter()
|
||||
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord())),
|
||||
)
|
||||
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
|
||||
self.current_streamers
|
||||
.iter()
|
||||
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
|
||||
}
|
||||
|
||||
fn advance_segments(&mut self) {
|
||||
|
||||
@@ -20,435 +20,37 @@ as `u64`.
|
||||
A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
|
||||
*/
|
||||
|
||||
use tantivy_fst::automaton::AlwaysMatch;
|
||||
|
||||
mod fst_termdict;
|
||||
use fst_termdict as termdict;
|
||||
|
||||
mod merger;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// Position of the term in the sorted list of terms.
|
||||
pub type TermOrdinal = u64;
|
||||
|
||||
mod merger;
|
||||
mod streamer;
|
||||
mod term_info_store;
|
||||
mod termdict;
|
||||
/// The term dictionary contains all of the terms in
|
||||
/// `tantivy index` in a sorted manner.
|
||||
pub type TermDictionary = self::termdict::TermDictionary;
|
||||
|
||||
pub use self::merger::TermMerger;
|
||||
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
|
||||
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
|
||||
/// Builder for the new term dictionary.
|
||||
///
|
||||
/// Inserting must be done in the order of the `keys`.
|
||||
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
|
||||
use crate::core::Index;
|
||||
use crate::directory::{Directory, FileSlice, RAMDirectory};
|
||||
use crate::postings::TermInfo;
|
||||
use crate::schema::{Schema, TEXT};
|
||||
use std::path::PathBuf;
|
||||
use std::str;
|
||||
/// Given a list of sorted term streams,
|
||||
/// returns an iterator over sorted unique terms.
|
||||
///
|
||||
/// The item yield is actually a pair with
|
||||
/// - the term
|
||||
/// - a slice with the ordinal of the segments containing
|
||||
/// the terms.
|
||||
pub type TermMerger<'a> = self::merger::TermMerger<'a>;
|
||||
|
||||
const BLOCK_SIZE: usize = 1_500;
|
||||
|
||||
fn make_term_info(val: u64) -> TermInfo {
|
||||
TermInfo {
|
||||
doc_freq: val as u32,
|
||||
positions_idx: val * 2u64,
|
||||
postings_offset: val * 3u64,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_term_dictionary() {
|
||||
let empty = TermDictionary::empty();
|
||||
assert!(empty.stream().next().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_ordinals() -> crate::Result<()> {
|
||||
const COUNTRIES: [&'static str; 7] = [
|
||||
"San Marino",
|
||||
"Serbia",
|
||||
"Slovakia",
|
||||
"Slovenia",
|
||||
"Spain",
|
||||
"Sweden",
|
||||
"Switzerland",
|
||||
];
|
||||
let directory = RAMDirectory::create();
|
||||
let path = PathBuf::from("TermDictionary");
|
||||
{
|
||||
let write = directory.open_write(&path)?;
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
|
||||
for term in COUNTRIES.iter() {
|
||||
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
|
||||
}
|
||||
term_dictionary_builder.finish()?;
|
||||
}
|
||||
let term_file = directory.open_read(&path)?;
|
||||
let term_dict: TermDictionary = TermDictionary::open(term_file)?;
|
||||
for (term_ord, term) in COUNTRIES.iter().enumerate() {
|
||||
assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64);
|
||||
let mut bytes = vec![];
|
||||
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes));
|
||||
assert_eq!(bytes, term.as_bytes());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_dictionary_simple() -> crate::Result<()> {
|
||||
let directory = RAMDirectory::create();
|
||||
let path = PathBuf::from("TermDictionary");
|
||||
{
|
||||
let write = directory.open_write(&path)?;
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
|
||||
term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
|
||||
term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
|
||||
term_dictionary_builder.finish()?;
|
||||
}
|
||||
let file = directory.open_read(&path)?;
|
||||
let term_dict: TermDictionary = TermDictionary::open(file)?;
|
||||
assert_eq!(term_dict.get("abc").unwrap().doc_freq, 34u32);
|
||||
assert_eq!(term_dict.get("abcd").unwrap().doc_freq, 346u32);
|
||||
let mut stream = term_dict.stream();
|
||||
{
|
||||
{
|
||||
let (k, v) = stream.next().unwrap();
|
||||
assert_eq!(k.as_ref(), "abc".as_bytes());
|
||||
assert_eq!(v.doc_freq, 34u32);
|
||||
}
|
||||
assert_eq!(stream.key(), "abc".as_bytes());
|
||||
assert_eq!(stream.value().doc_freq, 34u32);
|
||||
}
|
||||
{
|
||||
{
|
||||
let (k, v) = stream.next().unwrap();
|
||||
assert_eq!(k, "abcd".as_bytes());
|
||||
assert_eq!(v.doc_freq, 346u32);
|
||||
}
|
||||
assert_eq!(stream.key(), "abcd".as_bytes());
|
||||
assert_eq!(stream.value().doc_freq, 346u32);
|
||||
}
|
||||
assert!(!stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_iterator() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
{
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(text_field=>"a b d f"));
|
||||
index_writer.commit()?;
|
||||
index_writer.add_document(doc!(text_field=>"a b c d f"));
|
||||
index_writer.commit()?;
|
||||
index_writer.add_document(doc!(text_field => "e f"));
|
||||
index_writer.commit()?;
|
||||
}
|
||||
let searcher = index.reader()?.searcher();
|
||||
|
||||
let field_searcher = searcher.field(text_field)?;
|
||||
let mut term_it = field_searcher.terms();
|
||||
let mut term_string = String::new();
|
||||
while term_it.advance() {
|
||||
//let term = Term::from_bytes(term_it.key());
|
||||
term_string.push_str(str::from_utf8(term_it.key()).expect("test"));
|
||||
}
|
||||
assert_eq!(&*term_string, "abcdef");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_dictionary_stream() -> crate::Result<()> {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
term_dictionary_builder
|
||||
.insert(id.as_bytes(), &make_term_info(*i as u64))
|
||||
.unwrap();
|
||||
}
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let term_file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
|
||||
{
|
||||
let mut streamer = term_dictionary.stream();
|
||||
let mut i = 0;
|
||||
while let Some((streamer_k, streamer_v)) = streamer.next() {
|
||||
let &(ref key, ref v) = &ids[i];
|
||||
assert_eq!(streamer_k.as_ref(), key.as_bytes());
|
||||
assert_eq!(streamer_v, &make_term_info(*v as u64));
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let &(ref key, ref val) = &ids[2047];
|
||||
assert_eq!(
|
||||
term_dictionary.get(key.as_bytes()),
|
||||
Some(make_term_info(*val as u64))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
// term requires more than 16bits
|
||||
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
|
||||
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
|
||||
term_dictionary_builder.insert("abr", &make_term_info(2))?;
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
let term_dict_file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
|
||||
let mut kv_stream = term_dictionary.stream();
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
|
||||
assert_eq!(kv_stream.value(), &make_term_info(1));
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
|
||||
assert_eq!(kv_stream.value(), &make_term_info(2));
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abr".as_bytes());
|
||||
assert!(!kv_stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_range() -> crate::Result<()> {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
term_dictionary_builder
|
||||
.insert(id.as_bytes(), &make_term_info(*i as u64))
|
||||
.unwrap();
|
||||
}
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
|
||||
let file = FileSlice::from(buffer);
|
||||
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
||||
{
|
||||
for i in (0..20).chain(6000..8_000) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.ge(target_key.as_bytes())
|
||||
.into_stream();
|
||||
for j in 0..3 {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j];
|
||||
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
|
||||
assert_eq!(streamer_v.doc_freq, *v);
|
||||
assert_eq!(streamer_v, &make_term_info(*v as u64));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.gt(target_key.as_bytes())
|
||||
.into_stream();
|
||||
for j in 0..3 {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j + 1];
|
||||
assert_eq!(streamer_k.as_ref(), key.as_bytes());
|
||||
assert_eq!(streamer_v.doc_freq, *v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
|
||||
for j in 0..3 {
|
||||
let &(ref fst_key, _) = &ids[i];
|
||||
let &(ref last_key, _) = &ids[i + j];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.ge(fst_key.as_bytes())
|
||||
.lt(last_key.as_bytes())
|
||||
.into_stream();
|
||||
for _ in 0..j {
|
||||
assert!(streamer.next().is_some());
|
||||
}
|
||||
assert!(streamer.next().is_none());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_string() -> crate::Result<()> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
term_dictionary_builder
|
||||
.insert(&[], &make_term_info(1 as u64))
|
||||
.unwrap();
|
||||
term_dictionary_builder
|
||||
.insert(&[1u8], &make_term_info(2 as u64))
|
||||
.unwrap();
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
||||
let mut stream = term_dictionary.stream();
|
||||
assert!(stream.advance());
|
||||
assert!(stream.key().is_empty());
|
||||
assert!(stream.advance());
|
||||
assert_eq!(stream.key(), &[1u8]);
|
||||
assert!(!stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_range_boundaries() -> crate::Result<()> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
|
||||
for i in 0u8..10u8 {
|
||||
let number_arr = [i; 1];
|
||||
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
|
||||
}
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
let file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
||||
|
||||
let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
|
||||
let mut res: Vec<u32> = vec![];
|
||||
while let Some((_, ref v)) = streamer.next() {
|
||||
res.push(v.doc_freq);
|
||||
}
|
||||
if backwards {
|
||||
res.reverse();
|
||||
}
|
||||
res
|
||||
};
|
||||
{
|
||||
let range = term_dictionary.range().backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().ge([2u8]).into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, false),
|
||||
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().ge([2u8]).backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().gt([2u8]).into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, false),
|
||||
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().gt([2u8]).backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().lt([6u8]).into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, false),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().lt([6u8]).backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().le([6u8]).into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, false),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().le([6u8]).backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream();
|
||||
assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary
|
||||
.range()
|
||||
.ge([0u8])
|
||||
.lt([5u8])
|
||||
.backward()
|
||||
.into_stream();
|
||||
assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_automaton_search() -> crate::Result<()> {
|
||||
use crate::query::DFAWrapper;
|
||||
use levenshtein_automata::LevenshteinAutomatonBuilder;
|
||||
|
||||
const COUNTRIES: [&'static str; 7] = [
|
||||
"San Marino",
|
||||
"Serbia",
|
||||
"Slovakia",
|
||||
"Slovenia",
|
||||
"Spain",
|
||||
"Sweden",
|
||||
"Switzerland",
|
||||
];
|
||||
|
||||
let directory = RAMDirectory::create();
|
||||
let path = PathBuf::from("TermDictionary");
|
||||
{
|
||||
let write = directory.open_write(&path)?;
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
|
||||
for term in COUNTRIES.iter() {
|
||||
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
|
||||
}
|
||||
term_dictionary_builder.finish()?;
|
||||
}
|
||||
let file = directory.open_read(&path)?;
|
||||
let term_dict: TermDictionary = TermDictionary::open(file)?;
|
||||
|
||||
// We can now build an entire dfa.
|
||||
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
|
||||
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
|
||||
|
||||
let mut range = term_dict.search(automaton).into_stream();
|
||||
|
||||
// get the first finding
|
||||
assert!(range.advance());
|
||||
assert_eq!("Spain".as_bytes(), range.key());
|
||||
assert!(!range.advance());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
|
||||
/// Terms are guaranteed to be sorted.
|
||||
pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;
|
||||
|
||||
431
src/termdict/tests.rs
Normal file
431
src/termdict/tests.rs
Normal file
@@ -0,0 +1,431 @@
|
||||
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
|
||||
|
||||
use crate::directory::{Directory, FileSlice, RAMDirectory, TerminatingWrite};
|
||||
use crate::postings::TermInfo;
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::str;
|
||||
|
||||
const BLOCK_SIZE: usize = 1_500;
|
||||
|
||||
fn make_term_info(term_ord: u64) -> TermInfo {
|
||||
let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
|
||||
TermInfo {
|
||||
doc_freq: term_ord as u32,
|
||||
postings_start_offset: offset(term_ord),
|
||||
postings_stop_offset: offset(term_ord + 1),
|
||||
positions_idx: offset(term_ord) * 2u64,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_term_dictionary() {
|
||||
let empty = TermDictionary::empty();
|
||||
assert!(empty.stream().unwrap().next().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_ordinals() -> crate::Result<()> {
|
||||
const COUNTRIES: [&'static str; 7] = [
|
||||
"San Marino",
|
||||
"Serbia",
|
||||
"Slovakia",
|
||||
"Slovenia",
|
||||
"Spain",
|
||||
"Sweden",
|
||||
"Switzerland",
|
||||
];
|
||||
let directory = RAMDirectory::create();
|
||||
let path = PathBuf::from("TermDictionary");
|
||||
{
|
||||
let write = directory.open_write(&path)?;
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
|
||||
for term in COUNTRIES.iter() {
|
||||
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
|
||||
}
|
||||
term_dictionary_builder.finish()?.terminate()?;
|
||||
}
|
||||
let term_file = directory.open_read(&path)?;
|
||||
let term_dict: TermDictionary = TermDictionary::open(term_file)?;
|
||||
for (term_ord, term) in COUNTRIES.iter().enumerate() {
|
||||
assert_eq!(term_dict.term_ord(term)?, Some(term_ord as u64));
|
||||
let mut bytes = vec![];
|
||||
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes)?);
|
||||
assert_eq!(bytes, term.as_bytes());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_dictionary_simple() -> crate::Result<()> {
|
||||
let directory = RAMDirectory::create();
|
||||
let path = PathBuf::from("TermDictionary");
|
||||
{
|
||||
let write = directory.open_write(&path)?;
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
|
||||
term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
|
||||
term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
|
||||
term_dictionary_builder.finish()?.terminate()?;
|
||||
}
|
||||
let file = directory.open_read(&path)?;
|
||||
let term_dict: TermDictionary = TermDictionary::open(file)?;
|
||||
assert_eq!(term_dict.get("abc")?.unwrap().doc_freq, 34u32);
|
||||
assert_eq!(term_dict.get("abcd")?.unwrap().doc_freq, 346u32);
|
||||
let mut stream = term_dict.stream()?;
|
||||
{
|
||||
{
|
||||
let (k, v) = stream.next().unwrap();
|
||||
assert_eq!(k.as_ref(), "abc".as_bytes());
|
||||
assert_eq!(v.doc_freq, 34u32);
|
||||
}
|
||||
assert_eq!(stream.key(), "abc".as_bytes());
|
||||
assert_eq!(stream.value().doc_freq, 34u32);
|
||||
}
|
||||
{
|
||||
{
|
||||
let (k, v) = stream.next().unwrap();
|
||||
assert_eq!(k, "abcd".as_bytes());
|
||||
assert_eq!(v.doc_freq, 346u32);
|
||||
}
|
||||
assert_eq!(stream.key(), "abcd".as_bytes());
|
||||
assert_eq!(stream.value().doc_freq, 346u32);
|
||||
}
|
||||
assert!(!stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_dictionary_stream() -> crate::Result<()> {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
term_dictionary_builder
|
||||
.insert(id.as_bytes(), &make_term_info(*i as u64))
|
||||
.unwrap();
|
||||
}
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
let term_file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
|
||||
{
|
||||
let mut streamer = term_dictionary.stream()?;
|
||||
let mut i = 0;
|
||||
while let Some((streamer_k, streamer_v)) = streamer.next() {
|
||||
let &(ref key, ref v) = &ids[i];
|
||||
assert_eq!(streamer_k.as_ref(), key.as_bytes());
|
||||
assert_eq!(streamer_v, &make_term_info(*v as u64));
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let &(ref key, ref val) = &ids[2047];
|
||||
assert_eq!(
|
||||
term_dictionary.get(key.as_bytes())?,
|
||||
Some(make_term_info(*val as u64))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
// term requires more than 16bits
|
||||
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
|
||||
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
|
||||
term_dictionary_builder.insert("abr", &make_term_info(3))?;
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
let term_dict_file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
|
||||
let mut kv_stream = term_dictionary.stream()?;
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
|
||||
assert_eq!(kv_stream.value(), &make_term_info(1));
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
|
||||
assert_eq!(kv_stream.value(), &make_term_info(2));
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abr".as_bytes());
|
||||
assert_eq!(kv_stream.value(), &make_term_info(3));
|
||||
assert!(!kv_stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_range() -> crate::Result<()> {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
term_dictionary_builder
|
||||
.insert(id.as_bytes(), &make_term_info(*i as u64))
|
||||
.unwrap();
|
||||
}
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
|
||||
let file = FileSlice::from(buffer);
|
||||
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
||||
{
|
||||
for i in (0..20).chain(6000..8_000) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.ge(target_key.as_bytes())
|
||||
.into_stream()?;
|
||||
for j in 0..3 {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j];
|
||||
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
|
||||
assert_eq!(streamer_v.doc_freq, *v);
|
||||
assert_eq!(streamer_v, &make_term_info(*v as u64));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.gt(target_key.as_bytes())
|
||||
.into_stream()?;
|
||||
for j in 0..3 {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j + 1];
|
||||
assert_eq!(streamer_k.as_ref(), key.as_bytes());
|
||||
assert_eq!(streamer_v.doc_freq, *v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
|
||||
for j in 0..3 {
|
||||
let &(ref fst_key, _) = &ids[i];
|
||||
let &(ref last_key, _) = &ids[i + j];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.ge(fst_key.as_bytes())
|
||||
.lt(last_key.as_bytes())
|
||||
.into_stream()?;
|
||||
for _ in 0..j {
|
||||
assert!(streamer.next().is_some());
|
||||
}
|
||||
assert!(streamer.next().is_none());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_string() -> crate::Result<()> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
term_dictionary_builder
|
||||
.insert(&[], &make_term_info(1 as u64))
|
||||
.unwrap();
|
||||
term_dictionary_builder
|
||||
.insert(&[1u8], &make_term_info(2 as u64))
|
||||
.unwrap();
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
let file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
||||
let mut stream = term_dictionary.stream()?;
|
||||
assert!(stream.advance());
|
||||
assert!(stream.key().is_empty());
|
||||
assert!(stream.advance());
|
||||
assert_eq!(stream.key(), &[1u8]);
|
||||
assert!(!stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stream_range_test_dict() -> crate::Result<TermDictionary> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
|
||||
for i in 0u8..10u8 {
|
||||
let number_arr = [i; 1];
|
||||
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
|
||||
}
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
let file = FileSlice::from(buffer);
|
||||
TermDictionary::open(file)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_range_boundaries_forward() -> crate::Result<()> {
|
||||
let term_dictionary = stream_range_test_dict()?;
|
||||
let value_list = |mut streamer: TermStreamer<'_>| {
|
||||
let mut res: Vec<u32> = vec![];
|
||||
while let Some((_, ref v)) = streamer.next() {
|
||||
res.push(v.doc_freq);
|
||||
}
|
||||
res
|
||||
};
|
||||
{
|
||||
let range = term_dictionary.range().ge([2u8]).into_stream()?;
|
||||
assert_eq!(
|
||||
value_list(range),
|
||||
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().gt([2u8]).into_stream()?;
|
||||
assert_eq!(
|
||||
value_list(range),
|
||||
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().lt([6u8]).into_stream()?;
|
||||
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().le([6u8]).into_stream()?;
|
||||
assert_eq!(
|
||||
value_list(range),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
|
||||
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_range_boundaries_backward() -> crate::Result<()> {
|
||||
let term_dictionary = stream_range_test_dict()?;
|
||||
let value_list_backward = |mut streamer: TermStreamer<'_>| {
|
||||
let mut res: Vec<u32> = vec![];
|
||||
while let Some((_, ref v)) = streamer.next() {
|
||||
res.push(v.doc_freq);
|
||||
}
|
||||
res.reverse();
|
||||
res
|
||||
};
|
||||
{
|
||||
let range = term_dictionary.range().backward().into_stream()?;
|
||||
assert_eq!(
|
||||
value_list_backward(range),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
|
||||
assert_eq!(
|
||||
value_list_backward(range),
|
||||
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
|
||||
assert_eq!(
|
||||
value_list_backward(range),
|
||||
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
|
||||
assert_eq!(
|
||||
value_list_backward(range),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
|
||||
assert_eq!(
|
||||
value_list_backward(range),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary
|
||||
.range()
|
||||
.ge([0u8])
|
||||
.lt([5u8])
|
||||
.backward()
|
||||
.into_stream()?;
|
||||
assert_eq!(
|
||||
value_list_backward(range),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32]
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ord_to_term() -> crate::Result<()> {
|
||||
let termdict = stream_range_test_dict()?;
|
||||
let mut bytes = vec![];
|
||||
for b in 0u8..10u8 {
|
||||
termdict.ord_to_term(b as u64, &mut bytes)?;
|
||||
assert_eq!(&bytes, &[b]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_term_ord() -> crate::Result<()> {
|
||||
let termdict = stream_range_test_dict()?;
|
||||
let mut stream = termdict.stream()?;
|
||||
for b in 0u8..10u8 {
|
||||
assert!(stream.advance(), true);
|
||||
assert_eq!(stream.term_ord(), b as u64);
|
||||
assert_eq!(stream.key(), &[b]);
|
||||
}
|
||||
assert!(!stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_automaton_search() -> crate::Result<()> {
|
||||
use crate::query::DFAWrapper;
|
||||
use levenshtein_automata::LevenshteinAutomatonBuilder;
|
||||
|
||||
const COUNTRIES: [&'static str; 7] = [
|
||||
"San Marino",
|
||||
"Serbia",
|
||||
"Slovakia",
|
||||
"Slovenia",
|
||||
"Spain",
|
||||
"Sweden",
|
||||
"Switzerland",
|
||||
];
|
||||
|
||||
let directory = RAMDirectory::create();
|
||||
let path = PathBuf::from("TermDictionary");
|
||||
{
|
||||
let write = directory.open_write(&path)?;
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
|
||||
for term in COUNTRIES.iter() {
|
||||
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
|
||||
}
|
||||
term_dictionary_builder.finish()?.terminate()?;
|
||||
}
|
||||
let file = directory.open_read(&path)?;
|
||||
let term_dict: TermDictionary = TermDictionary::open(file)?;
|
||||
|
||||
// We can now build an entire dfa.
|
||||
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
|
||||
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
|
||||
|
||||
let mut range = term_dict.search(automaton).into_stream()?;
|
||||
|
||||
// get the first finding
|
||||
assert!(range.advance());
|
||||
assert_eq!("Spain".as_bytes(), range.key());
|
||||
assert!(!range.advance());
|
||||
Ok(())
|
||||
}
|
||||
@@ -18,7 +18,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
|
||||
.unwrap()
|
||||
.terminate()
|
||||
.unwrap();
|
||||
assert!(managed_directory.exists(test_path));
|
||||
assert!(managed_directory.exists(test_path).unwrap());
|
||||
// triggering gc and setting the delete operation to fail.
|
||||
//
|
||||
// We are checking that the gc operation is not removing the
|
||||
@@ -29,12 +29,12 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
|
||||
// lock file.
|
||||
fail::cfg("RAMDirectory::delete", "1*off->1*return").unwrap();
|
||||
assert!(managed_directory.garbage_collect(Default::default).is_ok());
|
||||
assert!(managed_directory.exists(test_path));
|
||||
assert!(managed_directory.exists(test_path).unwrap());
|
||||
|
||||
// running the gc a second time should remove the file.
|
||||
assert!(managed_directory.garbage_collect(Default::default).is_ok());
|
||||
assert!(
|
||||
!managed_directory.exists(test_path),
|
||||
!managed_directory.exists(test_path).unwrap(),
|
||||
"The file should have been deleted"
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user