mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-08 18:12:55 +00:00
Compare commits
30 Commits
issue/938
...
debugging-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
08f7706973 | ||
|
|
bf6e6e8a7c | ||
|
|
203b0256a3 | ||
|
|
caf2a38b7e | ||
|
|
96f24b078e | ||
|
|
332b50a4eb | ||
|
|
8ca0954b3b | ||
|
|
36343e2de8 | ||
|
|
2f14a892ca | ||
|
|
9c3cabce40 | ||
|
|
f8d71c2b10 | ||
|
|
394dfb24f1 | ||
|
|
b0549a229d | ||
|
|
670b6eaff6 | ||
|
|
a4f33d3823 | ||
|
|
c7841e3da5 | ||
|
|
e7b4a12bba | ||
|
|
0aaa929d6e | ||
|
|
1112797c18 | ||
|
|
920481e1c1 | ||
|
|
55f7b84966 | ||
|
|
09ab4df1fe | ||
|
|
0c2cf81b37 | ||
|
|
d864430bda | ||
|
|
de60540e06 | ||
|
|
c3e311e6b8 | ||
|
|
ac704f2f22 | ||
|
|
be626083a0 | ||
|
|
b68fcca1e0 | ||
|
|
af6dfa1856 |
@@ -9,6 +9,10 @@ Tantivy 0.14.0
|
|||||||
- Bugfix in `Query::explain`
|
- Bugfix in `Query::explain`
|
||||||
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
|
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
|
||||||
- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)
|
- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)
|
||||||
|
- Simplified the encoding of the skip reader struct. BlockWAND max tf is now encoded over a single byte. (@pmasurel)
|
||||||
|
- `FilterCollector` now supports all Fast Field value types (@barrotsteindev)
|
||||||
|
|
||||||
|
This version breaks compatibility and requires users to reindex everything.
|
||||||
|
|
||||||
Tantivy 0.13.2
|
Tantivy 0.13.2
|
||||||
===================
|
===================
|
||||||
|
|||||||
@@ -26,7 +26,6 @@ snap = "1"
|
|||||||
tempfile = {version="3", optional=true}
|
tempfile = {version="3", optional=true}
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
serde = {version="1", features=["derive"]}
|
serde = {version="1", features=["derive"]}
|
||||||
serde_cbor = "0.11"
|
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
fs2={version="0.4", optional=true}
|
fs2={version="0.4", optional=true}
|
||||||
@@ -48,16 +47,18 @@ murmurhash32 = "0.2"
|
|||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
smallvec = "1"
|
smallvec = "1"
|
||||||
rayon = "1"
|
rayon = "1"
|
||||||
|
env_logger = "0.8"
|
||||||
lru = "0.6"
|
lru = "0.6"
|
||||||
|
|
||||||
[target.'cfg(windows)'.dependencies]
|
[target.'cfg(windows)'.dependencies]
|
||||||
winapi = "0.3"
|
winapi = "0.3"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
rand = "0.7"
|
rand = "0.8"
|
||||||
maplit = "1"
|
maplit = "1"
|
||||||
matches = "0.1.8"
|
matches = "0.1.8"
|
||||||
proptest = "0.10"
|
proptest = "0.10"
|
||||||
|
criterion = "0.3"
|
||||||
|
|
||||||
[dev-dependencies.fail]
|
[dev-dependencies.fail]
|
||||||
version = "0.4"
|
version = "0.4"
|
||||||
@@ -98,3 +99,7 @@ travis-ci = { repository = "tantivy-search/tantivy" }
|
|||||||
name = "failpoints"
|
name = "failpoints"
|
||||||
path = "tests/failpoints/mod.rs"
|
path = "tests/failpoints/mod.rs"
|
||||||
required-features = ["fail/failpoints"]
|
required-features = ["fail/failpoints"]
|
||||||
|
|
||||||
|
[[bench]]
|
||||||
|
name = "analyzer"
|
||||||
|
harness = false
|
||||||
|
|||||||
3774
benches/alice.txt
Normal file
3774
benches/alice.txt
Normal file
File diff suppressed because it is too large
Load Diff
22
benches/analyzer.rs
Normal file
22
benches/analyzer.rs
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
use criterion::{criterion_group, criterion_main, Criterion};
|
||||||
|
use tantivy::tokenizer::TokenizerManager;
|
||||||
|
|
||||||
|
const ALICE_TXT: &'static str = include_str!("alice.txt");
|
||||||
|
|
||||||
|
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||||
|
let tokenizer_manager = TokenizerManager::default();
|
||||||
|
let tokenizer = tokenizer_manager.get("default").unwrap();
|
||||||
|
c.bench_function("default-tokenize-alice", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
let mut word_count = 0;
|
||||||
|
let mut token_stream = tokenizer.token_stream(ALICE_TXT);
|
||||||
|
while token_stream.advance() {
|
||||||
|
word_count += 1;
|
||||||
|
}
|
||||||
|
assert_eq!(word_count, 30_731);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
criterion_group!(benches, criterion_benchmark);
|
||||||
|
criterion_main!(benches);
|
||||||
@@ -9,8 +9,10 @@
|
|||||||
|
|
||||||
// ---
|
// ---
|
||||||
// Importing tantivy...
|
// Importing tantivy...
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
use crate::collector::{Collector, SegmentCollector};
|
use crate::collector::{Collector, SegmentCollector};
|
||||||
use crate::fastfield::FastFieldReader;
|
use crate::fastfield::{FastFieldReader, FastValue};
|
||||||
use crate::schema::Field;
|
use crate::schema::Field;
|
||||||
use crate::{Score, SegmentReader, TantivyError};
|
use crate::{Score, SegmentReader, TantivyError};
|
||||||
|
|
||||||
@@ -41,78 +43,104 @@ use crate::{Score, SegmentReader, TantivyError};
|
|||||||
///
|
///
|
||||||
/// let query_parser = QueryParser::for_index(&index, vec![title]);
|
/// let query_parser = QueryParser::for_index(&index, vec![title]);
|
||||||
/// let query = query_parser.parse_query("diary").unwrap();
|
/// let query = query_parser.parse_query("diary").unwrap();
|
||||||
/// let no_filter_collector = FilterCollector::new(price, &|value| value > 20_120u64, TopDocs::with_limit(2));
|
/// let no_filter_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
|
||||||
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
|
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
|
||||||
///
|
///
|
||||||
/// assert_eq!(top_docs.len(), 1);
|
/// assert_eq!(top_docs.len(), 1);
|
||||||
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
|
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
|
||||||
///
|
///
|
||||||
/// let filter_all_collector = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
|
/// let filter_all_collector: FilterCollector<_, _, u64> = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
|
||||||
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
|
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
|
||||||
///
|
///
|
||||||
/// assert_eq!(filtered_top_docs.len(), 0);
|
/// assert_eq!(filtered_top_docs.len(), 0);
|
||||||
/// ```
|
/// ```
|
||||||
pub struct FilterCollector<TCollector, TPredicate>
|
pub struct FilterCollector<TCollector, TPredicate, TPredicateValue: FastValue>
|
||||||
where
|
where
|
||||||
TPredicate: 'static,
|
TPredicate: 'static,
|
||||||
{
|
{
|
||||||
field: Field,
|
field: Field,
|
||||||
collector: TCollector,
|
collector: TCollector,
|
||||||
predicate: &'static TPredicate,
|
predicate: &'static TPredicate,
|
||||||
|
t_predicate_value: PhantomData<TPredicateValue>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TCollector, TPredicate> FilterCollector<TCollector, TPredicate>
|
impl<TCollector, TPredicate, TPredicateValue: FastValue>
|
||||||
|
FilterCollector<TCollector, TPredicate, TPredicateValue>
|
||||||
where
|
where
|
||||||
TCollector: Collector + Send + Sync,
|
TCollector: Collector + Send + Sync,
|
||||||
TPredicate: Fn(u64) -> bool + Send + Sync,
|
TPredicate: Fn(TPredicateValue) -> bool + Send + Sync,
|
||||||
{
|
{
|
||||||
/// Create a new FilterCollector.
|
/// Create a new FilterCollector.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
field: Field,
|
field: Field,
|
||||||
predicate: &'static TPredicate,
|
predicate: &'static TPredicate,
|
||||||
collector: TCollector,
|
collector: TCollector,
|
||||||
) -> FilterCollector<TCollector, TPredicate> {
|
) -> FilterCollector<TCollector, TPredicate, TPredicateValue> {
|
||||||
FilterCollector {
|
FilterCollector {
|
||||||
field,
|
field,
|
||||||
predicate,
|
predicate,
|
||||||
collector,
|
collector,
|
||||||
|
t_predicate_value: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TCollector, TPredicate> Collector for FilterCollector<TCollector, TPredicate>
|
impl<TCollector, TPredicate, TPredicateValue: FastValue> Collector
|
||||||
|
for FilterCollector<TCollector, TPredicate, TPredicateValue>
|
||||||
where
|
where
|
||||||
TCollector: Collector + Send + Sync,
|
TCollector: Collector + Send + Sync,
|
||||||
TPredicate: 'static + Fn(u64) -> bool + Send + Sync,
|
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
|
||||||
|
TPredicateValue: 'static + FastValue,
|
||||||
{
|
{
|
||||||
// That's the type of our result.
|
// That's the type of our result.
|
||||||
// Our standard deviation will be a float.
|
// Our standard deviation will be a float.
|
||||||
type Fruit = TCollector::Fruit;
|
type Fruit = TCollector::Fruit;
|
||||||
|
|
||||||
type Child = FilterSegmentCollector<TCollector::Child, TPredicate>;
|
type Child = FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>;
|
||||||
|
|
||||||
fn for_segment(
|
fn for_segment(
|
||||||
&self,
|
&self,
|
||||||
segment_local_id: u32,
|
segment_local_id: u32,
|
||||||
segment_reader: &SegmentReader,
|
segment_reader: &SegmentReader,
|
||||||
) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate>> {
|
) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>> {
|
||||||
|
let schema = segment_reader.schema();
|
||||||
|
let field_entry = schema.get_field_entry(self.field);
|
||||||
|
if !field_entry.is_fast() {
|
||||||
|
return Err(TantivyError::SchemaError(format!(
|
||||||
|
"Field {:?} is not a fast field.",
|
||||||
|
field_entry.name()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let requested_type = TPredicateValue::to_type();
|
||||||
|
let field_schema_type = field_entry.field_type().value_type();
|
||||||
|
if requested_type != field_schema_type {
|
||||||
|
return Err(TantivyError::SchemaError(format!(
|
||||||
|
"Field {:?} is of type {:?}!={:?}",
|
||||||
|
field_entry.name(),
|
||||||
|
requested_type,
|
||||||
|
field_schema_type
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
let fast_field_reader = segment_reader
|
let fast_field_reader = segment_reader
|
||||||
.fast_fields()
|
.fast_fields()
|
||||||
.u64(self.field)
|
.typed_fast_field_reader(self.field)
|
||||||
.ok_or_else(|| {
|
.ok_or_else(|| {
|
||||||
let field_name = segment_reader.schema().get_field_name(self.field);
|
|
||||||
TantivyError::SchemaError(format!(
|
TantivyError::SchemaError(format!(
|
||||||
"Field {:?} is not a u64 fast field.",
|
"{:?} is not declared as a fast field in the schema.",
|
||||||
field_name
|
self.field
|
||||||
))
|
))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let segment_collector = self
|
let segment_collector = self
|
||||||
.collector
|
.collector
|
||||||
.for_segment(segment_local_id, segment_reader)?;
|
.for_segment(segment_local_id, segment_reader)?;
|
||||||
|
|
||||||
Ok(FilterSegmentCollector {
|
Ok(FilterSegmentCollector {
|
||||||
fast_field_reader,
|
fast_field_reader,
|
||||||
segment_collector,
|
segment_collector,
|
||||||
predicate: self.predicate,
|
predicate: self.predicate,
|
||||||
|
t_predicate_value: PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,20 +156,23 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FilterSegmentCollector<TSegmentCollector, TPredicate>
|
pub struct FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
|
||||||
where
|
where
|
||||||
TPredicate: 'static,
|
TPredicate: 'static,
|
||||||
|
TPredicateValue: 'static + FastValue,
|
||||||
{
|
{
|
||||||
fast_field_reader: FastFieldReader<u64>,
|
fast_field_reader: FastFieldReader<TPredicateValue>,
|
||||||
segment_collector: TSegmentCollector,
|
segment_collector: TSegmentCollector,
|
||||||
predicate: &'static TPredicate,
|
predicate: &'static TPredicate,
|
||||||
|
t_predicate_value: PhantomData<TPredicateValue>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TSegmentCollector, TPredicate> SegmentCollector
|
impl<TSegmentCollector, TPredicate, TPredicateValue> SegmentCollector
|
||||||
for FilterSegmentCollector<TSegmentCollector, TPredicate>
|
for FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
|
||||||
where
|
where
|
||||||
TSegmentCollector: SegmentCollector,
|
TSegmentCollector: SegmentCollector,
|
||||||
TPredicate: 'static + Fn(u64) -> bool + Send + Sync,
|
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
|
||||||
|
TPredicateValue: 'static + FastValue,
|
||||||
{
|
{
|
||||||
type Fruit = TSegmentCollector::Fruit;
|
type Fruit = TSegmentCollector::Fruit;
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,13 @@ use crate::DocId;
|
|||||||
use crate::Score;
|
use crate::Score;
|
||||||
use crate::SegmentLocalId;
|
use crate::SegmentLocalId;
|
||||||
|
|
||||||
|
use crate::collector::{FilterCollector, TopDocs};
|
||||||
|
use crate::query::QueryParser;
|
||||||
|
use crate::schema::{Schema, FAST, TEXT};
|
||||||
|
use crate::DateTime;
|
||||||
|
use crate::{doc, Index};
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
|
pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
|
||||||
compute_score: true,
|
compute_score: true,
|
||||||
};
|
};
|
||||||
@@ -16,6 +23,54 @@ pub const TEST_COLLECTOR_WITHOUT_SCORE: TestCollector = TestCollector {
|
|||||||
compute_score: true,
|
compute_score: true,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn test_filter_collector() {
|
||||||
|
let mut schema_builder = Schema::builder();
|
||||||
|
let title = schema_builder.add_text_field("title", TEXT);
|
||||||
|
let price = schema_builder.add_u64_field("price", FAST);
|
||||||
|
let date = schema_builder.add_date_field("date", FAST);
|
||||||
|
let schema = schema_builder.build();
|
||||||
|
let index = Index::create_in_ram(schema);
|
||||||
|
|
||||||
|
let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
|
||||||
|
index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64, date => DateTime::from_str("1898-04-09T00:00:00+00:00").unwrap()));
|
||||||
|
index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64, date => DateTime::from_str("2020-04-09T00:00:00+00:00").unwrap()));
|
||||||
|
index_writer.add_document(doc!(title => "The Diary of Anne Frank", price => 18_240u64, date => DateTime::from_str("2019-04-20T00:00:00+00:00").unwrap()));
|
||||||
|
index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64, date => DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()));
|
||||||
|
index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64, date => DateTime::from_str("2018-04-09T00:00:00+00:00").unwrap()));
|
||||||
|
assert!(index_writer.commit().is_ok());
|
||||||
|
|
||||||
|
let reader = index.reader().unwrap();
|
||||||
|
let searcher = reader.searcher();
|
||||||
|
|
||||||
|
let query_parser = QueryParser::for_index(&index, vec![title]);
|
||||||
|
let query = query_parser.parse_query("diary").unwrap();
|
||||||
|
let filter_some_collector = FilterCollector::new(
|
||||||
|
price,
|
||||||
|
&|value: u64| value > 20_120u64,
|
||||||
|
TopDocs::with_limit(2),
|
||||||
|
);
|
||||||
|
let top_docs = searcher.search(&query, &filter_some_collector).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(top_docs.len(), 1);
|
||||||
|
assert_eq!(top_docs[0].1, DocAddress(0, 1));
|
||||||
|
|
||||||
|
let filter_all_collector: FilterCollector<_, _, u64> =
|
||||||
|
FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
|
||||||
|
let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(filtered_top_docs.len(), 0);
|
||||||
|
|
||||||
|
fn date_filter(value: DateTime) -> bool {
|
||||||
|
(value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
let filter_dates_collector = FilterCollector::new(date, &date_filter, TopDocs::with_limit(5));
|
||||||
|
let filtered_date_docs = searcher.search(&query, &filter_dates_collector).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(filtered_date_docs.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
/// Stores all of the doc ids.
|
/// Stores all of the doc ids.
|
||||||
/// This collector is only used for tests.
|
/// This collector is only used for tests.
|
||||||
/// It is unusable in pr
|
/// It is unusable in pr
|
||||||
|
|||||||
@@ -115,11 +115,16 @@ pub fn u64_to_i64(val: u64) -> i64 {
|
|||||||
/// For simplicity, tantivy internally handles `f64` as `u64`.
|
/// For simplicity, tantivy internally handles `f64` as `u64`.
|
||||||
/// The mapping is defined by this function.
|
/// The mapping is defined by this function.
|
||||||
///
|
///
|
||||||
/// Maps `f64` to `u64` so that lexical order is preserved.
|
/// Maps `f64` to `u64` in a monotonic manner, so that bytes lexical order is preserved.
|
||||||
///
|
///
|
||||||
/// This is more suited than simply casting (`val as u64`)
|
/// This is more suited than simply casting (`val as u64`)
|
||||||
/// which would truncate the result
|
/// which would truncate the result
|
||||||
///
|
///
|
||||||
|
/// # Reference
|
||||||
|
///
|
||||||
|
/// Daniel Lemire's [blog post](https://lemire.me/blog/2020/12/14/converting-floating-point-numbers-to-integers-while-preserving-order/)
|
||||||
|
/// explains the mapping in a clear manner.
|
||||||
|
///
|
||||||
/// # See also
|
/// # See also
|
||||||
/// The [reverse mapping is `u64_to_f64`](./fn.u64_to_f64.html).
|
/// The [reverse mapping is `u64_to_f64`](./fn.u64_to_f64.html).
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
@@ -148,6 +153,7 @@ pub(crate) mod test {
|
|||||||
pub use super::minmax;
|
pub use super::minmax;
|
||||||
pub use super::serialize::test::fixed_size_test;
|
pub use super::serialize::test::fixed_size_test;
|
||||||
use super::{compute_num_bits, f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
|
use super::{compute_num_bits, f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
|
||||||
|
use proptest::prelude::*;
|
||||||
use std::f64;
|
use std::f64;
|
||||||
|
|
||||||
fn test_i64_converter_helper(val: i64) {
|
fn test_i64_converter_helper(val: i64) {
|
||||||
@@ -158,6 +164,15 @@ pub(crate) mod test {
|
|||||||
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
|
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
proptest! {
|
||||||
|
#[test]
|
||||||
|
fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) {
|
||||||
|
let left_u64 = f64_to_u64(left);
|
||||||
|
let right_u64 = f64_to_u64(right);
|
||||||
|
assert_eq!(left_u64 < right_u64, left < right);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_i64_converter() {
|
fn test_i64_converter() {
|
||||||
assert_eq!(i64_to_u64(i64::min_value()), u64::min_value());
|
assert_eq!(i64_to_u64(i64::min_value()), u64::min_value());
|
||||||
|
|||||||
@@ -35,12 +35,18 @@ fn load_metas(
|
|||||||
inventory: &SegmentMetaInventory,
|
inventory: &SegmentMetaInventory,
|
||||||
) -> crate::Result<IndexMeta> {
|
) -> crate::Result<IndexMeta> {
|
||||||
let meta_data = directory.atomic_read(&META_FILEPATH)?;
|
let meta_data = directory.atomic_read(&META_FILEPATH)?;
|
||||||
let meta_string = String::from_utf8_lossy(&meta_data);
|
let meta_string = String::from_utf8(meta_data)
|
||||||
|
.map_err(|utf8_err| {
|
||||||
|
DataCorruption::new(
|
||||||
|
META_FILEPATH.to_path_buf(),
|
||||||
|
format!("Meta file is not valid utf-8. {:?}", utf8_err)
|
||||||
|
)
|
||||||
|
})?;
|
||||||
IndexMeta::deserialize(&meta_string, &inventory)
|
IndexMeta::deserialize(&meta_string, &inventory)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
DataCorruption::new(
|
DataCorruption::new(
|
||||||
META_FILEPATH.to_path_buf(),
|
META_FILEPATH.to_path_buf(),
|
||||||
format!("Meta file cannot be deserialized. {:?}.", e),
|
format!("Meta file cannot be deserialized. {:?}. content = {}", e, meta_string),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.map_err(From::from)
|
.map_err(From::from)
|
||||||
|
|||||||
@@ -58,7 +58,8 @@ pub enum OpenWriteError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl OpenWriteError {
|
impl OpenWriteError {
|
||||||
pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
|
/// Wraps an io error.
|
||||||
|
pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
|
||||||
Self::IOError { io_error, filepath }
|
Self::IOError { io_error, filepath }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -143,7 +144,8 @@ pub enum OpenReadError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl OpenReadError {
|
impl OpenReadError {
|
||||||
pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
|
/// Wraps an io error.
|
||||||
|
pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
|
||||||
Self::IOError { io_error, filepath }
|
Self::IOError { io_error, filepath }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use crc32fast::Hasher;
|
|||||||
use std::fs;
|
use std::fs;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::io::BufRead;
|
use std::io::BufRead;
|
||||||
use std::path::PathBuf;
|
use std::path::Path;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
@@ -13,15 +13,15 @@ pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 }
|
|||||||
|
|
||||||
// Watches a file and executes registered callbacks when the file is modified.
|
// Watches a file and executes registered callbacks when the file is modified.
|
||||||
pub struct FileWatcher {
|
pub struct FileWatcher {
|
||||||
path: Arc<PathBuf>,
|
path: Arc<Path>,
|
||||||
callbacks: Arc<WatchCallbackList>,
|
callbacks: Arc<WatchCallbackList>,
|
||||||
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
|
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileWatcher {
|
impl FileWatcher {
|
||||||
pub fn new(path: &PathBuf) -> FileWatcher {
|
pub fn new(path: &Path) -> FileWatcher {
|
||||||
FileWatcher {
|
FileWatcher {
|
||||||
path: Arc::new(path.clone()),
|
path: Arc::from(path),
|
||||||
callbacks: Default::default(),
|
callbacks: Default::default(),
|
||||||
state: Default::default(),
|
state: Default::default(),
|
||||||
}
|
}
|
||||||
@@ -63,7 +63,7 @@ impl FileWatcher {
|
|||||||
handle
|
handle
|
||||||
}
|
}
|
||||||
|
|
||||||
fn compute_checksum(path: &PathBuf) -> Result<u32, io::Error> {
|
fn compute_checksum(path: &Path) -> Result<u32, io::Error> {
|
||||||
let reader = match fs::File::open(path) {
|
let reader = match fs::File::open(path) {
|
||||||
Ok(f) => io::BufReader::new(f),
|
Ok(f) => io::BufReader::new(f),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
@@ -115,6 +115,18 @@ impl Footer {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
VersionedFooter::V3 {
|
||||||
|
crc32: _crc,
|
||||||
|
store_compression,
|
||||||
|
} => {
|
||||||
|
if &library_version.store_compression != store_compression {
|
||||||
|
return Err(Incompatibility::CompressionMismatch {
|
||||||
|
library_compression_format: library_version.store_compression.to_string(),
|
||||||
|
index_compression_format: store_compression.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch {
|
VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch {
|
||||||
library_version: library_version.clone(),
|
library_version: library_version.clone(),
|
||||||
index_version: self.version.clone(),
|
index_version: self.version.clone(),
|
||||||
@@ -136,24 +148,31 @@ pub enum VersionedFooter {
|
|||||||
crc32: CrcHashU32,
|
crc32: CrcHashU32,
|
||||||
store_compression: String,
|
store_compression: String,
|
||||||
},
|
},
|
||||||
|
// Block wand max termfred on 1 byte
|
||||||
|
V3 {
|
||||||
|
crc32: CrcHashU32,
|
||||||
|
store_compression: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BinarySerializable for VersionedFooter {
|
impl BinarySerializable for VersionedFooter {
|
||||||
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
|
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
match self {
|
match self {
|
||||||
VersionedFooter::V2 {
|
VersionedFooter::V3 {
|
||||||
crc32,
|
crc32,
|
||||||
store_compression: compression,
|
store_compression: compression,
|
||||||
} => {
|
} => {
|
||||||
// Serializes a valid `VersionedFooter` or panics if the version is unknown
|
// Serializes a valid `VersionedFooter` or panics if the version is unknown
|
||||||
// [ version | crc_hash | compression_mode ]
|
// [ version | crc_hash | compression_mode ]
|
||||||
// [ 0..4 | 4..8 | variable ]
|
// [ 0..4 | 4..8 | variable ]
|
||||||
BinarySerializable::serialize(&2u32, &mut buf)?;
|
BinarySerializable::serialize(&3u32, &mut buf)?;
|
||||||
BinarySerializable::serialize(crc32, &mut buf)?;
|
BinarySerializable::serialize(crc32, &mut buf)?;
|
||||||
BinarySerializable::serialize(compression, &mut buf)?;
|
BinarySerializable::serialize(compression, &mut buf)?;
|
||||||
}
|
}
|
||||||
VersionedFooter::V1 { .. } | VersionedFooter::UnknownVersion => {
|
VersionedFooter::V2 { .. }
|
||||||
|
| VersionedFooter::V1 { .. }
|
||||||
|
| VersionedFooter::UnknownVersion => {
|
||||||
return Err(io::Error::new(
|
return Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidInput,
|
io::ErrorKind::InvalidInput,
|
||||||
"Cannot serialize an unknown versioned footer ",
|
"Cannot serialize an unknown versioned footer ",
|
||||||
@@ -182,7 +201,7 @@ impl BinarySerializable for VersionedFooter {
|
|||||||
reader.read_exact(&mut buf[..])?;
|
reader.read_exact(&mut buf[..])?;
|
||||||
let mut cursor = &buf[..];
|
let mut cursor = &buf[..];
|
||||||
let version = u32::deserialize(&mut cursor)?;
|
let version = u32::deserialize(&mut cursor)?;
|
||||||
if version != 1 && version != 2 {
|
if version > 3 {
|
||||||
return Ok(VersionedFooter::UnknownVersion);
|
return Ok(VersionedFooter::UnknownVersion);
|
||||||
}
|
}
|
||||||
let crc32 = u32::deserialize(&mut cursor)?;
|
let crc32 = u32::deserialize(&mut cursor)?;
|
||||||
@@ -192,12 +211,17 @@ impl BinarySerializable for VersionedFooter {
|
|||||||
crc32,
|
crc32,
|
||||||
store_compression,
|
store_compression,
|
||||||
}
|
}
|
||||||
} else {
|
} else if version == 2 {
|
||||||
assert_eq!(version, 2);
|
|
||||||
VersionedFooter::V2 {
|
VersionedFooter::V2 {
|
||||||
crc32,
|
crc32,
|
||||||
store_compression,
|
store_compression,
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
assert_eq!(version, 3);
|
||||||
|
VersionedFooter::V3 {
|
||||||
|
crc32,
|
||||||
|
store_compression,
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -205,6 +229,7 @@ impl BinarySerializable for VersionedFooter {
|
|||||||
impl VersionedFooter {
|
impl VersionedFooter {
|
||||||
pub fn crc(&self) -> Option<CrcHashU32> {
|
pub fn crc(&self) -> Option<CrcHashU32> {
|
||||||
match self {
|
match self {
|
||||||
|
VersionedFooter::V3 { crc32, .. } => Some(*crc32),
|
||||||
VersionedFooter::V2 { crc32, .. } => Some(*crc32),
|
VersionedFooter::V2 { crc32, .. } => Some(*crc32),
|
||||||
VersionedFooter::V1 { crc32, .. } => Some(*crc32),
|
VersionedFooter::V1 { crc32, .. } => Some(*crc32),
|
||||||
VersionedFooter::UnknownVersion { .. } => None,
|
VersionedFooter::UnknownVersion { .. } => None,
|
||||||
@@ -243,7 +268,7 @@ impl<W: TerminatingWrite> Write for FooterProxy<W> {
|
|||||||
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
|
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
|
||||||
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
|
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
|
||||||
let crc32 = self.hasher.take().unwrap().finalize();
|
let crc32 = self.hasher.take().unwrap().finalize();
|
||||||
let footer = Footer::new(VersionedFooter::V2 {
|
let footer = Footer::new(VersionedFooter::V3 {
|
||||||
crc32,
|
crc32,
|
||||||
store_compression: crate::store::COMPRESSION.to_string(),
|
store_compression: crate::store::COMPRESSION.to_string(),
|
||||||
});
|
});
|
||||||
@@ -278,7 +303,7 @@ mod tests {
|
|||||||
let footer = Footer::deserialize(&mut &vec[..]).unwrap();
|
let footer = Footer::deserialize(&mut &vec[..]).unwrap();
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
footer.versioned_footer,
|
footer.versioned_footer,
|
||||||
VersionedFooter::V2 { store_compression, .. }
|
VersionedFooter::V3 { store_compression, .. }
|
||||||
if store_compression == crate::store::COMPRESSION
|
if store_compression == crate::store::COMPRESSION
|
||||||
));
|
));
|
||||||
assert_eq!(&footer.version, crate::version());
|
assert_eq!(&footer.version, crate::version());
|
||||||
@@ -288,7 +313,7 @@ mod tests {
|
|||||||
fn test_serialize_deserialize_footer() {
|
fn test_serialize_deserialize_footer() {
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
let crc32 = 123456u32;
|
let crc32 = 123456u32;
|
||||||
let footer: Footer = Footer::new(VersionedFooter::V2 {
|
let footer: Footer = Footer::new(VersionedFooter::V3 {
|
||||||
crc32,
|
crc32,
|
||||||
store_compression: "lz4".to_string(),
|
store_compression: "lz4".to_string(),
|
||||||
});
|
});
|
||||||
@@ -300,7 +325,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn footer_length() {
|
fn footer_length() {
|
||||||
let crc32 = 1111111u32;
|
let crc32 = 1111111u32;
|
||||||
let versioned_footer = VersionedFooter::V2 {
|
let versioned_footer = VersionedFooter::V3 {
|
||||||
crc32,
|
crc32,
|
||||||
store_compression: "lz4".to_string(),
|
store_compression: "lz4".to_string(),
|
||||||
};
|
};
|
||||||
@@ -321,7 +346,7 @@ mod tests {
|
|||||||
// versionned footer length
|
// versionned footer length
|
||||||
12 | 128,
|
12 | 128,
|
||||||
// index format version
|
// index format version
|
||||||
2,
|
3,
|
||||||
0,
|
0,
|
||||||
0,
|
0,
|
||||||
0,
|
0,
|
||||||
@@ -340,7 +365,7 @@ mod tests {
|
|||||||
let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap();
|
let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap();
|
||||||
assert!(cursor.is_empty());
|
assert!(cursor.is_empty());
|
||||||
let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32;
|
let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32;
|
||||||
let expected_versioned_footer: VersionedFooter = VersionedFooter::V2 {
|
let expected_versioned_footer: VersionedFooter = VersionedFooter::V3 {
|
||||||
crc32: expected_crc,
|
crc32: expected_crc,
|
||||||
store_compression: "lz4".to_string(),
|
store_compression: "lz4".to_string(),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -51,6 +51,15 @@ impl<Item: FastValue> FastFieldReader<Item> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn cast<TFastValue: FastValue>(self) -> FastFieldReader<TFastValue> {
|
||||||
|
FastFieldReader {
|
||||||
|
bit_unpacker: self.bit_unpacker,
|
||||||
|
min_value_u64: self.min_value_u64,
|
||||||
|
max_value_u64: self.max_value_u64,
|
||||||
|
_phantom: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Return the value associated to the given document.
|
/// Return the value associated to the given document.
|
||||||
///
|
///
|
||||||
/// This accessor should return as fast as possible.
|
/// This accessor should return as fast as possible.
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use crate::common::CompositeFile;
|
use crate::common::CompositeFile;
|
||||||
use crate::fastfield::BytesFastFieldReader;
|
|
||||||
use crate::fastfield::MultiValueIntFastFieldReader;
|
use crate::fastfield::MultiValueIntFastFieldReader;
|
||||||
|
use crate::fastfield::{BytesFastFieldReader, FastValue};
|
||||||
use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader};
|
use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader};
|
||||||
use crate::schema::{Cardinality, Field, FieldType, Schema};
|
use crate::schema::{Cardinality, Field, FieldType, Schema};
|
||||||
use crate::space_usage::PerFieldSpaceUsage;
|
use crate::space_usage::PerFieldSpaceUsage;
|
||||||
@@ -201,6 +201,14 @@ impl FastFieldReaders {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(
|
||||||
|
&self,
|
||||||
|
field: Field,
|
||||||
|
) -> Option<FastFieldReader<TFastValue>> {
|
||||||
|
self.u64_lenient(field)
|
||||||
|
.map(|fast_field_reader| fast_field_reader.cast())
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the `i64` fast field reader reader associated to `field`.
|
/// Returns the `i64` fast field reader reader associated to `field`.
|
||||||
///
|
///
|
||||||
/// If `field` is not a i64 fast field, this method returns `None`.
|
/// If `field` is not a i64 fast field, this method returns `None`.
|
||||||
|
|||||||
@@ -1,45 +1,94 @@
|
|||||||
use rand::thread_rng;
|
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
use crate::schema::*;
|
|
||||||
use crate::Index;
|
use crate::Index;
|
||||||
use crate::Searcher;
|
use crate::Searcher;
|
||||||
|
use crate::{doc, schema::*};
|
||||||
|
use rand::thread_rng;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
fn check_index_content(searcher: &Searcher, vals: &HashSet<u64>) {
|
fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
|
||||||
assert!(searcher.segment_readers().len() < 20);
|
assert!(searcher.segment_readers().len() < 20);
|
||||||
assert_eq!(searcher.num_docs() as usize, vals.len());
|
assert_eq!(searcher.num_docs() as usize, vals.len());
|
||||||
|
for segment_reader in searcher.segment_readers() {
|
||||||
|
let store_reader = segment_reader.get_store_reader()?;
|
||||||
|
for doc_id in 0..segment_reader.max_doc() {
|
||||||
|
let _doc = store_reader.get(doc_id)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
fn test_indexing() {
|
fn test_functional_store() -> crate::Result<()> {
|
||||||
|
env_logger::init();
|
||||||
|
let mut schema_builder = Schema::builder();
|
||||||
|
|
||||||
|
let id_field = schema_builder.add_u64_field("id", INDEXED | STORED);
|
||||||
|
let schema = schema_builder.build();
|
||||||
|
|
||||||
|
let index = Index::create_in_ram(schema);
|
||||||
|
let reader = index.reader()?;
|
||||||
|
|
||||||
|
let mut rng = thread_rng();
|
||||||
|
|
||||||
|
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
|
||||||
|
|
||||||
|
let mut doc_set: Vec<u64> = Vec::new();
|
||||||
|
|
||||||
|
let mut doc_id = 0u64;
|
||||||
|
for iteration in 0.. {
|
||||||
|
let num_docs: usize = rng.gen_range(0..4);
|
||||||
|
if doc_set.len() >= 1 {
|
||||||
|
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
|
||||||
|
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
|
||||||
|
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
|
||||||
|
}
|
||||||
|
for _ in 0..num_docs {
|
||||||
|
doc_set.push(doc_id);
|
||||||
|
index_writer.add_document(doc!(id_field=>doc_id));
|
||||||
|
doc_id += 1;
|
||||||
|
}
|
||||||
|
index_writer.commit()?;
|
||||||
|
reader.reload()?;
|
||||||
|
let searcher = reader.searcher();
|
||||||
|
println!("#{} - {}", iteration, searcher.segment_readers().len());
|
||||||
|
check_index_content(&searcher, &doc_set)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[ignore]
|
||||||
|
fn test_functional_indexing() -> crate::Result<()> {
|
||||||
let mut schema_builder = Schema::builder();
|
let mut schema_builder = Schema::builder();
|
||||||
|
|
||||||
let id_field = schema_builder.add_u64_field("id", INDEXED);
|
let id_field = schema_builder.add_u64_field("id", INDEXED);
|
||||||
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
|
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
|
||||||
let schema = schema_builder.build();
|
let schema = schema_builder.build();
|
||||||
|
|
||||||
let index = Index::create_from_tempdir(schema).unwrap();
|
let index = Index::create_from_tempdir(schema)?;
|
||||||
let reader = index.reader().unwrap();
|
let reader = index.reader()?;
|
||||||
|
|
||||||
let mut rng = thread_rng();
|
let mut rng = thread_rng();
|
||||||
|
|
||||||
let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap();
|
let mut index_writer = index.writer_with_num_threads(3, 120_000_000)?;
|
||||||
|
|
||||||
let mut committed_docs: HashSet<u64> = HashSet::new();
|
let mut committed_docs: HashSet<u64> = HashSet::new();
|
||||||
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
|
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
|
||||||
|
|
||||||
for _ in 0..200 {
|
for _ in 0..200 {
|
||||||
let random_val = rng.gen_range(0, 20);
|
let random_val = rng.gen_range(0..20);
|
||||||
if random_val == 0 {
|
if random_val == 0 {
|
||||||
index_writer.commit().expect("Commit failed");
|
index_writer.commit()?;
|
||||||
committed_docs.extend(&uncommitted_docs);
|
committed_docs.extend(&uncommitted_docs);
|
||||||
uncommitted_docs.clear();
|
uncommitted_docs.clear();
|
||||||
reader.reload().unwrap();
|
reader.reload()?;
|
||||||
let searcher = reader.searcher();
|
let searcher = reader.searcher();
|
||||||
// check that everything is correct.
|
// check that everything is correct.
|
||||||
check_index_content(&searcher, &committed_docs);
|
check_index_content(
|
||||||
|
&searcher,
|
||||||
|
&committed_docs.iter().cloned().collect::<Vec<u64>>(),
|
||||||
|
)?;
|
||||||
} else {
|
} else {
|
||||||
if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
|
if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
|
||||||
let doc_id_term = Term::from_field_u64(id_field, random_val);
|
let doc_id_term = Term::from_field_u64(id_field, random_val);
|
||||||
@@ -55,4 +104,5 @@ fn test_indexing() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
|
|||||||
const DEFAULT_MIN_MERGE_SIZE: usize = 8;
|
const DEFAULT_MIN_MERGE_SIZE: usize = 8;
|
||||||
const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
|
const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
|
||||||
|
|
||||||
/// `LogMergePolicy` tries tries to merge segments that have a similar number of
|
/// `LogMergePolicy` tries to merge segments that have a similar number of
|
||||||
/// documents.
|
/// documents.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct LogMergePolicy {
|
pub struct LogMergePolicy {
|
||||||
|
|||||||
@@ -512,12 +512,10 @@ impl IndexMerger {
|
|||||||
.collect::<crate::Result<Vec<_>>>()?;
|
.collect::<crate::Result<Vec<_>>>()?;
|
||||||
|
|
||||||
let mut field_term_streams = Vec::new();
|
let mut field_term_streams = Vec::new();
|
||||||
|
|
||||||
for field_reader in &field_readers {
|
for field_reader in &field_readers {
|
||||||
let terms = field_reader.terms();
|
let terms = field_reader.terms();
|
||||||
|
field_term_streams.push(terms.stream()?);
|
||||||
max_term_ords.push(terms.num_terms() as u64);
|
max_term_ords.push(terms.num_terms() as u64);
|
||||||
let term_stream = terms.stream()?;
|
|
||||||
field_term_streams.push(term_stream);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut term_ord_mapping_opt = if *field_type == FieldType::HierarchicalFacet {
|
let mut term_ord_mapping_opt = if *field_type == FieldType::HierarchicalFacet {
|
||||||
|
|||||||
@@ -174,7 +174,7 @@ use once_cell::sync::Lazy;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
/// Index format version.
|
/// Index format version.
|
||||||
const INDEX_FORMAT_VERSION: u32 = 2;
|
const INDEX_FORMAT_VERSION: u32 = 3;
|
||||||
|
|
||||||
/// Structure version for the index.
|
/// Structure version for the index.
|
||||||
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
|||||||
@@ -1,32 +1,46 @@
|
|||||||
use crate::common::{read_u32_vint_no_advance, serialize_vint_u32, BinarySerializable};
|
use std::convert::TryInto;
|
||||||
|
|
||||||
use crate::directory::OwnedBytes;
|
use crate::directory::OwnedBytes;
|
||||||
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
|
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
|
||||||
use crate::query::BM25Weight;
|
use crate::query::BM25Weight;
|
||||||
use crate::schema::IndexRecordOption;
|
use crate::schema::IndexRecordOption;
|
||||||
use crate::{DocId, Score, TERMINATED};
|
use crate::{DocId, Score, TERMINATED};
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
|
||||||
|
max_tf.min(u8::MAX as u32) as u8
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
|
||||||
|
if max_tf_code == u8::MAX {
|
||||||
|
u32::MAX
|
||||||
|
} else {
|
||||||
|
max_tf_code as u32
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
fn read_u32(data: &[u8]) -> u32 {
|
||||||
|
u32::from_le_bytes(data[..4].try_into().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
fn write_u32(val: u32, buf: &mut Vec<u8>) {
|
||||||
|
buf.extend_from_slice(&val.to_le_bytes());
|
||||||
|
}
|
||||||
|
|
||||||
pub struct SkipSerializer {
|
pub struct SkipSerializer {
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
prev_doc: DocId,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SkipSerializer {
|
impl SkipSerializer {
|
||||||
pub fn new() -> SkipSerializer {
|
pub fn new() -> SkipSerializer {
|
||||||
SkipSerializer {
|
SkipSerializer { buffer: Vec::new() }
|
||||||
buffer: Vec::new(),
|
|
||||||
prev_doc: 0u32,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
|
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
|
||||||
assert!(
|
write_u32(last_doc, &mut self.buffer);
|
||||||
last_doc > self.prev_doc,
|
|
||||||
"write_doc(...) called with non-increasing doc ids. \
|
|
||||||
Did you forget to call clear maybe?"
|
|
||||||
);
|
|
||||||
let delta_doc = last_doc - self.prev_doc;
|
|
||||||
self.prev_doc = last_doc;
|
|
||||||
delta_doc.serialize(&mut self.buffer).unwrap();
|
|
||||||
self.buffer.push(doc_num_bits);
|
self.buffer.push(doc_num_bits);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -35,16 +49,13 @@ impl SkipSerializer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
|
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
|
||||||
tf_sum
|
write_u32(tf_sum, &mut self.buffer);
|
||||||
.serialize(&mut self.buffer)
|
|
||||||
.expect("Should never fail");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
|
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
|
||||||
self.buffer.push(fieldnorm_id);
|
let block_wand_tf = encode_block_wand_max_tf(term_freq);
|
||||||
let mut buf = [0u8; 8];
|
self.buffer
|
||||||
let bytes = serialize_vint_u32(term_freq, &mut buf);
|
.extend_from_slice(&[fieldnorm_id, block_wand_tf]);
|
||||||
self.buffer.extend_from_slice(bytes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn data(&self) -> &[u8] {
|
pub fn data(&self) -> &[u8] {
|
||||||
@@ -52,7 +63,6 @@ impl SkipSerializer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn clear(&mut self) {
|
pub fn clear(&mut self) {
|
||||||
self.prev_doc = 0u32;
|
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -159,18 +169,13 @@ impl SkipReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn read_block_info(&mut self) {
|
fn read_block_info(&mut self) {
|
||||||
let doc_delta = {
|
let bytes = self.owned_read.as_slice();
|
||||||
let bytes = self.owned_read.as_slice();
|
let advance_len: usize;
|
||||||
let mut buf = [0; 4];
|
self.last_doc_in_block = read_u32(bytes);
|
||||||
buf.copy_from_slice(&bytes[..4]);
|
let doc_num_bits = bytes[4];
|
||||||
u32::from_le_bytes(buf)
|
|
||||||
};
|
|
||||||
self.last_doc_in_block += doc_delta as DocId;
|
|
||||||
let doc_num_bits = self.owned_read.as_slice()[4];
|
|
||||||
|
|
||||||
match self.skip_info {
|
match self.skip_info {
|
||||||
IndexRecordOption::Basic => {
|
IndexRecordOption::Basic => {
|
||||||
self.owned_read.advance(5);
|
advance_len = 5;
|
||||||
self.block_info = BlockInfo::BitPacked {
|
self.block_info = BlockInfo::BitPacked {
|
||||||
doc_num_bits,
|
doc_num_bits,
|
||||||
tf_num_bits: 0,
|
tf_num_bits: 0,
|
||||||
@@ -180,11 +185,10 @@ impl SkipReader {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
IndexRecordOption::WithFreqs => {
|
IndexRecordOption::WithFreqs => {
|
||||||
let bytes = self.owned_read.as_slice();
|
|
||||||
let tf_num_bits = bytes[5];
|
let tf_num_bits = bytes[5];
|
||||||
let block_wand_fieldnorm_id = bytes[6];
|
let block_wand_fieldnorm_id = bytes[6];
|
||||||
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[7..]);
|
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
|
||||||
self.owned_read.advance(7 + num_bytes);
|
advance_len = 8;
|
||||||
self.block_info = BlockInfo::BitPacked {
|
self.block_info = BlockInfo::BitPacked {
|
||||||
doc_num_bits,
|
doc_num_bits,
|
||||||
tf_num_bits,
|
tf_num_bits,
|
||||||
@@ -194,16 +198,11 @@ impl SkipReader {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
IndexRecordOption::WithFreqsAndPositions => {
|
IndexRecordOption::WithFreqsAndPositions => {
|
||||||
let bytes = self.owned_read.as_slice();
|
|
||||||
let tf_num_bits = bytes[5];
|
let tf_num_bits = bytes[5];
|
||||||
let tf_sum = {
|
let tf_sum = read_u32(&bytes[6..10]);
|
||||||
let mut buf = [0; 4];
|
|
||||||
buf.copy_from_slice(&bytes[6..10]);
|
|
||||||
u32::from_le_bytes(buf)
|
|
||||||
};
|
|
||||||
let block_wand_fieldnorm_id = bytes[10];
|
let block_wand_fieldnorm_id = bytes[10];
|
||||||
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[11..]);
|
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
|
||||||
self.owned_read.advance(11 + num_bytes);
|
advance_len = 12;
|
||||||
self.block_info = BlockInfo::BitPacked {
|
self.block_info = BlockInfo::BitPacked {
|
||||||
doc_num_bits,
|
doc_num_bits,
|
||||||
tf_num_bits,
|
tf_num_bits,
|
||||||
@@ -213,6 +212,7 @@ impl SkipReader {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.owned_read.advance(advance_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn block_info(&self) -> BlockInfo {
|
pub fn block_info(&self) -> BlockInfo {
|
||||||
@@ -274,6 +274,24 @@ mod tests {
|
|||||||
use crate::directory::OwnedBytes;
|
use crate::directory::OwnedBytes;
|
||||||
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
|
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_encode_block_wand_max_tf() {
|
||||||
|
for tf in 0..255 {
|
||||||
|
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
|
||||||
|
}
|
||||||
|
for &tf in &[255, 256, 1_000_000, u32::MAX] {
|
||||||
|
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_decode_block_wand_max_tf() {
|
||||||
|
for tf in 0..255 {
|
||||||
|
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
|
||||||
|
}
|
||||||
|
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_skip_with_freq() {
|
fn test_skip_with_freq() {
|
||||||
let buf = {
|
let buf = {
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
use rayon::iter::IntoParallelRefIterator;
|
|
||||||
|
|
||||||
use crate::core::SegmentReader;
|
use crate::core::SegmentReader;
|
||||||
use crate::postings::FreqReadingOption;
|
use crate::postings::FreqReadingOption;
|
||||||
use crate::query::explanation::does_not_match;
|
use crate::query::explanation::does_not_match;
|
||||||
@@ -24,7 +22,7 @@ enum SpecializedScorer {
|
|||||||
|
|
||||||
fn scorer_union<TScoreCombiner>(scorers: Vec<Box<dyn Scorer>>) -> SpecializedScorer
|
fn scorer_union<TScoreCombiner>(scorers: Vec<Box<dyn Scorer>>) -> SpecializedScorer
|
||||||
where
|
where
|
||||||
TScoreCombiner: ScoreCombiner + Send,
|
TScoreCombiner: ScoreCombiner,
|
||||||
{
|
{
|
||||||
assert!(!scorers.is_empty());
|
assert!(!scorers.is_empty());
|
||||||
if scorers.len() == 1 {
|
if scorers.len() == 1 {
|
||||||
@@ -54,7 +52,7 @@ where
|
|||||||
SpecializedScorer::Other(Box::new(Union::<_, TScoreCombiner>::from(scorers)))
|
SpecializedScorer::Other(Box::new(Union::<_, TScoreCombiner>::from(scorers)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn into_box_scorer<TScoreCombiner: ScoreCombiner + Send>(scorer: SpecializedScorer) -> Box<dyn Scorer> {
|
fn into_box_scorer<TScoreCombiner: ScoreCombiner>(scorer: SpecializedScorer) -> Box<dyn Scorer> {
|
||||||
match scorer {
|
match scorer {
|
||||||
SpecializedScorer::TermUnion(term_scorers) => {
|
SpecializedScorer::TermUnion(term_scorers) => {
|
||||||
let union_scorer = Union::<TermScorer, TScoreCombiner>::from(term_scorers);
|
let union_scorer = Union::<TermScorer, TScoreCombiner>::from(term_scorers);
|
||||||
@@ -82,32 +80,18 @@ impl BooleanWeight {
|
|||||||
reader: &SegmentReader,
|
reader: &SegmentReader,
|
||||||
boost: Score,
|
boost: Score,
|
||||||
) -> crate::Result<HashMap<Occur, Vec<Box<dyn Scorer>>>> {
|
) -> crate::Result<HashMap<Occur, Vec<Box<dyn Scorer>>>> {
|
||||||
use rayon::iter::ParallelIterator;
|
|
||||||
use rayon::iter::IndexedParallelIterator;
|
|
||||||
let mut per_occur_scorers: HashMap<Occur, Vec<Box<dyn Scorer>>> = HashMap::new();
|
let mut per_occur_scorers: HashMap<Occur, Vec<Box<dyn Scorer>>> = HashMap::new();
|
||||||
let mut items_res: Vec<crate::Result<(Occur, Box<dyn Scorer>)>> = Vec::new();
|
for &(ref occur, ref subweight) in &self.weights {
|
||||||
let pool = rayon::ThreadPoolBuilder::new().num_threads(self.weights.len()).build().unwrap();
|
let sub_scorer: Box<dyn Scorer> = subweight.scorer(reader, boost)?;
|
||||||
pool.install(|| {
|
|
||||||
self.weights.iter()
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.par_iter()
|
|
||||||
.map(|(occur, subweight)| {
|
|
||||||
let sub_scorer: Box<dyn Scorer> = subweight.scorer(reader, boost)?;
|
|
||||||
Ok((*occur, sub_scorer))
|
|
||||||
})
|
|
||||||
.collect_into_vec(&mut items_res);
|
|
||||||
});
|
|
||||||
for item_res in items_res {
|
|
||||||
let (occur, sub_scorer) = item_res?;
|
|
||||||
per_occur_scorers
|
per_occur_scorers
|
||||||
.entry(occur)
|
.entry(*occur)
|
||||||
.or_insert_with(Vec::new)
|
.or_insert_with(Vec::new)
|
||||||
.push(sub_scorer);
|
.push(sub_scorer);
|
||||||
}
|
}
|
||||||
Ok(per_occur_scorers)
|
Ok(per_occur_scorers)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn complex_scorer<TScoreCombiner: ScoreCombiner >(
|
fn complex_scorer<TScoreCombiner: ScoreCombiner>(
|
||||||
&self,
|
&self,
|
||||||
reader: &SegmentReader,
|
reader: &SegmentReader,
|
||||||
boost: Score,
|
boost: Score,
|
||||||
|
|||||||
@@ -302,7 +302,7 @@ mod tests {
|
|||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
writer.set_merge_policy(Box::new(NoMergePolicy));
|
writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||||
for _ in 0..3_000 {
|
for _ in 0..3_000 {
|
||||||
let term_freq = rng.gen_range(1, 10000);
|
let term_freq = rng.gen_range(1..10000);
|
||||||
let words: Vec<&str> = std::iter::repeat("bbbb").take(term_freq).collect();
|
let words: Vec<&str> = std::iter::repeat("bbbb").take(term_freq).collect();
|
||||||
let text = words.join(" ");
|
let text = words.join(" ");
|
||||||
writer.add_document(doc!(text_field=>text));
|
writer.add_document(doc!(text_field=>text));
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use crate::schema::Value;
|
use crate::schema::Value;
|
||||||
use serde::Serialize;
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
/// Internal representation of a document used for JSON
|
/// Internal representation of a document used for JSON
|
||||||
@@ -8,5 +8,5 @@ use std::collections::BTreeMap;
|
|||||||
/// A `NamedFieldDocument` is a simple representation of a document
|
/// A `NamedFieldDocument` is a simple representation of a document
|
||||||
/// as a `BTreeMap<String, Vec<Value>>`.
|
/// as a `BTreeMap<String, Vec<Value>>`.
|
||||||
///
|
///
|
||||||
#[derive(Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
pub struct NamedFieldDocument(pub BTreeMap<String, Vec<Value>>);
|
pub struct NamedFieldDocument(pub BTreeMap<String, Vec<Value>>);
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use std::io::{self, Read, Write};
|
|||||||
/// Name of the compression scheme used in the doc store.
|
/// Name of the compression scheme used in the doc store.
|
||||||
///
|
///
|
||||||
/// This name is appended to the version string of tantivy.
|
/// This name is appended to the version string of tantivy.
|
||||||
pub const COMPRESSION: &'static str = "lz4";
|
pub const COMPRESSION: &str = "lz4";
|
||||||
|
|
||||||
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
|
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
|
||||||
compressed.clear();
|
compressed.clear();
|
||||||
|
|||||||
@@ -43,6 +43,9 @@ impl CheckpointBlock {
|
|||||||
|
|
||||||
/// Adding another checkpoint in the block.
|
/// Adding another checkpoint in the block.
|
||||||
pub fn push(&mut self, checkpoint: Checkpoint) {
|
pub fn push(&mut self, checkpoint: Checkpoint) {
|
||||||
|
if let Some(prev_checkpoint) = self.checkpoints.last() {
|
||||||
|
assert!(checkpoint.follows(prev_checkpoint));
|
||||||
|
}
|
||||||
self.checkpoints.push(checkpoint);
|
self.checkpoints.push(checkpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
const CHECKPOINT_PERIOD: usize = 8;
|
const CHECKPOINT_PERIOD: usize = 2;
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
mod block;
|
mod block;
|
||||||
@@ -26,6 +26,13 @@ pub struct Checkpoint {
|
|||||||
pub end_offset: u64,
|
pub end_offset: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Checkpoint {
|
||||||
|
pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
|
||||||
|
(self.start_doc == other.end_doc) &&
|
||||||
|
(self.start_offset == other.end_offset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl fmt::Debug for Checkpoint {
|
impl fmt::Debug for Checkpoint {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
write!(
|
write!(
|
||||||
@@ -39,13 +46,16 @@ impl fmt::Debug for Checkpoint {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|
||||||
use std::io;
|
use std::{io, iter};
|
||||||
|
|
||||||
|
use futures::executor::block_on;
|
||||||
use proptest::strategy::{BoxedStrategy, Strategy};
|
use proptest::strategy::{BoxedStrategy, Strategy};
|
||||||
|
|
||||||
use crate::directory::OwnedBytes;
|
use crate::directory::OwnedBytes;
|
||||||
|
use crate::indexer::NoMergePolicy;
|
||||||
|
use crate::schema::{SchemaBuilder, STORED, STRING};
|
||||||
use crate::store::index::Checkpoint;
|
use crate::store::index::Checkpoint;
|
||||||
use crate::DocId;
|
use crate::{DocAddress, DocId, Index, Term};
|
||||||
|
|
||||||
use super::{SkipIndex, SkipIndexBuilder};
|
use super::{SkipIndex, SkipIndexBuilder};
|
||||||
|
|
||||||
@@ -54,7 +64,7 @@ mod tests {
|
|||||||
let mut output: Vec<u8> = Vec::new();
|
let mut output: Vec<u8> = Vec::new();
|
||||||
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
|
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
|
||||||
skip_index_builder.write(&mut output)?;
|
skip_index_builder.write(&mut output)?;
|
||||||
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
|
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
|
||||||
let mut skip_cursor = skip_index.checkpoints();
|
let mut skip_cursor = skip_index.checkpoints();
|
||||||
assert!(skip_cursor.next().is_none());
|
assert!(skip_cursor.next().is_none());
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -72,7 +82,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
skip_index_builder.insert(checkpoint);
|
skip_index_builder.insert(checkpoint);
|
||||||
skip_index_builder.write(&mut output)?;
|
skip_index_builder.write(&mut output)?;
|
||||||
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
|
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
|
||||||
let mut skip_cursor = skip_index.checkpoints();
|
let mut skip_cursor = skip_index.checkpoints();
|
||||||
assert_eq!(skip_cursor.next(), Some(checkpoint));
|
assert_eq!(skip_cursor.next(), Some(checkpoint));
|
||||||
assert_eq!(skip_cursor.next(), None);
|
assert_eq!(skip_cursor.next(), None);
|
||||||
@@ -121,7 +131,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
skip_index_builder.write(&mut output)?;
|
skip_index_builder.write(&mut output)?;
|
||||||
|
|
||||||
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
|
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
&skip_index.checkpoints().collect::<Vec<_>>()[..],
|
&skip_index.checkpoints().collect::<Vec<_>>()[..],
|
||||||
&checkpoints[..]
|
&checkpoints[..]
|
||||||
@@ -133,6 +143,40 @@ mod tests {
|
|||||||
(doc as u64) * (doc as u64)
|
(doc as u64) * (doc as u64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> {
|
||||||
|
let mut schema_builder = SchemaBuilder::default();
|
||||||
|
let text = schema_builder.add_text_field("text", STORED | STRING);
|
||||||
|
let body = schema_builder.add_text_field("body", STORED);
|
||||||
|
let schema = schema_builder.build();
|
||||||
|
let index = Index::create_in_ram(schema);
|
||||||
|
let mut index_writer = index.writer_for_tests()?;
|
||||||
|
index_writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||||
|
let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz")
|
||||||
|
.take(1_000)
|
||||||
|
.collect();
|
||||||
|
for _ in 0..20 {
|
||||||
|
index_writer.add_document(doc!(body=>long_text.clone()));
|
||||||
|
}
|
||||||
|
index_writer.commit()?;
|
||||||
|
index_writer.add_document(doc!(text=>"testb"));
|
||||||
|
for _ in 0..10 {
|
||||||
|
index_writer.add_document(doc!(text=>"testd", body=>long_text.clone()));
|
||||||
|
}
|
||||||
|
index_writer.commit()?;
|
||||||
|
index_writer.delete_term(Term::from_field_text(text, "testb"));
|
||||||
|
index_writer.commit()?;
|
||||||
|
let segment_ids = index.searchable_segment_ids()?;
|
||||||
|
block_on(index_writer.merge(&segment_ids))?;
|
||||||
|
let reader = index.reader()?;
|
||||||
|
let searcher = reader.searcher();
|
||||||
|
assert_eq!(searcher.num_docs(), 30);
|
||||||
|
for i in 0..searcher.num_docs() as u32 {
|
||||||
|
let _doc = searcher.doc(DocAddress(0u32, i))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_skip_index_long() -> io::Result<()> {
|
fn test_skip_index_long() -> io::Result<()> {
|
||||||
let mut output: Vec<u8> = Vec::new();
|
let mut output: Vec<u8> = Vec::new();
|
||||||
@@ -150,7 +194,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
skip_index_builder.write(&mut output)?;
|
skip_index_builder.write(&mut output)?;
|
||||||
assert_eq!(output.len(), 4035);
|
assert_eq!(output.len(), 4035);
|
||||||
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
|
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
|
||||||
.checkpoints()
|
.checkpoints()
|
||||||
.collect();
|
.collect();
|
||||||
assert_eq!(&resulting_checkpoints, &checkpoints);
|
assert_eq!(&resulting_checkpoints, &checkpoints);
|
||||||
@@ -221,7 +265,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
skip_index_builder.write(&mut buffer).unwrap();
|
skip_index_builder.write(&mut buffer).unwrap();
|
||||||
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
|
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
|
||||||
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
|
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
|
||||||
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
|
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
|
||||||
test_skip_index_aux(skip_index, &checkpoints[..]);
|
test_skip_index_aux(skip_index, &checkpoints[..]);
|
||||||
|
|||||||
@@ -59,6 +59,24 @@ pub struct SkipIndex {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SkipIndex {
|
impl SkipIndex {
|
||||||
|
pub fn open(mut data: OwnedBytes) -> SkipIndex {
|
||||||
|
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.map(|el| el.0)
|
||||||
|
.collect();
|
||||||
|
let mut start_offset = 0;
|
||||||
|
let mut layers = Vec::new();
|
||||||
|
for end_offset in offsets {
|
||||||
|
let layer = Layer {
|
||||||
|
data: data.slice(start_offset as usize, end_offset as usize),
|
||||||
|
};
|
||||||
|
layers.push(layer);
|
||||||
|
start_offset = end_offset;
|
||||||
|
}
|
||||||
|
SkipIndex { layers }
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
|
||||||
self.layers
|
self.layers
|
||||||
.last()
|
.last()
|
||||||
@@ -90,22 +108,3 @@ impl SkipIndex {
|
|||||||
Some(cur_checkpoint)
|
Some(cur_checkpoint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<OwnedBytes> for SkipIndex {
|
|
||||||
fn from(mut data: OwnedBytes) -> SkipIndex {
|
|
||||||
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
|
|
||||||
.unwrap()
|
|
||||||
.into_iter()
|
|
||||||
.map(|el| el.0)
|
|
||||||
.collect();
|
|
||||||
let mut start_offset = 0;
|
|
||||||
let mut layers = Vec::new();
|
|
||||||
for end_offset in offsets {
|
|
||||||
layers.push(Layer {
|
|
||||||
data: data.slice(start_offset as usize, end_offset as usize),
|
|
||||||
});
|
|
||||||
start_offset = end_offset;
|
|
||||||
}
|
|
||||||
SkipIndex { layers }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -28,18 +28,20 @@ impl LayerBuilder {
|
|||||||
///
|
///
|
||||||
/// If the block was empty to begin with, simply return None.
|
/// If the block was empty to begin with, simply return None.
|
||||||
fn flush_block(&mut self) -> Option<Checkpoint> {
|
fn flush_block(&mut self) -> Option<Checkpoint> {
|
||||||
self.block.doc_interval().map(|(start_doc, end_doc)| {
|
if let Some((start_doc, end_doc)) = self.block.doc_interval() {
|
||||||
let start_offset = self.buffer.len() as u64;
|
let start_offset = self.buffer.len() as u64;
|
||||||
self.block.serialize(&mut self.buffer);
|
self.block.serialize(&mut self.buffer);
|
||||||
let end_offset = self.buffer.len() as u64;
|
let end_offset = self.buffer.len() as u64;
|
||||||
self.block.clear();
|
self.block.clear();
|
||||||
Checkpoint {
|
Some(Checkpoint {
|
||||||
start_doc,
|
start_doc,
|
||||||
end_doc,
|
end_doc,
|
||||||
start_offset,
|
start_offset,
|
||||||
end_offset,
|
end_offset,
|
||||||
}
|
})
|
||||||
})
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn push(&mut self, checkpoint: Checkpoint) {
|
fn push(&mut self, checkpoint: Checkpoint) {
|
||||||
@@ -48,7 +50,7 @@ impl LayerBuilder {
|
|||||||
|
|
||||||
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
|
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
|
||||||
self.push(checkpoint);
|
self.push(checkpoint);
|
||||||
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
|
let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD;
|
||||||
if emit_skip_info {
|
if emit_skip_info {
|
||||||
self.flush_block()
|
self.flush_block()
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ impl StoreReader {
|
|||||||
let (data_file, offset_index_file) = split_file(store_file)?;
|
let (data_file, offset_index_file) = split_file(store_file)?;
|
||||||
let index_data = offset_index_file.read_bytes()?;
|
let index_data = offset_index_file.read_bytes()?;
|
||||||
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
|
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
|
||||||
let skip_index = SkipIndex::from(index_data);
|
let skip_index = SkipIndex::open(index_data);
|
||||||
Ok(StoreReader {
|
Ok(StoreReader {
|
||||||
data: data_file,
|
data: data_file,
|
||||||
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
|
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
|
||||||
|
|||||||
50
src/store/tests_store.rs
Normal file
50
src/store/tests_store.rs
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
use crate::HasLen;
|
||||||
|
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, RAMDirectory};
|
||||||
|
use crate::fastfield::DeleteBitSet;
|
||||||
|
|
||||||
|
use super::{StoreReader, StoreWriter};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_toto2() -> crate::Result<()> {
|
||||||
|
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
|
||||||
|
let path = Path::new("b6029ade1b954ea1acad15b432eaacb9.store");
|
||||||
|
assert!(directory.validate_checksum(path)?);
|
||||||
|
let store_file = directory.open_read(path)?;
|
||||||
|
let store = StoreReader::open(store_file)?;
|
||||||
|
let documents = store.documents();
|
||||||
|
// for doc in documents {
|
||||||
|
// println!("{:?}", doc);
|
||||||
|
// }
|
||||||
|
let doc= store.get(15_086)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_toto() -> crate::Result<()> {
|
||||||
|
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
|
||||||
|
assert!(directory.validate_checksum(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store"))?);
|
||||||
|
let store_file = directory.open_read(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store.patched"))?;
|
||||||
|
let store = StoreReader::open(store_file)?;
|
||||||
|
let doc= store.get(53)?;
|
||||||
|
println!("{:?}", doc);
|
||||||
|
// let documents = store.documents();
|
||||||
|
// let ram_directory = RAMDirectory::create();
|
||||||
|
// let path = Path::new("store");
|
||||||
|
|
||||||
|
// let store_wrt = ram_directory.open_write(path)?;
|
||||||
|
// let mut store_writer = StoreWriter::new(store_wrt);
|
||||||
|
// for doc in &documents {
|
||||||
|
// store_writer.store(doc)?;
|
||||||
|
// }
|
||||||
|
// store_writer.close()?;
|
||||||
|
// let store_data = ram_directory.open_read(path)?;
|
||||||
|
// let new_store = StoreReader::open(store_data)?;
|
||||||
|
// for doc in 0..59 {
|
||||||
|
// println!("{}", doc);
|
||||||
|
// let doc = new_store.get(doc)?;
|
||||||
|
// println!("{:?}", doc);
|
||||||
|
// }
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -10,7 +10,7 @@ use crate::store::index::Checkpoint;
|
|||||||
use crate::DocId;
|
use crate::DocId;
|
||||||
use std::io::{self, Write};
|
use std::io::{self, Write};
|
||||||
|
|
||||||
const BLOCK_SIZE: usize = 16_384;
|
const BLOCK_SIZE: usize = 30;
|
||||||
|
|
||||||
/// Write tantivy's [`Store`](./index.html)
|
/// Write tantivy's [`Store`](./index.html)
|
||||||
///
|
///
|
||||||
@@ -72,6 +72,7 @@ impl StoreWriter {
|
|||||||
if !self.current_block.is_empty() {
|
if !self.current_block.is_empty() {
|
||||||
self.write_and_compress_block()?;
|
self.write_and_compress_block()?;
|
||||||
}
|
}
|
||||||
|
assert_eq!(self.first_doc_in_block, self.doc);
|
||||||
let doc_shift = self.doc;
|
let doc_shift = self.doc;
|
||||||
let start_shift = self.writer.written_bytes() as u64;
|
let start_shift = self.writer.written_bytes() as u64;
|
||||||
|
|
||||||
@@ -86,12 +87,17 @@ impl StoreWriter {
|
|||||||
checkpoint.end_doc += doc_shift;
|
checkpoint.end_doc += doc_shift;
|
||||||
checkpoint.start_offset += start_shift;
|
checkpoint.start_offset += start_shift;
|
||||||
checkpoint.end_offset += start_shift;
|
checkpoint.end_offset += start_shift;
|
||||||
self.offset_index_writer.insert(checkpoint);
|
self.register_checkpoint(checkpoint);
|
||||||
self.doc = checkpoint.end_doc;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
|
||||||
|
self.offset_index_writer.insert(checkpoint);
|
||||||
|
self.first_doc_in_block = checkpoint.end_doc;
|
||||||
|
self.doc = checkpoint.end_doc;
|
||||||
|
}
|
||||||
|
|
||||||
fn write_and_compress_block(&mut self) -> io::Result<()> {
|
fn write_and_compress_block(&mut self) -> io::Result<()> {
|
||||||
assert!(self.doc > 0);
|
assert!(self.doc > 0);
|
||||||
self.intermediary_buffer.clear();
|
self.intermediary_buffer.clear();
|
||||||
@@ -100,14 +106,13 @@ impl StoreWriter {
|
|||||||
self.writer.write_all(&self.intermediary_buffer)?;
|
self.writer.write_all(&self.intermediary_buffer)?;
|
||||||
let end_offset = self.writer.written_bytes();
|
let end_offset = self.writer.written_bytes();
|
||||||
let end_doc = self.doc;
|
let end_doc = self.doc;
|
||||||
self.offset_index_writer.insert(Checkpoint {
|
self.register_checkpoint(Checkpoint {
|
||||||
start_doc: self.first_doc_in_block,
|
start_doc: self.first_doc_in_block,
|
||||||
end_doc,
|
end_doc,
|
||||||
start_offset,
|
start_offset,
|
||||||
end_offset,
|
end_offset,
|
||||||
});
|
});
|
||||||
self.current_block.clear();
|
self.current_block.clear();
|
||||||
self.first_doc_in_block = self.doc;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -22,10 +22,8 @@ A second datastructure makes it possible to access a [`TermInfo`](../postings/st
|
|||||||
|
|
||||||
use tantivy_fst::automaton::AlwaysMatch;
|
use tantivy_fst::automaton::AlwaysMatch;
|
||||||
|
|
||||||
// mod fst_termdict;
|
mod fst_termdict;
|
||||||
// use fst_termdict as termdict;
|
use fst_termdict as termdict;
|
||||||
mod sstable_termdict;
|
|
||||||
use sstable_termdict as termdict;
|
|
||||||
|
|
||||||
mod merger;
|
mod merger;
|
||||||
|
|
||||||
|
|||||||
@@ -1,148 +0,0 @@
|
|||||||
use std::io;
|
|
||||||
|
|
||||||
mod sstable;
|
|
||||||
mod streamer;
|
|
||||||
mod termdict;
|
|
||||||
|
|
||||||
use self::sstable::value::{ValueReader, ValueWriter};
|
|
||||||
use self::sstable::{BlockReader, SSTable};
|
|
||||||
|
|
||||||
use crate::common::VInt;
|
|
||||||
use crate::postings::TermInfo;
|
|
||||||
|
|
||||||
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
|
|
||||||
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
|
|
||||||
|
|
||||||
pub struct TermSSTable;
|
|
||||||
|
|
||||||
impl SSTable for TermSSTable {
|
|
||||||
type Value = TermInfo;
|
|
||||||
type Reader = TermInfoReader;
|
|
||||||
type Writer = TermInfoWriter;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct TermInfoReader {
|
|
||||||
term_infos: Vec<TermInfo>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ValueReader for TermInfoReader {
|
|
||||||
type Value = TermInfo;
|
|
||||||
|
|
||||||
fn value(&self, idx: usize) -> &TermInfo {
|
|
||||||
&self.term_infos[idx]
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
|
|
||||||
self.term_infos.clear();
|
|
||||||
let num_els = VInt::deserialize_u64(reader)?;
|
|
||||||
let mut start_offset = VInt::deserialize_u64(reader)?;
|
|
||||||
let mut positions_idx = 0;
|
|
||||||
for _ in 0..num_els {
|
|
||||||
let doc_freq = VInt::deserialize_u64(reader)? as u32;
|
|
||||||
let posting_num_bytes = VInt::deserialize_u64(reader)?;
|
|
||||||
let stop_offset = start_offset + posting_num_bytes;
|
|
||||||
let delta_positions_idx = VInt::deserialize_u64(reader)?;
|
|
||||||
positions_idx += delta_positions_idx;
|
|
||||||
let term_info = TermInfo {
|
|
||||||
doc_freq,
|
|
||||||
postings_start_offset: start_offset,
|
|
||||||
postings_stop_offset: stop_offset,
|
|
||||||
positions_idx,
|
|
||||||
};
|
|
||||||
self.term_infos.push(term_info);
|
|
||||||
start_offset = stop_offset;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct TermInfoWriter {
|
|
||||||
term_infos: Vec<TermInfo>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ValueWriter for TermInfoWriter {
|
|
||||||
type Value = TermInfo;
|
|
||||||
|
|
||||||
fn write(&mut self, term_info: &TermInfo) {
|
|
||||||
self.term_infos.push(term_info.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write_block(&mut self, buffer: &mut Vec<u8>) {
|
|
||||||
VInt(self.term_infos.len() as u64).serialize_into_vec(buffer);
|
|
||||||
if self.term_infos.is_empty() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let mut prev_position_idx = 0u64;
|
|
||||||
VInt(self.term_infos[0].postings_start_offset).serialize_into_vec(buffer);
|
|
||||||
for term_info in &self.term_infos {
|
|
||||||
VInt(term_info.doc_freq as u64).serialize_into_vec(buffer);
|
|
||||||
VInt(term_info.postings_stop_offset - term_info.postings_start_offset)
|
|
||||||
.serialize_into_vec(buffer);
|
|
||||||
VInt(term_info.positions_idx - prev_position_idx).serialize_into_vec(buffer);
|
|
||||||
prev_position_idx = term_info.positions_idx;
|
|
||||||
}
|
|
||||||
self.term_infos.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use std::io;
|
|
||||||
|
|
||||||
use super::BlockReader;
|
|
||||||
|
|
||||||
use crate::directory::OwnedBytes;
|
|
||||||
use crate::postings::TermInfo;
|
|
||||||
use crate::termdict::sstable_termdict::sstable::value::{ValueReader, ValueWriter};
|
|
||||||
use crate::termdict::sstable_termdict::TermInfoReader;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_block_terminfos() -> io::Result<()> {
|
|
||||||
let mut term_info_writer = super::TermInfoWriter::default();
|
|
||||||
term_info_writer.write(&TermInfo {
|
|
||||||
doc_freq: 120u32,
|
|
||||||
postings_start_offset: 17u64,
|
|
||||||
postings_stop_offset: 45u64,
|
|
||||||
positions_idx: 10u64,
|
|
||||||
});
|
|
||||||
term_info_writer.write(&TermInfo {
|
|
||||||
doc_freq: 10u32,
|
|
||||||
postings_start_offset: 45u64,
|
|
||||||
postings_stop_offset: 450u64,
|
|
||||||
positions_idx: 104u64,
|
|
||||||
});
|
|
||||||
term_info_writer.write(&TermInfo {
|
|
||||||
doc_freq: 17u32,
|
|
||||||
postings_start_offset: 450u64,
|
|
||||||
postings_stop_offset: 462u64,
|
|
||||||
positions_idx: 210u64,
|
|
||||||
});
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
term_info_writer.write_block(&mut buffer);
|
|
||||||
let mut block_reader = make_block_reader(&buffer[..]);
|
|
||||||
let mut term_info_reader = TermInfoReader::default();
|
|
||||||
term_info_reader.read(&mut block_reader)?;
|
|
||||||
assert_eq!(
|
|
||||||
term_info_reader.value(0),
|
|
||||||
&TermInfo {
|
|
||||||
doc_freq: 120u32,
|
|
||||||
postings_start_offset: 17u64,
|
|
||||||
postings_stop_offset: 45u64,
|
|
||||||
positions_idx: 10u64
|
|
||||||
}
|
|
||||||
);
|
|
||||||
assert!(block_reader.buffer().is_empty());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_block_reader(data: &[u8]) -> BlockReader {
|
|
||||||
let mut buffer = (data.len() as u32).to_le_bytes().to_vec();
|
|
||||||
buffer.extend_from_slice(data);
|
|
||||||
let owned_bytes = OwnedBytes::new(buffer);
|
|
||||||
let mut block_reader = BlockReader::new(Box::new(owned_bytes));
|
|
||||||
block_reader.read_block().unwrap();
|
|
||||||
block_reader
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,84 +0,0 @@
|
|||||||
use byteorder::{LittleEndian, ReadBytesExt};
|
|
||||||
use std::io::{self, Read};
|
|
||||||
|
|
||||||
pub struct BlockReader<'a> {
|
|
||||||
buffer: Vec<u8>,
|
|
||||||
reader: Box<dyn io::Read + 'a>,
|
|
||||||
offset: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> BlockReader<'a> {
|
|
||||||
pub fn new(reader: Box<dyn io::Read + 'a>) -> BlockReader<'a> {
|
|
||||||
BlockReader {
|
|
||||||
buffer: Vec::new(),
|
|
||||||
reader,
|
|
||||||
offset: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn deserialize_u64(&mut self) -> u64 {
|
|
||||||
let (num_bytes, val) = super::vint::deserialize_read(self.buffer());
|
|
||||||
self.advance(num_bytes);
|
|
||||||
val
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline(always)]
|
|
||||||
pub fn buffer_from_to(&self, start: usize, end: usize) -> &[u8] {
|
|
||||||
&self.buffer[start..end]
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn buffer_from(&self, start: usize) -> &[u8] {
|
|
||||||
&self.buffer[start..]
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read_block(&mut self) -> io::Result<bool> {
|
|
||||||
self.offset = 0;
|
|
||||||
let block_len_res = self.reader.read_u32::<LittleEndian>();
|
|
||||||
if let Err(err) = &block_len_res {
|
|
||||||
if err.kind() == io::ErrorKind::UnexpectedEof {
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let block_len = block_len_res?;
|
|
||||||
if block_len == 0u32 {
|
|
||||||
self.buffer.clear();
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
self.buffer.resize(block_len as usize, 0u8);
|
|
||||||
self.reader.read_exact(&mut self.buffer[..])?;
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn offset(&self) -> usize {
|
|
||||||
self.offset
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn advance(&mut self, num_bytes: usize) {
|
|
||||||
self.offset += num_bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn buffer(&self) -> &[u8] {
|
|
||||||
&self.buffer[self.offset..]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> io::Read for BlockReader<'a> {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
||||||
let len = self.buffer().read(buf)?;
|
|
||||||
self.advance(len);
|
|
||||||
Ok(len)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
|
|
||||||
let len = self.buffer.len();
|
|
||||||
buf.extend_from_slice(self.buffer());
|
|
||||||
self.advance(len);
|
|
||||||
Ok(len)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
|
|
||||||
self.buffer().read_exact(buf)?;
|
|
||||||
self.advance(buf.len());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,203 +0,0 @@
|
|||||||
use std::io::{self, BufWriter, Write};
|
|
||||||
|
|
||||||
use crate::common::CountingWriter;
|
|
||||||
|
|
||||||
use super::value::ValueWriter;
|
|
||||||
use super::{value, vint, BlockReader};
|
|
||||||
|
|
||||||
const FOUR_BIT_LIMITS: usize = 1 << 4;
|
|
||||||
const VINT_MODE: u8 = 1u8;
|
|
||||||
const BLOCK_LEN: usize = 256_000;
|
|
||||||
|
|
||||||
pub struct DeltaWriter<W, TValueWriter>
|
|
||||||
where
|
|
||||||
W: io::Write,
|
|
||||||
{
|
|
||||||
block: Vec<u8>,
|
|
||||||
write: CountingWriter<BufWriter<W>>,
|
|
||||||
value_writer: TValueWriter,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
|
|
||||||
where
|
|
||||||
W: io::Write,
|
|
||||||
TValueWriter: ValueWriter,
|
|
||||||
{
|
|
||||||
pub fn new(wrt: W) -> Self {
|
|
||||||
DeltaWriter {
|
|
||||||
block: Vec::with_capacity(BLOCK_LEN * 2),
|
|
||||||
write: CountingWriter::wrap(BufWriter::new(wrt)),
|
|
||||||
value_writer: TValueWriter::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
|
|
||||||
where
|
|
||||||
W: io::Write,
|
|
||||||
TValueWriter: value::ValueWriter,
|
|
||||||
{
|
|
||||||
pub fn flush_block(&mut self) -> io::Result<Option<(u64, u64)>> {
|
|
||||||
if self.block.is_empty() {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
let start_offset = self.write.written_bytes();
|
|
||||||
// TODO avoid buffer allocation
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
self.value_writer.write_block(&mut buffer);
|
|
||||||
let block_len = buffer.len() + self.block.len();
|
|
||||||
self.write.write_all(&(block_len as u32).to_le_bytes())?;
|
|
||||||
self.write.write_all(&buffer[..])?;
|
|
||||||
self.write.write_all(&mut self.block[..])?;
|
|
||||||
let end_offset = self.write.written_bytes();
|
|
||||||
self.block.clear();
|
|
||||||
Ok(Some((start_offset, end_offset)))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) {
|
|
||||||
if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS {
|
|
||||||
let b = (keep_len | add_len << 4) as u8;
|
|
||||||
self.block.extend_from_slice(&[b])
|
|
||||||
} else {
|
|
||||||
let mut buf = [VINT_MODE; 20];
|
|
||||||
let mut len = 1 + vint::serialize(keep_len as u64, &mut buf[1..]);
|
|
||||||
len += vint::serialize(add_len as u64, &mut buf[len..]);
|
|
||||||
self.block.extend_from_slice(&mut buf[..len])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn write_suffix(&mut self, common_prefix_len: usize, suffix: &[u8]) {
|
|
||||||
let keep_len = common_prefix_len;
|
|
||||||
let add_len = suffix.len();
|
|
||||||
self.encode_keep_add(keep_len, add_len);
|
|
||||||
self.block.extend_from_slice(suffix);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) {
|
|
||||||
self.value_writer.write(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_delta(
|
|
||||||
&mut self,
|
|
||||||
common_prefix_len: usize,
|
|
||||||
suffix: &[u8],
|
|
||||||
value: &TValueWriter::Value,
|
|
||||||
) {
|
|
||||||
self.write_suffix(common_prefix_len, suffix);
|
|
||||||
self.write_value(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn flush_block_if_required(&mut self) -> io::Result<Option<(u64, u64)>> {
|
|
||||||
if self.block.len() > BLOCK_LEN {
|
|
||||||
return self.flush_block();
|
|
||||||
}
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn finalize(mut self) -> CountingWriter<BufWriter<W>> {
|
|
||||||
self.write
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DeltaReader<'a, TValueReader> {
|
|
||||||
common_prefix_len: usize,
|
|
||||||
suffix_start: usize,
|
|
||||||
suffix_end: usize,
|
|
||||||
value_reader: TValueReader,
|
|
||||||
block_reader: BlockReader<'a>,
|
|
||||||
idx: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, TValueReader> DeltaReader<'a, TValueReader>
|
|
||||||
where
|
|
||||||
TValueReader: value::ValueReader,
|
|
||||||
{
|
|
||||||
pub fn new<R: io::Read + 'a>(reader: R) -> Self {
|
|
||||||
DeltaReader {
|
|
||||||
idx: 0,
|
|
||||||
common_prefix_len: 0,
|
|
||||||
suffix_start: 0,
|
|
||||||
suffix_end: 0,
|
|
||||||
value_reader: TValueReader::default(),
|
|
||||||
block_reader: BlockReader::new(Box::new(reader)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn deserialize_vint(&mut self) -> u64 {
|
|
||||||
self.block_reader.deserialize_u64()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_keep_add(&mut self) -> Option<(usize, usize)> {
|
|
||||||
let b = {
|
|
||||||
let buf = &self.block_reader.buffer();
|
|
||||||
if buf.is_empty() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
buf[0]
|
|
||||||
};
|
|
||||||
self.block_reader.advance(1);
|
|
||||||
match b {
|
|
||||||
VINT_MODE => {
|
|
||||||
let keep = self.deserialize_vint() as usize;
|
|
||||||
let add = self.deserialize_vint() as usize;
|
|
||||||
Some((keep, add))
|
|
||||||
}
|
|
||||||
b => {
|
|
||||||
let keep = (b & 0b1111) as usize;
|
|
||||||
let add = (b >> 4) as usize;
|
|
||||||
Some((keep, add))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_delta_key(&mut self) -> bool {
|
|
||||||
if let Some((keep, add)) = self.read_keep_add() {
|
|
||||||
self.common_prefix_len = keep;
|
|
||||||
self.suffix_start = self.block_reader.offset();
|
|
||||||
self.suffix_end = self.suffix_start + add;
|
|
||||||
self.block_reader.advance(add);
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn advance(&mut self) -> io::Result<bool> {
|
|
||||||
if self.block_reader.buffer().is_empty() {
|
|
||||||
if !self.block_reader.read_block()? {
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
self.value_reader.read(&mut self.block_reader)?;
|
|
||||||
self.idx = 0;
|
|
||||||
} else {
|
|
||||||
self.idx += 1;
|
|
||||||
}
|
|
||||||
if !self.read_delta_key() {
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn common_prefix_len(&self) -> usize {
|
|
||||||
self.common_prefix_len
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn suffix(&self) -> &[u8] {
|
|
||||||
&self
|
|
||||||
.block_reader
|
|
||||||
.buffer_from_to(self.suffix_start, self.suffix_end)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn suffix_from(&self, offset: usize) -> &[u8] {
|
|
||||||
&self.block_reader.buffer_from_to(
|
|
||||||
self.suffix_start
|
|
||||||
.wrapping_add(offset)
|
|
||||||
.wrapping_sub(self.common_prefix_len),
|
|
||||||
self.suffix_end,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn value(&self) -> &TValueReader::Value {
|
|
||||||
self.value_reader.value(self.idx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,72 +0,0 @@
|
|||||||
use crate::termdict::sstable_termdict::sstable::{Reader, SSTable, Writer};
|
|
||||||
|
|
||||||
use super::SingleValueMerger;
|
|
||||||
use super::ValueMerger;
|
|
||||||
use std::cmp::Ordering;
|
|
||||||
use std::collections::binary_heap::PeekMut;
|
|
||||||
use std::collections::BinaryHeap;
|
|
||||||
use std::io;
|
|
||||||
|
|
||||||
struct HeapItem<B: AsRef<[u8]>>(B);
|
|
||||||
|
|
||||||
impl<B: AsRef<[u8]>> Ord for HeapItem<B> {
|
|
||||||
fn cmp(&self, other: &Self) -> Ordering {
|
|
||||||
other.0.as_ref().cmp(self.0.as_ref())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<B: AsRef<[u8]>> PartialOrd for HeapItem<B> {
|
|
||||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
|
||||||
Some(other.0.as_ref().cmp(self.0.as_ref()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B: AsRef<[u8]>> Eq for HeapItem<B> {}
|
|
||||||
impl<B: AsRef<[u8]>> PartialEq for HeapItem<B> {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
self.0.as_ref() == other.0.as_ref()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn merge_sstable<SST: SSTable, W: io::Write, M: ValueMerger<SST::Value>>(
|
|
||||||
readers: Vec<Reader<SST::Reader>>,
|
|
||||||
mut writer: Writer<W, SST::Writer>,
|
|
||||||
mut merger: M,
|
|
||||||
) -> io::Result<()> {
|
|
||||||
let mut heap: BinaryHeap<HeapItem<Reader<SST::Reader>>> =
|
|
||||||
BinaryHeap::with_capacity(readers.len());
|
|
||||||
for mut reader in readers {
|
|
||||||
if reader.advance()? {
|
|
||||||
heap.push(HeapItem(reader));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
loop {
|
|
||||||
let len = heap.len();
|
|
||||||
let mut value_merger;
|
|
||||||
if let Some(mut head) = heap.peek_mut() {
|
|
||||||
writer.write_key(head.0.key());
|
|
||||||
value_merger = merger.new_value(head.0.value());
|
|
||||||
if !head.0.advance()? {
|
|
||||||
PeekMut::pop(head);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
for _ in 0..len - 1 {
|
|
||||||
if let Some(mut head) = heap.peek_mut() {
|
|
||||||
if head.0.key() == writer.current_key() {
|
|
||||||
value_merger.add(head.0.value());
|
|
||||||
if !head.0.advance()? {
|
|
||||||
PeekMut::pop(head);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
let value = value_merger.finish();
|
|
||||||
writer.write_value(&value);
|
|
||||||
writer.flush_block_if_required()?;
|
|
||||||
}
|
|
||||||
writer.finalize()?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@@ -1,184 +0,0 @@
|
|||||||
mod heap_merge;
|
|
||||||
|
|
||||||
pub use self::heap_merge::merge_sstable;
|
|
||||||
|
|
||||||
pub trait SingleValueMerger<V> {
|
|
||||||
fn add(&mut self, v: &V);
|
|
||||||
fn finish(self) -> V;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait ValueMerger<V> {
|
|
||||||
type TSingleValueMerger: SingleValueMerger<V>;
|
|
||||||
fn new_value(&mut self, v: &V) -> Self::TSingleValueMerger;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct KeepFirst;
|
|
||||||
|
|
||||||
pub struct FirstVal<V>(V);
|
|
||||||
|
|
||||||
impl<V: Clone> ValueMerger<V> for KeepFirst {
|
|
||||||
type TSingleValueMerger = FirstVal<V>;
|
|
||||||
|
|
||||||
fn new_value(&mut self, v: &V) -> FirstVal<V> {
|
|
||||||
FirstVal(v.clone())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<V> SingleValueMerger<V> for FirstVal<V> {
|
|
||||||
fn add(&mut self, _: &V) {}
|
|
||||||
|
|
||||||
fn finish(self) -> V {
|
|
||||||
self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct VoidMerge;
|
|
||||||
impl ValueMerger<()> for VoidMerge {
|
|
||||||
type TSingleValueMerger = ();
|
|
||||||
|
|
||||||
fn new_value(&mut self, _: &()) -> () {
|
|
||||||
()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct U64Merge;
|
|
||||||
impl ValueMerger<u64> for U64Merge {
|
|
||||||
type TSingleValueMerger = u64;
|
|
||||||
|
|
||||||
fn new_value(&mut self, val: &u64) -> u64 {
|
|
||||||
*val
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SingleValueMerger<u64> for u64 {
|
|
||||||
fn add(&mut self, val: &u64) {
|
|
||||||
*self += *val;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn finish(self) -> u64 {
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SingleValueMerger<()> for () {
|
|
||||||
fn add(&mut self, _: &()) {}
|
|
||||||
|
|
||||||
fn finish(self) -> () {
|
|
||||||
()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
|
|
||||||
use super::super::SSTable;
|
|
||||||
use super::super::{SSTableMonotonicU64, VoidSSTable};
|
|
||||||
use super::U64Merge;
|
|
||||||
use super::VoidMerge;
|
|
||||||
use std::collections::{BTreeMap, BTreeSet};
|
|
||||||
use std::str;
|
|
||||||
|
|
||||||
fn write_sstable(keys: &[&'static str]) -> Vec<u8> {
|
|
||||||
let mut buffer: Vec<u8> = vec![];
|
|
||||||
{
|
|
||||||
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
|
|
||||||
for &key in keys {
|
|
||||||
assert!(sstable_writer.write(key.as_bytes(), &()).is_ok());
|
|
||||||
}
|
|
||||||
assert!(sstable_writer.finalize().is_ok());
|
|
||||||
}
|
|
||||||
dbg!(&buffer);
|
|
||||||
buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write_sstable_u64(keys: &[(&'static str, u64)]) -> Vec<u8> {
|
|
||||||
let mut buffer: Vec<u8> = vec![];
|
|
||||||
{
|
|
||||||
let mut sstable_writer = SSTableMonotonicU64::writer(&mut buffer);
|
|
||||||
for (key, val) in keys {
|
|
||||||
assert!(sstable_writer.write(key.as_bytes(), val).is_ok());
|
|
||||||
}
|
|
||||||
assert!(sstable_writer.finalize().is_ok());
|
|
||||||
}
|
|
||||||
buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
fn merge_test_aux(arrs: &[&[&'static str]]) {
|
|
||||||
let sstables = arrs.iter().cloned().map(write_sstable).collect::<Vec<_>>();
|
|
||||||
let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect();
|
|
||||||
let mut merged = BTreeSet::new();
|
|
||||||
for &arr in arrs.iter() {
|
|
||||||
for &s in arr {
|
|
||||||
merged.insert(s.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let mut w = Vec::new();
|
|
||||||
assert!(VoidSSTable::merge(sstables_ref, &mut w, VoidMerge).is_ok());
|
|
||||||
let mut reader = VoidSSTable::reader(&w[..]);
|
|
||||||
for k in merged {
|
|
||||||
assert!(reader.advance().unwrap());
|
|
||||||
assert_eq!(reader.key(), k.as_bytes());
|
|
||||||
}
|
|
||||||
assert!(!reader.advance().unwrap());
|
|
||||||
}
|
|
||||||
|
|
||||||
fn merge_test_u64_monotonic_aux(arrs: &[&[(&'static str, u64)]]) {
|
|
||||||
let sstables = arrs
|
|
||||||
.iter()
|
|
||||||
.cloned()
|
|
||||||
.map(write_sstable_u64)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect();
|
|
||||||
let mut merged = BTreeMap::new();
|
|
||||||
for &arr in arrs.iter() {
|
|
||||||
for (key, val) in arr {
|
|
||||||
let entry = merged.entry(key.to_string()).or_insert(0u64);
|
|
||||||
*entry += val;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let mut w = Vec::new();
|
|
||||||
assert!(SSTableMonotonicU64::merge(sstables_ref, &mut w, U64Merge).is_ok());
|
|
||||||
let mut reader = SSTableMonotonicU64::reader(&w[..]);
|
|
||||||
for (k, v) in merged {
|
|
||||||
assert!(reader.advance().unwrap());
|
|
||||||
assert_eq!(reader.key(), k.as_bytes());
|
|
||||||
assert_eq!(reader.value(), &v);
|
|
||||||
}
|
|
||||||
assert!(!reader.advance().unwrap());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_simple_reproduce() {
|
|
||||||
let sstable_data = write_sstable(&["a"]);
|
|
||||||
let mut reader = VoidSSTable::reader(&sstable_data[..]);
|
|
||||||
assert!(reader.advance().unwrap());
|
|
||||||
assert_eq!(reader.key(), b"a");
|
|
||||||
assert!(!reader.advance().unwrap());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge() {
|
|
||||||
merge_test_aux(&[]);
|
|
||||||
merge_test_aux(&[&["a"]]);
|
|
||||||
merge_test_aux(&[&["a", "b"], &["ab"]]); // a, ab, b
|
|
||||||
merge_test_aux(&[&["a", "b"], &["a", "b"]]);
|
|
||||||
merge_test_aux(&[
|
|
||||||
&["happy", "hello", "payer", "tax"],
|
|
||||||
&["habitat", "hello", "zoo"],
|
|
||||||
&[],
|
|
||||||
&["a"],
|
|
||||||
]);
|
|
||||||
merge_test_aux(&[&["a"]]);
|
|
||||||
merge_test_aux(&[&["a", "b"], &["ab"]]);
|
|
||||||
merge_test_aux(&[&["a", "b"], &["a", "b"]]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_u64() {
|
|
||||||
merge_test_u64_monotonic_aux(&[]);
|
|
||||||
merge_test_u64_monotonic_aux(&[&[("a", 1u64)]]);
|
|
||||||
merge_test_u64_monotonic_aux(&[&[("a", 1u64), ("b", 3u64)], &[("ab", 2u64)]]); // a, ab, b
|
|
||||||
merge_test_u64_monotonic_aux(&[&[("a", 1u64), ("b", 2u64)], &[("a", 16u64), ("b", 23u64)]]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,365 +0,0 @@
|
|||||||
use merge::ValueMerger;
|
|
||||||
use std::io::{self, Write};
|
|
||||||
use std::usize;
|
|
||||||
|
|
||||||
mod delta;
|
|
||||||
pub mod merge;
|
|
||||||
pub mod value;
|
|
||||||
|
|
||||||
pub(crate) mod sstable_index;
|
|
||||||
|
|
||||||
pub(crate) use self::sstable_index::{SSTableIndex, SSTableIndexBuilder};
|
|
||||||
pub(crate) mod vint;
|
|
||||||
|
|
||||||
mod block_reader;
|
|
||||||
pub use self::delta::DeltaReader;
|
|
||||||
use self::delta::DeltaWriter;
|
|
||||||
use self::value::{U64MonotonicReader, U64MonotonicWriter, ValueReader, ValueWriter};
|
|
||||||
|
|
||||||
pub use self::block_reader::BlockReader;
|
|
||||||
pub use self::merge::VoidMerge;
|
|
||||||
|
|
||||||
const DEFAULT_KEY_CAPACITY: usize = 50;
|
|
||||||
|
|
||||||
pub(crate) fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
|
|
||||||
left.iter()
|
|
||||||
.cloned()
|
|
||||||
.zip(right.iter().cloned())
|
|
||||||
.take_while(|(left, right)| left == right)
|
|
||||||
.count()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Trait describing one flavor of sstable: the value type it stores
/// and the codecs used to read and write those values.
///
/// All methods have default implementations wiring the associated
/// reader/writer types into the generic `Reader`/`Writer`/merge machinery.
pub trait SSTable: Sized {
    /// Type of the values associated with the keys.
    type Value;
    /// Value decoder.
    type Reader: ValueReader<Value = Self::Value>;
    /// Value encoder.
    type Writer: ValueWriter<Value = Self::Value>;

    /// Creates a low-level delta writer (prefix-encoded keys) over `write`.
    fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::Writer> {
        DeltaWriter::new(write)
    }

    /// Creates a full sstable `Writer` over `write`.
    fn writer<W: io::Write>(write: W) -> Writer<W, Self::Writer> {
        Writer {
            previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
            num_terms: 0u64,
            index_builder: SSTableIndexBuilder::default(),
            delta_writer: Self::delta_writer(write),
            first_ordinal_of_the_block: 0u64,
        }
    }

    /// Creates a low-level delta reader over `reader`.
    fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::Reader> {
        DeltaReader::new(reader)
    }

    /// Creates a full sstable `Reader` over `reader`.
    fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::Reader> {
        Reader {
            key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
            delta_reader: Self::delta_reader(reader),
        }
    }

    /// Merges several sstables into one, resolving values of
    /// identical keys through `merger`.
    fn merge<R: io::Read, W: io::Write, M: ValueMerger<Self::Value>>(
        io_readers: Vec<R>,
        w: W,
        merger: M,
    ) -> io::Result<()> {
        let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
        let writer = Self::writer(w);
        merge::merge_sstable::<Self, _, _>(readers, writer, merger)
    }
}
|
|
||||||
|
|
||||||
/// SSTable with no values: it only stores a sorted set of keys.
pub struct VoidSSTable;

impl SSTable for VoidSSTable {
    type Value = ();
    type Reader = value::VoidReader;
    type Writer = value::VoidWriter;
}
|
|
||||||
|
|
||||||
/// SSTable associating each key with a `u64` value.
///
/// "Monotonic" refers to the value codec (`U64MonotonicWriter` /
/// `U64MonotonicReader`), which delta-encodes consecutive values
/// within a block.
pub struct SSTableMonotonicU64;

impl SSTable for SSTableMonotonicU64 {
    type Value = u64;

    type Reader = U64MonotonicReader;

    type Writer = U64MonotonicWriter;
}
|
|
||||||
|
|
||||||
/// Cursor over the `(key, value)` entries of an sstable,
/// in increasing key order.
pub struct Reader<'a, TValueReader> {
    // Current key, reconstructed from the prefix-encoded stream.
    key: Vec<u8>,
    delta_reader: DeltaReader<'a, TValueReader>,
}
|
|
||||||
|
|
||||||
impl<'a, TValueReader> Reader<'a, TValueReader>
|
|
||||||
where
|
|
||||||
TValueReader: ValueReader,
|
|
||||||
{
|
|
||||||
pub fn advance(&mut self) -> io::Result<bool> {
|
|
||||||
if !self.delta_reader.advance()? {
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
let common_prefix_len = self.delta_reader.common_prefix_len();
|
|
||||||
let suffix = self.delta_reader.suffix();
|
|
||||||
let new_len = self.delta_reader.common_prefix_len() + suffix.len();
|
|
||||||
self.key.resize(new_len, 0u8);
|
|
||||||
self.key[common_prefix_len..].copy_from_slice(suffix);
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn key(&self) -> &[u8] {
|
|
||||||
&self.key
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn value(&self) -> &TValueReader::Value {
|
|
||||||
self.delta_reader.value()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn into_delta_reader(self) -> DeltaReader<'a, TValueReader> {
|
|
||||||
assert!(self.key.is_empty());
|
|
||||||
self.delta_reader
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> {
|
|
||||||
fn as_ref(&self) -> &[u8] {
|
|
||||||
&self.key
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Writes an sstable (sorted `(key, value)` entries) to an `io::Write`.
pub struct Writer<W, TValueWriter>
where
    W: io::Write,
{
    // Last key written; used to prefix-encode the next key and to
    // enforce that keys arrive in strictly increasing order.
    // Cleared whenever a block is flushed.
    previous_key: Vec<u8>,
    // In-RAM index recording, for each flushed block, its last key,
    // byte range, and first ordinal.
    index_builder: SSTableIndexBuilder,
    // Underlying writer performing the prefix (delta) encoding.
    delta_writer: DeltaWriter<W, TValueWriter>,
    // Total number of entries written so far.
    num_terms: u64,
    // Ordinal of the first entry of the block currently being built.
    first_ordinal_of_the_block: u64,
}
|
|
||||||
|
|
||||||
impl<W, TValueWriter> Writer<W, TValueWriter>
|
|
||||||
where
|
|
||||||
W: io::Write,
|
|
||||||
TValueWriter: value::ValueWriter,
|
|
||||||
{
|
|
||||||
pub(crate) fn current_key(&self) -> &[u8] {
|
|
||||||
&self.previous_key[..]
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_key(&mut self, key: &[u8]) {
|
|
||||||
let keep_len = common_prefix_len(&self.previous_key, key);
|
|
||||||
let add_len = key.len() - keep_len;
|
|
||||||
let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
|
|
||||||
|| self.previous_key.is_empty()
|
|
||||||
|| self.previous_key[keep_len] < key[keep_len];
|
|
||||||
assert!(
|
|
||||||
increasing_keys,
|
|
||||||
"Keys should be increasing. ({:?} > {:?})",
|
|
||||||
self.previous_key, key
|
|
||||||
);
|
|
||||||
self.previous_key.resize(key.len(), 0u8);
|
|
||||||
self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
|
|
||||||
self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn into_delta_writer(self) -> DeltaWriter<W, TValueWriter> {
|
|
||||||
self.delta_writer
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write(&mut self, key: &[u8], value: &TValueWriter::Value) -> io::Result<()> {
|
|
||||||
self.write_key(key);
|
|
||||||
self.write_value(value)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
|
|
||||||
self.delta_writer.write_value(value);
|
|
||||||
self.num_terms += 1u64;
|
|
||||||
self.flush_block_if_required()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn flush_block_if_required(&mut self) -> io::Result<()> {
|
|
||||||
if let Some((start_offset, end_offset)) = self.delta_writer.flush_block_if_required()? {
|
|
||||||
self.index_builder.add_block(
|
|
||||||
&self.previous_key[..],
|
|
||||||
start_offset,
|
|
||||||
end_offset,
|
|
||||||
self.first_ordinal_of_the_block,
|
|
||||||
);
|
|
||||||
self.first_ordinal_of_the_block = self.num_terms;
|
|
||||||
self.previous_key.clear();
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn finalize(mut self) -> io::Result<W> {
|
|
||||||
if let Some((start_offset, end_offset)) = self.delta_writer.flush_block()? {
|
|
||||||
self.index_builder.add_block(
|
|
||||||
&self.previous_key[..],
|
|
||||||
start_offset,
|
|
||||||
end_offset,
|
|
||||||
self.first_ordinal_of_the_block,
|
|
||||||
);
|
|
||||||
self.first_ordinal_of_the_block = self.num_terms;
|
|
||||||
}
|
|
||||||
let mut wrt = self.delta_writer.finalize();
|
|
||||||
wrt.write_all(&0u32.to_le_bytes())?;
|
|
||||||
|
|
||||||
let offset = wrt.written_bytes();
|
|
||||||
|
|
||||||
self.index_builder.serialize(&mut wrt)?;
|
|
||||||
wrt.write_all(&offset.to_le_bytes())?;
|
|
||||||
wrt.write_all(&self.num_terms.to_le_bytes())?;
|
|
||||||
let wrt = wrt.finish();
|
|
||||||
Ok(wrt.into_inner()?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use std::io;
|
|
||||||
|
|
||||||
use super::SSTable;
|
|
||||||
use super::VoidMerge;
|
|
||||||
use super::VoidSSTable;
|
|
||||||
use super::{common_prefix_len, SSTableMonotonicU64};
|
|
||||||
|
|
||||||
fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
|
|
||||||
assert_eq!(
|
|
||||||
common_prefix_len(left.as_bytes(), right.as_bytes()),
|
|
||||||
expect_len
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
common_prefix_len(right.as_bytes(), left.as_bytes()),
|
|
||||||
expect_len
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_common_prefix_len() {
|
|
||||||
aux_test_common_prefix_len("a", "ab", 1);
|
|
||||||
aux_test_common_prefix_len("", "ab", 0);
|
|
||||||
aux_test_common_prefix_len("ab", "abc", 2);
|
|
||||||
aux_test_common_prefix_len("abde", "abce", 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_long_key_diff() {
|
|
||||||
let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
|
|
||||||
let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
|
|
||||||
let mut buffer = vec![];
|
|
||||||
{
|
|
||||||
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
|
|
||||||
assert!(sstable_writer.write(&long_key[..], &()).is_ok());
|
|
||||||
assert!(sstable_writer.write(&[0, 3, 4], &()).is_ok());
|
|
||||||
assert!(sstable_writer.write(&long_key2[..], &()).is_ok());
|
|
||||||
assert!(sstable_writer.finalize().is_ok());
|
|
||||||
}
|
|
||||||
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
|
|
||||||
assert!(sstable_reader.advance().unwrap());
|
|
||||||
assert_eq!(sstable_reader.key(), &long_key[..]);
|
|
||||||
assert!(sstable_reader.advance().unwrap());
|
|
||||||
assert_eq!(sstable_reader.key(), &[0, 3, 4]);
|
|
||||||
assert!(sstable_reader.advance().unwrap());
|
|
||||||
assert_eq!(sstable_reader.key(), &long_key2[..]);
|
|
||||||
assert!(!sstable_reader.advance().unwrap());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_simple_sstable() {
|
|
||||||
let mut buffer = vec![];
|
|
||||||
{
|
|
||||||
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
|
|
||||||
assert!(sstable_writer.write(&[17u8], &()).is_ok());
|
|
||||||
assert!(sstable_writer.write(&[17u8, 18u8, 19u8], &()).is_ok());
|
|
||||||
assert!(sstable_writer.write(&[17u8, 20u8], &()).is_ok());
|
|
||||||
assert!(sstable_writer.finalize().is_ok());
|
|
||||||
}
|
|
||||||
assert_eq!(
|
|
||||||
&buffer,
|
|
||||||
&[
|
|
||||||
// block len
|
|
||||||
7u8, 0u8, 0u8, 0u8, // keep 0 push 1 | ""
|
|
||||||
16u8, 17u8, // keep 1 push 2 | 18 19
|
|
||||||
33u8, 18u8, 19u8, // keep 1 push 1 | 20
|
|
||||||
17u8, 20u8, 0u8, 0u8, 0u8, 0u8, // no more blocks
|
|
||||||
// index
|
|
||||||
161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 104, 108, 97, 115, 116, 95, 107,
|
|
||||||
101, 121, 130, 17, 20, 106, 98, 108, 111, 99, 107, 95, 97, 100, 100, 114, 163, 108,
|
|
||||||
115, 116, 97, 114, 116, 95, 111, 102, 102, 115, 101, 116, 0, 106, 101, 110, 100,
|
|
||||||
95, 111, 102, 102, 115, 101, 116, 11, 109, 102, 105, 114, 115, 116, 95, 111, 114,
|
|
||||||
100, 105, 110, 97, 108, 0, 15, 0, 0, 0, 0, 0, 0, 0, // offset for the index
|
|
||||||
3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8 // num terms
|
|
||||||
]
|
|
||||||
);
|
|
||||||
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
|
|
||||||
assert!(sstable_reader.advance().unwrap());
|
|
||||||
assert_eq!(sstable_reader.key(), &[17u8]);
|
|
||||||
assert!(sstable_reader.advance().unwrap());
|
|
||||||
assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
|
|
||||||
assert!(sstable_reader.advance().unwrap());
|
|
||||||
assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
|
|
||||||
assert!(!sstable_reader.advance().unwrap());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[should_panic]
|
|
||||||
fn test_simple_sstable_non_increasing_key() {
|
|
||||||
let mut buffer = vec![];
|
|
||||||
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
|
|
||||||
assert!(sstable_writer.write(&[17u8], &()).is_ok());
|
|
||||||
assert!(sstable_writer.write(&[16u8], &()).is_ok());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_abcd_abe() {
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
{
|
|
||||||
let mut writer = VoidSSTable::writer(&mut buffer);
|
|
||||||
writer.write(b"abcd", &()).unwrap();
|
|
||||||
writer.write(b"abe", &()).unwrap();
|
|
||||||
writer.finalize().unwrap();
|
|
||||||
}
|
|
||||||
let mut output = Vec::new();
|
|
||||||
assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
|
|
||||||
assert_eq!(&output[..], &buffer[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_sstable() {
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
{
|
|
||||||
let mut writer = VoidSSTable::writer(&mut buffer);
|
|
||||||
writer.write(b"abcd", &()).unwrap();
|
|
||||||
writer.write(b"abe", &()).unwrap();
|
|
||||||
writer.finalize().unwrap();
|
|
||||||
}
|
|
||||||
let mut output = Vec::new();
|
|
||||||
assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
|
|
||||||
assert_eq!(&output[..], &buffer[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_sstable_u64() -> io::Result<()> {
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
let mut writer = SSTableMonotonicU64::writer(&mut buffer);
|
|
||||||
writer.write(b"abcd", &1u64)?;
|
|
||||||
writer.write(b"abe", &4u64)?;
|
|
||||||
writer.write(b"gogo", &4324234234234234u64)?;
|
|
||||||
writer.finalize()?;
|
|
||||||
let mut reader = SSTableMonotonicU64::reader(&buffer[..]);
|
|
||||||
assert!(reader.advance()?);
|
|
||||||
assert_eq!(reader.key(), b"abcd");
|
|
||||||
assert_eq!(reader.value(), &1u64);
|
|
||||||
assert!(reader.advance()?);
|
|
||||||
assert_eq!(reader.key(), b"abe");
|
|
||||||
assert_eq!(reader.value(), &4u64);
|
|
||||||
assert!(reader.advance()?);
|
|
||||||
assert_eq!(reader.key(), b"gogo");
|
|
||||||
assert_eq!(reader.value(), &4324234234234234u64);
|
|
||||||
assert!(!reader.advance()?);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,90 +0,0 @@
|
|||||||
use std::io;
|
|
||||||
|
|
||||||
use serde;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
/// In-memory index of an sstable: the list of its blocks,
/// each identified by the last key it contains.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct SSTableIndex {
    blocks: Vec<BlockMeta>,
}
|
|
||||||
|
|
||||||
impl SSTableIndex {
|
|
||||||
pub fn load(data: &[u8]) -> SSTableIndex {
|
|
||||||
// TODO
|
|
||||||
serde_cbor::de::from_slice(data).unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn search(&self, key: &[u8]) -> Option<BlockAddr> {
|
|
||||||
self.blocks
|
|
||||||
.iter()
|
|
||||||
.find(|block| &block.last_key[..] >= &key)
|
|
||||||
.map(|block| block.block_addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Address and metadata of a single sstable block.
#[derive(Clone, Eq, PartialEq, Debug, Copy, Serialize, Deserialize)]
pub struct BlockAddr {
    // Byte offset of the beginning of the block.
    pub start_offset: u64,
    // Byte offset marking the end of the block.
    pub end_offset: u64,
    // Ordinal of the first term stored in the block.
    pub first_ordinal: u64,
}
|
|
||||||
|
|
||||||
/// One entry of the `SSTableIndex`: one per block.
#[derive(Debug, Serialize, Deserialize)]
struct BlockMeta {
    // Last (i.e. greatest) key stored in the block.
    pub last_key: Vec<u8>,
    pub block_addr: BlockAddr,
}
|
|
||||||
|
|
||||||
/// Builder accumulating one `BlockMeta` per block while an sstable
/// is being written.
#[derive(Default)]
pub struct SSTableIndexBuilder {
    index: SSTableIndex,
}
|
|
||||||
|
|
||||||
impl SSTableIndexBuilder {
|
|
||||||
pub fn add_block(
|
|
||||||
&mut self,
|
|
||||||
last_key: &[u8],
|
|
||||||
start_offset: u64,
|
|
||||||
stop_offset: u64,
|
|
||||||
first_ordinal: u64,
|
|
||||||
) {
|
|
||||||
self.index.blocks.push(BlockMeta {
|
|
||||||
last_key: last_key.to_vec(),
|
|
||||||
block_addr: BlockAddr {
|
|
||||||
start_offset,
|
|
||||||
end_offset: stop_offset,
|
|
||||||
first_ordinal,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn serialize(&self, wrt: &mut dyn io::Write) -> io::Result<()> {
|
|
||||||
serde_cbor::ser::to_writer(wrt, &self.index).unwrap();
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_sstable_index() {
|
|
||||||
let mut sstable_builder = SSTableIndexBuilder::default();
|
|
||||||
sstable_builder.add_block(b"aaa", 10u64, 20u64, 0u64);
|
|
||||||
sstable_builder.add_block(b"bbbbbbb", 20u64, 30u64, 564);
|
|
||||||
sstable_builder.add_block(b"ccc", 30u64, 40u64, 10u64);
|
|
||||||
sstable_builder.add_block(b"dddd", 40u64, 50u64, 15u64);
|
|
||||||
let mut buffer: Vec<u8> = Vec::new();
|
|
||||||
sstable_builder.serialize(&mut buffer).unwrap();
|
|
||||||
let sstable = SSTableIndex::load(&buffer[..]);
|
|
||||||
assert_eq!(
|
|
||||||
sstable.search(b"bbbde"),
|
|
||||||
Some(BlockAddr {
|
|
||||||
first_ordinal: 10u64,
|
|
||||||
start_offset: 30u64,
|
|
||||||
end_offset: 40u64
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,94 +0,0 @@
|
|||||||
use super::{vint, BlockReader};
|
|
||||||
use std::io;
|
|
||||||
|
|
||||||
/// Decodes the values of an sstable block.
pub trait ValueReader: Default {
    /// Type of the values stored in the sstable.
    type Value;

    /// Returns the `idx`-th value of the last block read.
    fn value(&self, idx: usize) -> &Self::Value;

    /// Decodes all of the values of the next block from `reader`.
    fn read(&mut self, reader: &mut BlockReader) -> io::Result<()>;
}
|
|
||||||
|
|
||||||
/// Encodes the values of an sstable block.
pub trait ValueWriter: Default {
    /// Type of the values stored in the sstable.
    type Value;

    /// Buffers a new value.
    fn write(&mut self, val: &Self::Value);

    /// Serializes the buffered values into `writer`.
    fn write_block(&mut self, writer: &mut Vec<u8>);
}
|
|
||||||
|
|
||||||
/// `ValueReader` for sstables storing no value at all.
#[derive(Default)]
pub struct VoidReader;

impl ValueReader for VoidReader {
    type Value = ();

    // Always returns the unit value, regardless of `idx`.
    fn value(&self, _idx: usize) -> &() {
        &()
    }

    // Nothing is serialized for `()` values, so there is nothing to read.
    fn read(&mut self, _reader: &mut BlockReader) -> io::Result<()> {
        Ok(())
    }
}
|
|
||||||
|
|
||||||
/// `ValueWriter` for sstables storing no value at all.
#[derive(Default)]
pub struct VoidWriter;

impl ValueWriter for VoidWriter {
    type Value = ();

    // `()` values carry no information: nothing to buffer...
    fn write(&mut self, _val: &()) {}

    // ...and nothing to serialize.
    fn write_block(&mut self, _writer: &mut Vec<u8>) {}
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct U64MonotonicWriter {
|
|
||||||
vals: Vec<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ValueWriter for U64MonotonicWriter {
|
|
||||||
type Value = u64;
|
|
||||||
|
|
||||||
fn write(&mut self, val: &Self::Value) {
|
|
||||||
self.vals.push(*val);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write_block(&mut self, writer: &mut Vec<u8>) {
|
|
||||||
let mut prev_val = 0u64;
|
|
||||||
vint::serialize_into_vec(self.vals.len() as u64, writer);
|
|
||||||
for &val in &self.vals {
|
|
||||||
let delta = val - prev_val;
|
|
||||||
vint::serialize_into_vec(delta, writer);
|
|
||||||
prev_val = val;
|
|
||||||
}
|
|
||||||
self.vals.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// `ValueReader` decoding blocks produced by `U64MonotonicWriter`
/// (vint-encoded count followed by vint-encoded deltas).
#[derive(Default)]
pub struct U64MonotonicReader {
    // Decoded values of the current block.
    vals: Vec<u64>,
}

impl ValueReader for U64MonotonicReader {
    type Value = u64;

    fn value(&self, idx: usize) -> &Self::Value {
        &self.vals[idx]
    }

    fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
        // Block layout: number of values, then one delta per value.
        let len = reader.deserialize_u64() as usize;
        self.vals.clear();
        let mut prev_val = 0u64;
        for _ in 0..len {
            let delta = reader.deserialize_u64() as u64;
            // Values are cumulative sums of the deltas.
            let val = prev_val + delta;
            self.vals.push(val);
            prev_val = val;
        }
        Ok(())
    }
}
|
|
||||||
@@ -1,74 +0,0 @@
|
|||||||
use super::BlockReader;
|
|
||||||
|
|
||||||
const CONTINUE_BIT: u8 = 128u8;
|
|
||||||
|
|
||||||
pub fn serialize(mut val: u64, buffer: &mut [u8]) -> usize {
|
|
||||||
for (i, b) in buffer.iter_mut().enumerate() {
|
|
||||||
let next_byte: u8 = (val & 127u64) as u8;
|
|
||||||
val = val >> 7;
|
|
||||||
if val == 0u64 {
|
|
||||||
*b = next_byte;
|
|
||||||
return i + 1;
|
|
||||||
} else {
|
|
||||||
*b = next_byte | CONTINUE_BIT;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
10 //< actually unreachable
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn serialize_into_vec(val: u64, buffer: &mut Vec<u8>) {
|
|
||||||
let mut buf = [0u8; 10];
|
|
||||||
let num_bytes = serialize(val, &mut buf[..]);
|
|
||||||
buffer.extend_from_slice(&buf[..num_bytes]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// super slow but we don't care
|
|
||||||
pub fn deserialize_read(buf: &[u8]) -> (usize, u64) {
|
|
||||||
let mut result = 0u64;
|
|
||||||
let mut shift = 0u64;
|
|
||||||
let mut consumed = 0;
|
|
||||||
|
|
||||||
for &b in buf {
|
|
||||||
consumed += 1;
|
|
||||||
result |= u64::from(b % 128u8) << shift;
|
|
||||||
if b < CONTINUE_BIT {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
shift += 7;
|
|
||||||
}
|
|
||||||
(consumed, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Decodes one vint-encoded `u64` from the block reader and advances
/// it past the consumed bytes.
pub fn deserialize_from_block(block: &mut BlockReader) -> u64 {
    let (num_bytes, val) = deserialize_read(block.buffer());
    block.advance(num_bytes);
    val
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::{deserialize_read, serialize};
|
|
||||||
use std::u64;
|
|
||||||
|
|
||||||
fn aux_test_int(val: u64, expect_len: usize) {
|
|
||||||
let mut buffer = [0u8; 14];
|
|
||||||
assert_eq!(serialize(val, &mut buffer[..]), expect_len);
|
|
||||||
assert_eq!(deserialize_read(&buffer), (expect_len, val));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_vint() {
|
|
||||||
aux_test_int(0u64, 1);
|
|
||||||
aux_test_int(17u64, 1);
|
|
||||||
aux_test_int(127u64, 1);
|
|
||||||
aux_test_int(128u64, 2);
|
|
||||||
aux_test_int(123423418u64, 4);
|
|
||||||
for i in 1..63 {
|
|
||||||
let power_of_two = 1u64 << i;
|
|
||||||
aux_test_int(power_of_two + 1, (i / 7) + 1);
|
|
||||||
aux_test_int(power_of_two, (i / 7) + 1);
|
|
||||||
aux_test_int(power_of_two - 1, ((i - 1) / 7) + 1);
|
|
||||||
}
|
|
||||||
aux_test_int(u64::MAX, 10);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,227 +0,0 @@
|
|||||||
use super::TermDictionary;
|
|
||||||
use crate::postings::TermInfo;
|
|
||||||
use crate::termdict::sstable_termdict::TermInfoReader;
|
|
||||||
use crate::termdict::TermOrdinal;
|
|
||||||
use std::io;
|
|
||||||
use std::ops::Bound;
|
|
||||||
use tantivy_fst::automaton::AlwaysMatch;
|
|
||||||
use tantivy_fst::Automaton;
|
|
||||||
|
|
||||||
/// `TermStreamerBuilder` is a helper object used to define
/// a range of terms that should be streamed.
pub struct TermStreamerBuilder<'a, A = AlwaysMatch>
where
    A: Automaton,
    A::State: Clone,
{
    term_dict: &'a TermDictionary,
    // Automaton every streamed term must match.
    automaton: A,
    // Lower/upper bounds of the targeted term range.
    lower: Bound<Vec<u8>>,
    upper: Bound<Vec<u8>>,
}
|
|
||||||
|
|
||||||
impl<'a, A> TermStreamerBuilder<'a, A>
|
|
||||||
where
|
|
||||||
A: Automaton,
|
|
||||||
A::State: Clone,
|
|
||||||
{
|
|
||||||
pub(crate) fn new(term_dict: &'a TermDictionary, automaton: A) -> Self {
|
|
||||||
TermStreamerBuilder {
|
|
||||||
term_dict,
|
|
||||||
automaton,
|
|
||||||
lower: Bound::Unbounded,
|
|
||||||
upper: Bound::Unbounded,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Limit the range to terms greater or equal to the bound
|
|
||||||
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
|
||||||
self.lower = Bound::Included(bound.as_ref().to_owned());
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Limit the range to terms strictly greater than the bound
|
|
||||||
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
|
||||||
self.lower = Bound::Excluded(bound.as_ref().to_owned());
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Limit the range to terms lesser or equal to the bound
|
|
||||||
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
|
||||||
self.upper = Bound::Included(bound.as_ref().to_owned());
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Limit the range to terms lesser or equal to the bound
|
|
||||||
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
|
||||||
self.lower = Bound::Excluded(bound.as_ref().to_owned());
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn backward(mut self) -> Self {
|
|
||||||
unimplemented!()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates the stream corresponding to the range
|
|
||||||
/// of terms defined using the `TermStreamerBuilder`.
|
|
||||||
pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
|
|
||||||
let start_state = self.automaton.start();
|
|
||||||
let delta_reader = self.term_dict.sstable_delta_reader()?;
|
|
||||||
Ok(TermStreamer {
|
|
||||||
automaton: self.automaton,
|
|
||||||
states: vec![start_state],
|
|
||||||
delta_reader,
|
|
||||||
key: Vec::new(),
|
|
||||||
term_ord: 0u64,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub struct TermStreamer<'a, A = AlwaysMatch>
where
    A: Automaton,
    A::State: Clone,
{
    automaton: A,
    // states[i] is the automaton state reached after feeding key[..i];
    // kept as a stack so shared prefixes are not re-fed on each advance.
    states: Vec<A::State>,
    delta_reader: super::sstable::DeltaReader<'a, TermInfoReader>,
    // Current term (raw bytes).
    key: Vec<u8>,
    // Ordinal of the current term.
    term_ord: TermOrdinal,
}
|
|
||||||
|
|
||||||
impl<'a, A> TermStreamer<'a, A>
where
    A: Automaton,
    A::State: Clone,
{
    /// Advances the stream to the next term matching the automaton.
    /// Before the first call to `.advance()`, the stream
    /// is in an uninitialized state.
    ///
    /// Returns `false` when the stream is exhausted.
    pub fn advance(&mut self) -> bool {
        // NOTE(review): io errors from the underlying delta reader
        // currently panic here instead of being surfaced.
        while self.delta_reader.advance().unwrap() {
            self.term_ord += 1u64;
            let common_prefix_len = self.delta_reader.common_prefix_len();
            // Drop the automaton states and key bytes that belong to the
            // part of the previous key that is not shared with this one.
            self.states.truncate(common_prefix_len + 1);
            self.key.truncate(common_prefix_len);
            // Feed the new suffix byte by byte, keeping one state per byte.
            let mut state: A::State = self.states.last().unwrap().clone();
            for &b in self.delta_reader.suffix() {
                state = self.automaton.accept(&state, b);
                self.states.push(state.clone());
            }
            self.key.extend_from_slice(self.delta_reader.suffix());
            if self.automaton.is_match(&state) {
                return true;
            }
        }
        false
    }

    /// Returns the `TermOrdinal` of the current term.
    ///
    /// May return a meaningless value if `.advance()` has never
    /// been called before.
    pub fn term_ord(&self) -> TermOrdinal {
        self.term_ord
    }

    /// Accesses the current key.
    ///
    /// `.key()` should return the key that was returned
    /// by the `.next()` method.
    ///
    /// If the end of the stream as been reached, and `.next()`
    /// has been called and returned `None`, `.key()` remains
    /// the value of the last key encountered.
    ///
    /// Before any call to `.next()`, `.key()` returns an empty array.
    pub fn key(&self) -> &[u8] {
        &self.key
    }

    /// Accesses the current value.
    ///
    /// Calling `.value()` after the end of the stream will return the
    /// last `.value()` encountered.
    ///
    /// # Panics
    ///
    /// Calling `.value()` before the first call to `.advance()` returns
    /// `V::default()`.
    pub fn value(&self) -> &TermInfo {
        self.delta_reader.value()
    }

    /// Return the next `(key, value)` pair.
    #[cfg_attr(feature = "cargo-clippy", allow(clippy::should_implement_trait))]
    pub fn next(&mut self) -> Option<(&[u8], &TermInfo)> {
        if self.advance() {
            Some((self.key(), self.value()))
        } else {
            None
        }
    }
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::super::TermDictionary;
|
|
||||||
use crate::directory::OwnedBytes;
|
|
||||||
use crate::postings::TermInfo;
|
|
||||||
|
|
||||||
fn make_term_info(i: u64) -> TermInfo {
|
|
||||||
TermInfo {
|
|
||||||
doc_freq: 1000u32 + i as u32,
|
|
||||||
positions_idx: i * 500,
|
|
||||||
postings_start_offset: (i + 10) * (i * 10),
|
|
||||||
postings_stop_offset: ((i + 1) + 10) * ((i + 1) * 10),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_test_term_dictionary() -> crate::Result<TermDictionary> {
|
|
||||||
let mut term_dict_builder = super::super::TermDictionaryBuilder::create(Vec::new())?;
|
|
||||||
term_dict_builder.insert(b"abaisance", &make_term_info(0u64))?;
|
|
||||||
term_dict_builder.insert(b"abalation", &make_term_info(1u64))?;
|
|
||||||
term_dict_builder.insert(b"abalienate", &make_term_info(2u64))?;
|
|
||||||
term_dict_builder.insert(b"abandon", &make_term_info(3u64))?;
|
|
||||||
let buffer = term_dict_builder.finish()?;
|
|
||||||
let owned_bytes = OwnedBytes::new(buffer);
|
|
||||||
TermDictionary::from_bytes(owned_bytes)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_sstable_stream() -> crate::Result<()> {
|
|
||||||
let term_dict = create_test_term_dictionary()?;
|
|
||||||
let mut term_streamer = term_dict.stream()?;
|
|
||||||
assert!(term_streamer.advance());
|
|
||||||
assert_eq!(term_streamer.key(), b"abaisance");
|
|
||||||
assert_eq!(term_streamer.value().doc_freq, 1000u32);
|
|
||||||
assert!(term_streamer.advance());
|
|
||||||
assert_eq!(term_streamer.key(), b"abalation");
|
|
||||||
assert_eq!(term_streamer.value().doc_freq, 1001u32);
|
|
||||||
assert!(term_streamer.advance());
|
|
||||||
assert_eq!(term_streamer.key(), b"abalienate");
|
|
||||||
assert_eq!(term_streamer.value().doc_freq, 1002u32);
|
|
||||||
assert!(term_streamer.advance());
|
|
||||||
assert_eq!(term_streamer.key(), b"abandon");
|
|
||||||
assert_eq!(term_streamer.value().doc_freq, 1003u32);
|
|
||||||
assert!(!term_streamer.advance());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_sstable_search() -> crate::Result<()> {
|
|
||||||
let term_dict = create_test_term_dictionary()?;
|
|
||||||
let ptn = tantivy_fst::Regex::new("ab.*t.*").unwrap();
|
|
||||||
let mut term_streamer = term_dict.search(ptn).into_stream()?;
|
|
||||||
assert!(term_streamer.advance());
|
|
||||||
assert_eq!(term_streamer.key(), b"abalation");
|
|
||||||
assert_eq!(term_streamer.value().doc_freq, 1001u32);
|
|
||||||
assert!(term_streamer.advance());
|
|
||||||
assert_eq!(term_streamer.key(), b"abalienate");
|
|
||||||
assert_eq!(term_streamer.value().doc_freq, 1002u32);
|
|
||||||
assert!(!term_streamer.advance());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,228 +0,0 @@
|
|||||||
use std::io;
|
|
||||||
|
|
||||||
use crate::common::BinarySerializable;
|
|
||||||
use crate::directory::{FileSlice, OwnedBytes};
|
|
||||||
use crate::postings::TermInfo;
|
|
||||||
use crate::termdict::sstable_termdict::sstable::sstable_index::BlockAddr;
|
|
||||||
use crate::termdict::sstable_termdict::sstable::Writer;
|
|
||||||
use crate::termdict::sstable_termdict::sstable::{DeltaReader, SSTable};
|
|
||||||
use crate::termdict::sstable_termdict::sstable::{Reader, SSTableIndex};
|
|
||||||
use crate::termdict::sstable_termdict::{
|
|
||||||
TermInfoReader, TermInfoWriter, TermSSTable, TermStreamer, TermStreamerBuilder,
|
|
||||||
};
|
|
||||||
use crate::termdict::TermOrdinal;
|
|
||||||
use crate::HasLen;
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use tantivy_fst::automaton::AlwaysMatch;
|
|
||||||
use tantivy_fst::Automaton;
|
|
||||||
|
|
||||||
/// SSTable flavor associating each term with its `TermInfo`.
pub struct TermInfoSSTable;
impl SSTable for TermInfoSSTable {
    type Value = TermInfo;
    type Reader = TermInfoReader;
    type Writer = TermInfoWriter;
}
|
|
||||||
/// Builder for a `TermDictionary`: a thin wrapper around an sstable
/// writer specialized for `TermInfo` values.
pub struct TermDictionaryBuilder<W: io::Write> {
    sstable_writer: Writer<W, TermInfoWriter>,
}
|
|
||||||
|
|
||||||
impl<W: io::Write> TermDictionaryBuilder<W> {
    /// Creates a new `TermDictionaryBuilder`
    pub fn create(w: W) -> io::Result<Self> {
        let sstable_writer = TermSSTable::writer(w);
        Ok(TermDictionaryBuilder { sstable_writer })
    }

    /// Inserts a `(key, value)` pair in the term dictionary.
    ///
    /// *Keys have to be inserted in order.*
    pub fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> {
        let key = key_ref.as_ref();
        self.insert_key(key)?;
        self.insert_value(value)?;
        Ok(())
    }

    /// # Warning
    /// Horribly dangerous internal API
    ///
    /// If used, it must be used by systematically alternating calls
    /// to insert_key and insert_value.
    ///
    /// Prefer using `.insert(key, value)`
    pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
        self.sstable_writer.write_key(key);
        Ok(())
    }

    /// # Warning
    ///
    /// Horribly dangerous internal API. See `.insert_key(...)`.
    pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
        // NOTE(review): the io::Result returned by `write_value` is
        // silently dropped here — confirm whether it should be propagated.
        self.sstable_writer.write_value(term_info);
        Ok(())
    }

    /// Finalize writing the builder, and returns the underlying
    /// `Write` object.
    pub fn finish(self) -> io::Result<W> {
        self.sstable_writer.finalize()
    }
}
|
|
||||||
|
|
||||||
static EMPTY_TERM_DICT_FILE: Lazy<FileSlice> = Lazy::new(|| {
|
|
||||||
let term_dictionary_data: Vec<u8> = TermDictionaryBuilder::create(Vec::<u8>::new())
|
|
||||||
.expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
|
|
||||||
.finish()
|
|
||||||
.expect("Writing in a Vec<u8> should never fail");
|
|
||||||
FileSlice::from(term_dictionary_data)
|
|
||||||
});
|
|
||||||
|
|
||||||
/// The term dictionary contains all of the terms in
|
|
||||||
/// `tantivy index` in a sorted manner.
|
|
||||||
///
|
|
||||||
/// The `Fst` crate is used to associate terms to their
|
|
||||||
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
|
|
||||||
/// possible to fetch the associated `TermInfo`.
|
|
||||||
pub struct TermDictionary {
|
|
||||||
sstable_slice: FileSlice,
|
|
||||||
sstable_index: SSTableIndex,
|
|
||||||
num_terms: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TermDictionary {
|
|
||||||
pub(crate) fn sstable_reader(&self) -> io::Result<Reader<'static, TermInfoReader>> {
|
|
||||||
let data = self.sstable_slice.read_bytes()?;
|
|
||||||
Ok(TermInfoSSTable::reader(data))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn sstable_reader_block(
|
|
||||||
&self,
|
|
||||||
block_addr: BlockAddr,
|
|
||||||
) -> io::Result<Reader<'static, TermInfoReader>> {
|
|
||||||
let data = self.sstable_slice.read_bytes_slice(
|
|
||||||
block_addr.start_offset as usize,
|
|
||||||
block_addr.end_offset as usize,
|
|
||||||
)?;
|
|
||||||
Ok(TermInfoSSTable::reader(data))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn sstable_delta_reader(&self) -> io::Result<DeltaReader<'static, TermInfoReader>> {
|
|
||||||
let data = self.sstable_slice.read_bytes()?;
|
|
||||||
Ok(TermInfoSSTable::delta_reader(data))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Opens a `TermDictionary`.
|
|
||||||
pub fn open(term_dictionary_file: FileSlice) -> crate::Result<Self> {
|
|
||||||
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16);
|
|
||||||
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
|
|
||||||
let index_offset = u64::deserialize(&mut footer_len_bytes)?;
|
|
||||||
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
|
|
||||||
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
|
|
||||||
// dbg!(index_slice.len());
|
|
||||||
let sstable_index_bytes = index_slice.read_bytes()?;
|
|
||||||
let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice());
|
|
||||||
// dbg!(&sstable_index);
|
|
||||||
Ok(TermDictionary {
|
|
||||||
sstable_slice,
|
|
||||||
sstable_index,
|
|
||||||
num_terms,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn from_bytes(owned_bytes: OwnedBytes) -> crate::Result<TermDictionary> {
|
|
||||||
TermDictionary::open(FileSlice::new(Box::new(owned_bytes)))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates an empty term dictionary which contains no terms.
|
|
||||||
pub fn empty() -> Self {
|
|
||||||
TermDictionary::open(EMPTY_TERM_DICT_FILE.clone()).unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the number of terms in the dictionary.
|
|
||||||
/// Term ordinals range from 0 to `num_terms() - 1`.
|
|
||||||
pub fn num_terms(&self) -> usize {
|
|
||||||
self.num_terms as usize
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the ordinal associated to a given term.
|
|
||||||
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
|
|
||||||
let mut term_ord = 0u64;
|
|
||||||
let key_bytes = key.as_ref();
|
|
||||||
let mut sstable_reader = self.sstable_reader()?;
|
|
||||||
while sstable_reader.advance().unwrap_or(false) {
|
|
||||||
if sstable_reader.key() == key_bytes {
|
|
||||||
return Ok(Some(term_ord));
|
|
||||||
}
|
|
||||||
term_ord += 1;
|
|
||||||
}
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the term associated to a given term ordinal.
|
|
||||||
///
|
|
||||||
/// Term ordinals are defined as the position of the term in
|
|
||||||
/// the sorted list of terms.
|
|
||||||
///
|
|
||||||
/// Returns true iff the term has been found.
|
|
||||||
///
|
|
||||||
/// Regardless of whether the term is found or not,
|
|
||||||
/// the buffer may be modified.
|
|
||||||
pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
|
|
||||||
let mut sstable_reader = self.sstable_reader()?;
|
|
||||||
bytes.clear();
|
|
||||||
for _ in 0..(ord + 1) {
|
|
||||||
if !sstable_reader.advance().unwrap_or(false) {
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bytes.extend_from_slice(sstable_reader.key());
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the number of terms in the dictionary.
|
|
||||||
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<TermInfo> {
|
|
||||||
let mut sstable_reader = self.sstable_reader()?;
|
|
||||||
for _ in 0..(term_ord + 1) {
|
|
||||||
if !sstable_reader.advance().unwrap_or(false) {
|
|
||||||
return Ok(TermInfo::default());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(sstable_reader.value().clone())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Lookups the value corresponding to the key.
|
|
||||||
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
|
|
||||||
if let Some(block_addr) = self.sstable_index.search(key.as_ref()) {
|
|
||||||
let mut sstable_reader = self.sstable_reader_block(block_addr)?;
|
|
||||||
let key_bytes = key.as_ref();
|
|
||||||
while sstable_reader.advance().unwrap_or(false) {
|
|
||||||
if sstable_reader.key() == key_bytes {
|
|
||||||
let term_info = sstable_reader.value().clone();
|
|
||||||
return Ok(Some(term_info));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns a range builder, to stream all of the terms
|
|
||||||
// within an interval.
|
|
||||||
pub fn range(&self) -> TermStreamerBuilder<'_> {
|
|
||||||
TermStreamerBuilder::new(self, AlwaysMatch)
|
|
||||||
}
|
|
||||||
|
|
||||||
// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
|
|
||||||
pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
|
|
||||||
self.range().into_stream()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns a search builder, to stream all of the terms
|
|
||||||
// within the Automaton
|
|
||||||
pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A>
|
|
||||||
where
|
|
||||||
A::State: Clone,
|
|
||||||
{
|
|
||||||
TermStreamerBuilder::<A>::new(self, automaton)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -249,8 +249,7 @@ fn test_empty_string() -> crate::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
fn stream_range_test_dict() -> crate::Result<TermDictionary> {
|
||||||
fn test_stream_range_boundaries() -> crate::Result<()> {
|
|
||||||
let buffer: Vec<u8> = {
|
let buffer: Vec<u8> = {
|
||||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
|
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
|
||||||
for i in 0u8..10u8 {
|
for i in 0u8..10u8 {
|
||||||
@@ -260,84 +259,96 @@ fn test_stream_range_boundaries() -> crate::Result<()> {
|
|||||||
term_dictionary_builder.finish()?
|
term_dictionary_builder.finish()?
|
||||||
};
|
};
|
||||||
let file = FileSlice::from(buffer);
|
let file = FileSlice::from(buffer);
|
||||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
TermDictionary::open(file)
|
||||||
|
}
|
||||||
|
|
||||||
let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
|
#[test]
|
||||||
|
fn test_stream_range_boundaries_forward() -> crate::Result<()> {
|
||||||
|
let term_dictionary = stream_range_test_dict()?;
|
||||||
|
let value_list = |mut streamer: TermStreamer<'_>| {
|
||||||
let mut res: Vec<u32> = vec![];
|
let mut res: Vec<u32> = vec![];
|
||||||
while let Some((_, ref v)) = streamer.next() {
|
while let Some((_, ref v)) = streamer.next() {
|
||||||
res.push(v.doc_freq);
|
res.push(v.doc_freq);
|
||||||
}
|
}
|
||||||
if backwards {
|
|
||||||
res.reverse();
|
|
||||||
}
|
|
||||||
res
|
res
|
||||||
};
|
};
|
||||||
{
|
|
||||||
let range = term_dictionary.range().backward().into_stream()?;
|
|
||||||
assert_eq!(
|
|
||||||
value_list(range, true),
|
|
||||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
{
|
{
|
||||||
let range = term_dictionary.range().ge([2u8]).into_stream()?;
|
let range = term_dictionary.range().ge([2u8]).into_stream()?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
value_list(range, false),
|
value_list(range),
|
||||||
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
|
|
||||||
assert_eq!(
|
|
||||||
value_list(range, true),
|
|
||||||
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let range = term_dictionary.range().gt([2u8]).into_stream()?;
|
let range = term_dictionary.range().gt([2u8]).into_stream()?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
value_list(range, false),
|
value_list(range),
|
||||||
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
|
|
||||||
assert_eq!(
|
|
||||||
value_list(range, true),
|
|
||||||
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let range = term_dictionary.range().lt([6u8]).into_stream()?;
|
let range = term_dictionary.range().lt([6u8]).into_stream()?;
|
||||||
assert_eq!(
|
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]);
|
||||||
value_list(range, false),
|
|
||||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
|
|
||||||
assert_eq!(
|
|
||||||
value_list(range, true),
|
|
||||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let range = term_dictionary.range().le([6u8]).into_stream()?;
|
let range = term_dictionary.range().le([6u8]).into_stream()?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
value_list(range, false),
|
value_list(range),
|
||||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
|
|
||||||
assert_eq!(
|
|
||||||
value_list(range, true),
|
|
||||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
|
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
|
||||||
assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
|
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_stream_range_boundaries_backward() -> crate::Result<()> {
|
||||||
|
let term_dictionary = stream_range_test_dict()?;
|
||||||
|
let value_list_backward = |mut streamer: TermStreamer<'_>| {
|
||||||
|
let mut res: Vec<u32> = vec![];
|
||||||
|
while let Some((_, ref v)) = streamer.next() {
|
||||||
|
res.push(v.doc_freq);
|
||||||
|
}
|
||||||
|
res.reverse();
|
||||||
|
res
|
||||||
|
};
|
||||||
|
{
|
||||||
|
let range = term_dictionary.range().backward().into_stream()?;
|
||||||
|
assert_eq!(
|
||||||
|
value_list_backward(range),
|
||||||
|
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
|
||||||
|
assert_eq!(
|
||||||
|
value_list_backward(range),
|
||||||
|
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
|
||||||
|
assert_eq!(
|
||||||
|
value_list_backward(range),
|
||||||
|
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
|
||||||
|
assert_eq!(
|
||||||
|
value_list_backward(range),
|
||||||
|
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
|
||||||
|
assert_eq!(
|
||||||
|
value_list_backward(range),
|
||||||
|
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
||||||
|
);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let range = term_dictionary
|
let range = term_dictionary
|
||||||
@@ -346,11 +357,38 @@ fn test_stream_range_boundaries() -> crate::Result<()> {
|
|||||||
.lt([5u8])
|
.lt([5u8])
|
||||||
.backward()
|
.backward()
|
||||||
.into_stream()?;
|
.into_stream()?;
|
||||||
assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
|
assert_eq!(
|
||||||
|
value_list_backward(range),
|
||||||
|
vec![0u32, 1u32, 2u32, 3u32, 4u32]
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_ord_to_term() -> crate::Result<()> {
|
||||||
|
let termdict = stream_range_test_dict()?;
|
||||||
|
let mut bytes = vec![];
|
||||||
|
for b in 0u8..10u8 {
|
||||||
|
termdict.ord_to_term(b as u64, &mut bytes)?;
|
||||||
|
assert_eq!(&bytes, &[b]);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_stream_term_ord() -> crate::Result<()> {
|
||||||
|
let termdict = stream_range_test_dict()?;
|
||||||
|
let mut stream = termdict.stream()?;
|
||||||
|
for b in 0u8..10u8 {
|
||||||
|
assert!(stream.advance(), true);
|
||||||
|
assert_eq!(stream.term_ord(), b as u64);
|
||||||
|
assert_eq!(stream.key(), &[b]);
|
||||||
|
}
|
||||||
|
assert!(!stream.advance());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_automaton_search() -> crate::Result<()> {
|
fn test_automaton_search() -> crate::Result<()> {
|
||||||
use crate::query::DFAWrapper;
|
use crate::query::DFAWrapper;
|
||||||
|
|||||||
Reference in New Issue
Block a user