Compare commits

...

30 Commits

Author SHA1 Message Date
Paul Masurel
08f7706973 test store 2021-01-09 10:27:03 +09:00
Paul Masurel
bf6e6e8a7c Merge pull request #972 from tantivy-search/issue/969
Issue/969
2021-01-07 22:49:31 +09:00
Paul Masurel
203b0256a3 Minor renaming 2021-01-07 22:47:57 +09:00
Paul Masurel
caf2a38b7e Closes #969.
The segment stacking optimization is not updating "first_doc_in_block".
2021-01-07 22:43:56 +09:00
Paul Masurel
96f24b078e Added failing unit test. 2021-01-07 22:43:28 +09:00
Paul Masurel
332b50a4eb Merge pull request #970 from tantivy-search/functional-test-store
Added a functional long running test to test store merging.
2021-01-07 14:27:08 +09:00
Paul Masurel
8ca0954b3b Added a functional long running test to test store merging. 2021-01-07 14:07:15 +09:00
Paul Masurel
36343e2de8 Merge pull request #968 from tantivy-search/add-bench-analyzer
added a simple bench for the default analyzer
2021-01-06 21:33:39 +09:00
Paul Masurel
2f14a892ca added a simple bench for the default analyzer 2021-01-06 19:11:26 +09:00
Paul Masurel
9c3cabce40 Updated version of the rand crate. 2021-01-06 18:09:00 +09:00
Paul Masurel
f8d71c2b10 Merge pull request #964 from mosuka/deserializable
Make NamedFieldDocument deserializable
2021-01-06 17:43:53 +09:00
Paul Masurel
394dfb24f1 Merge pull request #965 from lewisdiamond/patch-1
Fix spelling
2021-01-06 13:38:31 +09:00
Lewis Diamond
b0549a229d Fix spelling 2021-01-05 22:34:56 -05:00
Minoru Osuka
670b6eaff6 Make NamedFieldDocument deserializable 2020-12-21 16:51:31 +09:00
Paul Masurel
a4f33d3823 Added comment to f64 conversion to u64.
- Added proptest
- Added comment to Lemire blog post.
2020-12-15 13:40:31 +09:00
Paul Masurel
c7841e3da5 Merge pull request #953 from barrotsteindev/filter-collector-tpredicatevalue
Generic filter collector
2020-12-14 10:35:46 +09:00
barrotsteindev
e7b4a12bba cargo fmt 2020-12-10 14:10:55 +02:00
barrotsteindev
0aaa929d6e Merge branch 'main' into filter-collector-tpredicatevalue 2020-12-10 11:27:19 +02:00
barrotsteindev
1112797c18 added a line to CHANGELOG.md 2020-12-10 11:25:08 +02:00
barrotsteindev
920481e1c1 change unit test 2020-12-10 11:24:53 +02:00
Paul Masurel
55f7b84966 Merge pull request #952 from tantivy-search/bm25-on-onebyte
Encode blockwand on a single byte.
2020-12-10 18:09:31 +09:00
Paul Masurel
09ab4df1fe Encode blockwand on a single byte. 2020-12-10 18:08:52 +09:00
barrotsteindev
0c2cf81b37 cargo fmt 2020-12-10 11:08:35 +02:00
barrotsteindev
d864430bda final edits 2020-12-10 11:08:15 +02:00
Paul Masurel
de60540e06 fixing compilation 2020-12-10 10:36:21 +02:00
Paul Masurel
c3e311e6b8 Removed 'static in compression_lz4. 2020-12-09 15:30:52 +09:00
barrotsteindev
ac704f2f22 WIP generic filter collector 2020-12-08 14:36:52 +02:00
Paul Masurel
be626083a0 Reorganized and added termdict unit tests. 2020-12-07 12:50:36 +09:00
Paul Masurel
b68fcca1e0 Minor changes
- Open{Write,Read}Error::wrap_io_error made public
- Arc<PathBuf> -> Arc<Path> in file_watcher.
2020-12-03 23:31:50 +09:00
Paul Masurel
af6dfa1856 Small refactoring 2020-12-03 14:27:05 +09:00
37 changed files with 4401 additions and 226 deletions

View File

@@ -9,6 +9,10 @@ Tantivy 0.14.0
- Bugfix in `Query::explain`
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)
- Simplified the encoding of the skip reader struct. BlockWAND max tf is now encoded over a single byte. (@pmasurel)
- `FilterCollector` now supports all Fast Field value types (@barrotsteindev)
This version breaks compatibility and requires users to reindex everything.
Tantivy 0.13.2
===================

View File

@@ -47,16 +47,18 @@ murmurhash32 = "0.2"
chrono = "0.4"
smallvec = "1"
rayon = "1"
env_logger = "0.8"
lru = "0.6"
[target.'cfg(windows)'.dependencies]
winapi = "0.3"
[dev-dependencies]
rand = "0.7"
rand = "0.8"
maplit = "1"
matches = "0.1.8"
proptest = "0.10"
criterion = "0.3"
[dev-dependencies.fail]
version = "0.4"
@@ -97,3 +99,7 @@ travis-ci = { repository = "tantivy-search/tantivy" }
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]
[[bench]]
name = "analyzer"
harness = false

3774
benches/alice.txt Normal file

File diff suppressed because it is too large Load Diff

22
benches/analyzer.rs Normal file
View File

@@ -0,0 +1,22 @@
use criterion::{criterion_group, criterion_main, Criterion};
use tantivy::tokenizer::TokenizerManager;
const ALICE_TXT: &'static str = include_str!("alice.txt");
pub fn criterion_benchmark(c: &mut Criterion) {
let tokenizer_manager = TokenizerManager::default();
let tokenizer = tokenizer_manager.get("default").unwrap();
c.bench_function("default-tokenize-alice", |b| {
b.iter(|| {
let mut word_count = 0;
let mut token_stream = tokenizer.token_stream(ALICE_TXT);
while token_stream.advance() {
word_count += 1;
}
assert_eq!(word_count, 30_731);
})
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -368,9 +368,12 @@ impl SegmentCollector for FacetSegmentCollector {
}
let mut facet = vec![];
let facet_ord = self.collapse_facet_ords[collapsed_facet_ord];
facet_dict.ord_to_term(facet_ord as u64, &mut facet);
// TODO
facet_counts.insert(Facet::from_encoded(facet).unwrap(), count);
// TODO handle errors.
if facet_dict.ord_to_term(facet_ord as u64, &mut facet).is_ok() {
if let Ok(facet) = Facet::from_encoded(facet) {
facet_counts.insert(facet, count);
}
}
}
FacetCounts { facet_counts }
}

View File

@@ -9,8 +9,10 @@
// ---
// Importing tantivy...
use std::marker::PhantomData;
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::FastFieldReader;
use crate::fastfield::{FastFieldReader, FastValue};
use crate::schema::Field;
use crate::{Score, SegmentReader, TantivyError};
@@ -41,78 +43,104 @@ use crate::{Score, SegmentReader, TantivyError};
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary").unwrap();
/// let no_filter_collector = FilterCollector::new(price, &|value| value > 20_120u64, TopDocs::with_limit(2));
/// let no_filter_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
///
/// assert_eq!(top_docs.len(), 1);
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
///
/// let filter_all_collector = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
/// let filter_all_collector: FilterCollector<_, _, u64> = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_top_docs.len(), 0);
/// ```
pub struct FilterCollector<TCollector, TPredicate>
pub struct FilterCollector<TCollector, TPredicate, TPredicateValue: FastValue>
where
TPredicate: 'static,
{
field: Field,
collector: TCollector,
predicate: &'static TPredicate,
t_predicate_value: PhantomData<TPredicateValue>,
}
impl<TCollector, TPredicate> FilterCollector<TCollector, TPredicate>
impl<TCollector, TPredicate, TPredicateValue: FastValue>
FilterCollector<TCollector, TPredicate, TPredicateValue>
where
TCollector: Collector + Send + Sync,
TPredicate: Fn(u64) -> bool + Send + Sync,
TPredicate: Fn(TPredicateValue) -> bool + Send + Sync,
{
/// Create a new FilterCollector.
pub fn new(
field: Field,
predicate: &'static TPredicate,
collector: TCollector,
) -> FilterCollector<TCollector, TPredicate> {
) -> FilterCollector<TCollector, TPredicate, TPredicateValue> {
FilterCollector {
field,
predicate,
collector,
t_predicate_value: PhantomData,
}
}
}
impl<TCollector, TPredicate> Collector for FilterCollector<TCollector, TPredicate>
impl<TCollector, TPredicate, TPredicateValue: FastValue> Collector
for FilterCollector<TCollector, TPredicate, TPredicateValue>
where
TCollector: Collector + Send + Sync,
TPredicate: 'static + Fn(u64) -> bool + Send + Sync,
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
TPredicateValue: 'static + FastValue,
{
// That's the type of our result.
// Our standard deviation will be a float.
type Fruit = TCollector::Fruit;
type Child = FilterSegmentCollector<TCollector::Child, TPredicate>;
type Child = FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>;
fn for_segment(
&self,
segment_local_id: u32,
segment_reader: &SegmentReader,
) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate>> {
) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate, TPredicateValue>> {
let schema = segment_reader.schema();
let field_entry = schema.get_field_entry(self.field);
if !field_entry.is_fast() {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is not a fast field.",
field_entry.name()
)));
}
let requested_type = TPredicateValue::to_type();
let field_schema_type = field_entry.field_type().value_type();
if requested_type != field_schema_type {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is of type {:?}!={:?}",
field_entry.name(),
requested_type,
field_schema_type
)));
}
let fast_field_reader = segment_reader
.fast_fields()
.u64(self.field)
.typed_fast_field_reader(self.field)
.ok_or_else(|| {
let field_name = segment_reader.schema().get_field_name(self.field);
TantivyError::SchemaError(format!(
"Field {:?} is not a u64 fast field.",
field_name
"{:?} is not declared as a fast field in the schema.",
self.field
))
})?;
let segment_collector = self
.collector
.for_segment(segment_local_id, segment_reader)?;
Ok(FilterSegmentCollector {
fast_field_reader,
segment_collector,
predicate: self.predicate,
t_predicate_value: PhantomData,
})
}
@@ -128,20 +156,23 @@ where
}
}
pub struct FilterSegmentCollector<TSegmentCollector, TPredicate>
pub struct FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
where
TPredicate: 'static,
TPredicateValue: 'static + FastValue,
{
fast_field_reader: FastFieldReader<u64>,
fast_field_reader: FastFieldReader<TPredicateValue>,
segment_collector: TSegmentCollector,
predicate: &'static TPredicate,
t_predicate_value: PhantomData<TPredicateValue>,
}
impl<TSegmentCollector, TPredicate> SegmentCollector
for FilterSegmentCollector<TSegmentCollector, TPredicate>
impl<TSegmentCollector, TPredicate, TPredicateValue> SegmentCollector
for FilterSegmentCollector<TSegmentCollector, TPredicate, TPredicateValue>
where
TSegmentCollector: SegmentCollector,
TPredicate: 'static + Fn(u64) -> bool + Send + Sync,
TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync,
TPredicateValue: 'static + FastValue,
{
type Fruit = TSegmentCollector::Fruit;

View File

@@ -8,6 +8,13 @@ use crate::DocId;
use crate::Score;
use crate::SegmentLocalId;
use crate::collector::{FilterCollector, TopDocs};
use crate::query::QueryParser;
use crate::schema::{Schema, FAST, TEXT};
use crate::DateTime;
use crate::{doc, Index};
use std::str::FromStr;
pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
compute_score: true,
};
@@ -16,6 +23,54 @@ pub const TEST_COLLECTOR_WITHOUT_SCORE: TestCollector = TestCollector {
compute_score: true,
};
#[test]
pub fn test_filter_collector() {
let mut schema_builder = Schema::builder();
let title = schema_builder.add_text_field("title", TEXT);
let price = schema_builder.add_u64_field("price", FAST);
let date = schema_builder.add_date_field("date", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64, date => DateTime::from_str("1898-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64, date => DateTime::from_str("2020-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of Anne Frank", price => 18_240u64, date => DateTime::from_str("2019-04-20T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64, date => DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64, date => DateTime::from_str("2018-04-09T00:00:00+00:00").unwrap()));
assert!(index_writer.commit().is_ok());
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![title]);
let query = query_parser.parse_query("diary").unwrap();
let filter_some_collector = FilterCollector::new(
price,
&|value: u64| value > 20_120u64,
TopDocs::with_limit(2),
);
let top_docs = searcher.search(&query, &filter_some_collector).unwrap();
assert_eq!(top_docs.len(), 1);
assert_eq!(top_docs[0].1, DocAddress(0, 1));
let filter_all_collector: FilterCollector<_, _, u64> =
FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
assert_eq!(filtered_top_docs.len(), 0);
fn date_filter(value: DateTime) -> bool {
(value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
}
let filter_dates_collector = FilterCollector::new(date, &date_filter, TopDocs::with_limit(5));
let filtered_date_docs = searcher.search(&query, &filter_dates_collector).unwrap();
assert_eq!(filtered_date_docs.len(), 2);
}
/// Stores all of the doc ids.
/// This collector is only used for tests.
/// It is unusable in pr

View File

@@ -728,7 +728,7 @@ mod tests {
}
#[test]
fn test_top_collector_not_at_capacity() {
fn test_top_collector_not_at_capacity_without_offset() {
let index = make_index();
let field = index.schema().get_field("text").unwrap();
let query_parser = QueryParser::for_index(&index, vec![field]);

View File

@@ -115,11 +115,16 @@ pub fn u64_to_i64(val: u64) -> i64 {
/// For simplicity, tantivy internally handles `f64` as `u64`.
/// The mapping is defined by this function.
///
/// Maps `f64` to `u64` so that lexical order is preserved.
/// Maps `f64` to `u64` in a monotonic manner, so that bytes lexical order is preserved.
///
/// This is more suited than simply casting (`val as u64`)
/// which would truncate the result
///
/// # Reference
///
/// Daniel Lemire's [blog post](https://lemire.me/blog/2020/12/14/converting-floating-point-numbers-to-integers-while-preserving-order/)
/// explains the mapping in a clear manner.
///
/// # See also
/// The [reverse mapping is `u64_to_f64`](./fn.u64_to_f64.html).
#[inline(always)]
@@ -148,6 +153,7 @@ pub(crate) mod test {
pub use super::minmax;
pub use super::serialize::test::fixed_size_test;
use super::{compute_num_bits, f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
use proptest::prelude::*;
use std::f64;
fn test_i64_converter_helper(val: i64) {
@@ -158,6 +164,15 @@ pub(crate) mod test {
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
}
proptest! {
#[test]
fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) {
let left_u64 = f64_to_u64(left);
let right_u64 = f64_to_u64(right);
assert_eq!(left_u64 < right_u64, left < right);
}
}
#[test]
fn test_i64_converter() {
assert_eq!(i64_to_u64(i64::min_value()), u64::min_value());

View File

@@ -35,12 +35,18 @@ fn load_metas(
inventory: &SegmentMetaInventory,
) -> crate::Result<IndexMeta> {
let meta_data = directory.atomic_read(&META_FILEPATH)?;
let meta_string = String::from_utf8_lossy(&meta_data);
let meta_string = String::from_utf8(meta_data)
.map_err(|utf8_err| {
DataCorruption::new(
META_FILEPATH.to_path_buf(),
format!("Meta file is not valid utf-8. {:?}", utf8_err)
)
})?;
IndexMeta::deserialize(&meta_string, &inventory)
.map_err(|e| {
DataCorruption::new(
META_FILEPATH.to_path_buf(),
format!("Meta file cannot be deserialized. {:?}.", e),
format!("Meta file cannot be deserialized. {:?}. content = {}", e, meta_string),
)
})
.map_err(From::from)
@@ -511,28 +517,28 @@ mod tests {
}
#[test]
fn test_index_manual_policy_mmap() {
fn test_index_manual_policy_mmap() -> crate::Result<()> {
let schema = throw_away_schema();
let field = schema.get_field("num_likes").unwrap();
let mut index = Index::create_from_tempdir(schema).unwrap();
let mut writer = index.writer_for_tests().unwrap();
writer.commit().unwrap();
let mut index = Index::create_from_tempdir(schema)?;
let mut writer = index.writer_for_tests()?;
writer.commit()?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.unwrap();
.try_into()?;
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();
writer.commit()?;
assert!(receiver.recv().is_ok());
assert_eq!(reader.searcher().num_docs(), 0);
reader.reload().unwrap();
reader.reload()?;
assert_eq!(reader.searcher().num_docs(), 1);
Ok(())
}
#[test]

View File

@@ -58,7 +58,8 @@ pub enum OpenWriteError {
}
impl OpenWriteError {
pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
/// Wraps an io error.
pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
Self::IOError { io_error, filepath }
}
}
@@ -143,7 +144,8 @@ pub enum OpenReadError {
}
impl OpenReadError {
pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
/// Wraps an io error.
pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
Self::IOError { io_error, filepath }
}
}

View File

@@ -3,7 +3,7 @@ use crc32fast::Hasher;
use std::fs;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
@@ -13,15 +13,15 @@ pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 }
// Watches a file and executes registered callbacks when the file is modified.
pub struct FileWatcher {
path: Arc<PathBuf>,
path: Arc<Path>,
callbacks: Arc<WatchCallbackList>,
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
}
impl FileWatcher {
pub fn new(path: &PathBuf) -> FileWatcher {
pub fn new(path: &Path) -> FileWatcher {
FileWatcher {
path: Arc::new(path.clone()),
path: Arc::from(path),
callbacks: Default::default(),
state: Default::default(),
}
@@ -63,7 +63,7 @@ impl FileWatcher {
handle
}
fn compute_checksum(path: &PathBuf) -> Result<u32, io::Error> {
fn compute_checksum(path: &Path) -> Result<u32, io::Error> {
let reader = match fs::File::open(path) {
Ok(f) => io::BufReader::new(f),
Err(e) => {

View File

@@ -115,6 +115,18 @@ impl Footer {
}
Ok(())
}
VersionedFooter::V3 {
crc32: _crc,
store_compression,
} => {
if &library_version.store_compression != store_compression {
return Err(Incompatibility::CompressionMismatch {
library_compression_format: library_version.store_compression.to_string(),
index_compression_format: store_compression.to_string(),
});
}
Ok(())
}
VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch {
library_version: library_version.clone(),
index_version: self.version.clone(),
@@ -136,24 +148,31 @@ pub enum VersionedFooter {
crc32: CrcHashU32,
store_compression: String,
},
// Block wand max termfred on 1 byte
V3 {
crc32: CrcHashU32,
store_compression: String,
},
}
impl BinarySerializable for VersionedFooter {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
let mut buf = Vec::new();
match self {
VersionedFooter::V2 {
VersionedFooter::V3 {
crc32,
store_compression: compression,
} => {
// Serializes a valid `VersionedFooter` or panics if the version is unknown
// [ version | crc_hash | compression_mode ]
// [ 0..4 | 4..8 | variable ]
BinarySerializable::serialize(&2u32, &mut buf)?;
BinarySerializable::serialize(&3u32, &mut buf)?;
BinarySerializable::serialize(crc32, &mut buf)?;
BinarySerializable::serialize(compression, &mut buf)?;
}
VersionedFooter::V1 { .. } | VersionedFooter::UnknownVersion => {
VersionedFooter::V2 { .. }
| VersionedFooter::V1 { .. }
| VersionedFooter::UnknownVersion => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot serialize an unknown versioned footer ",
@@ -182,7 +201,7 @@ impl BinarySerializable for VersionedFooter {
reader.read_exact(&mut buf[..])?;
let mut cursor = &buf[..];
let version = u32::deserialize(&mut cursor)?;
if version != 1 && version != 2 {
if version > 3 {
return Ok(VersionedFooter::UnknownVersion);
}
let crc32 = u32::deserialize(&mut cursor)?;
@@ -192,12 +211,17 @@ impl BinarySerializable for VersionedFooter {
crc32,
store_compression,
}
} else {
assert_eq!(version, 2);
} else if version == 2 {
VersionedFooter::V2 {
crc32,
store_compression,
}
} else {
assert_eq!(version, 3);
VersionedFooter::V3 {
crc32,
store_compression,
}
})
}
}
@@ -205,6 +229,7 @@ impl BinarySerializable for VersionedFooter {
impl VersionedFooter {
pub fn crc(&self) -> Option<CrcHashU32> {
match self {
VersionedFooter::V3 { crc32, .. } => Some(*crc32),
VersionedFooter::V2 { crc32, .. } => Some(*crc32),
VersionedFooter::V1 { crc32, .. } => Some(*crc32),
VersionedFooter::UnknownVersion { .. } => None,
@@ -243,7 +268,7 @@ impl<W: TerminatingWrite> Write for FooterProxy<W> {
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
let crc32 = self.hasher.take().unwrap().finalize();
let footer = Footer::new(VersionedFooter::V2 {
let footer = Footer::new(VersionedFooter::V3 {
crc32,
store_compression: crate::store::COMPRESSION.to_string(),
});
@@ -278,7 +303,7 @@ mod tests {
let footer = Footer::deserialize(&mut &vec[..]).unwrap();
assert!(matches!(
footer.versioned_footer,
VersionedFooter::V2 { store_compression, .. }
VersionedFooter::V3 { store_compression, .. }
if store_compression == crate::store::COMPRESSION
));
assert_eq!(&footer.version, crate::version());
@@ -288,7 +313,7 @@ mod tests {
fn test_serialize_deserialize_footer() {
let mut buffer = Vec::new();
let crc32 = 123456u32;
let footer: Footer = Footer::new(VersionedFooter::V2 {
let footer: Footer = Footer::new(VersionedFooter::V3 {
crc32,
store_compression: "lz4".to_string(),
});
@@ -300,7 +325,7 @@ mod tests {
#[test]
fn footer_length() {
let crc32 = 1111111u32;
let versioned_footer = VersionedFooter::V2 {
let versioned_footer = VersionedFooter::V3 {
crc32,
store_compression: "lz4".to_string(),
};
@@ -321,7 +346,7 @@ mod tests {
// versionned footer length
12 | 128,
// index format version
2,
3,
0,
0,
0,
@@ -340,7 +365,7 @@ mod tests {
let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap();
assert!(cursor.is_empty());
let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32;
let expected_versioned_footer: VersionedFooter = VersionedFooter::V2 {
let expected_versioned_footer: VersionedFooter = VersionedFooter::V3 {
crc32: expected_crc,
store_compression: "lz4".to_string(),
};

View File

@@ -1,4 +1,5 @@
use super::MultiValueIntFastFieldReader;
use crate::error::DataCorruption;
use crate::schema::Facet;
use crate::termdict::TermDictionary;
use crate::termdict::TermOrdinal;
@@ -62,12 +63,13 @@ impl FacetReader {
&mut self,
facet_ord: TermOrdinal,
output: &mut Facet,
) -> Result<(), str::Utf8Error> {
) -> crate::Result<()> {
let found_term = self
.term_dict
.ord_to_term(facet_ord as u64, &mut self.buffer);
.ord_to_term(facet_ord as u64, &mut self.buffer)?;
assert!(found_term, "Term ordinal {} no found.", facet_ord);
let facet_str = str::from_utf8(&self.buffer[..])?;
let facet_str = str::from_utf8(&self.buffer[..])
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
output.set_facet_str(facet_str);
Ok(())
}

View File

@@ -51,6 +51,15 @@ impl<Item: FastValue> FastFieldReader<Item> {
}
}
pub(crate) fn cast<TFastValue: FastValue>(self) -> FastFieldReader<TFastValue> {
FastFieldReader {
bit_unpacker: self.bit_unpacker,
min_value_u64: self.min_value_u64,
max_value_u64: self.max_value_u64,
_phantom: PhantomData,
}
}
/// Return the value associated to the given document.
///
/// This accessor should return as fast as possible.

View File

@@ -1,6 +1,6 @@
use crate::common::CompositeFile;
use crate::fastfield::BytesFastFieldReader;
use crate::fastfield::MultiValueIntFastFieldReader;
use crate::fastfield::{BytesFastFieldReader, FastValue};
use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader};
use crate::schema::{Cardinality, Field, FieldType, Schema};
use crate::space_usage::PerFieldSpaceUsage;
@@ -201,6 +201,14 @@ impl FastFieldReaders {
None
}
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(
&self,
field: Field,
) -> Option<FastFieldReader<TFastValue>> {
self.u64_lenient(field)
.map(|fast_field_reader| fast_field_reader.cast())
}
/// Returns the `i64` fast field reader reader associated to `field`.
///
/// If `field` is not a i64 fast field, this method returns `None`.

View File

@@ -1,45 +1,94 @@
use rand::thread_rng;
use std::collections::HashSet;
use crate::schema::*;
use crate::Index;
use crate::Searcher;
use crate::{doc, schema::*};
use rand::thread_rng;
use rand::Rng;
use std::collections::HashSet;
fn check_index_content(searcher: &Searcher, vals: &HashSet<u64>) {
fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
assert!(searcher.segment_readers().len() < 20);
assert_eq!(searcher.num_docs() as usize, vals.len());
for segment_reader in searcher.segment_readers() {
let store_reader = segment_reader.get_store_reader()?;
for doc_id in 0..segment_reader.max_doc() {
let _doc = store_reader.get(doc_id)?;
}
}
Ok(())
}
#[test]
#[ignore]
fn test_indexing() {
fn test_functional_store() -> crate::Result<()> {
env_logger::init();
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED | STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let reader = index.reader()?;
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
let mut doc_set: Vec<u64> = Vec::new();
let mut doc_id = 0u64;
for iteration in 0.. {
let num_docs: usize = rng.gen_range(0..4);
if doc_set.len() >= 1 {
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
}
for _ in 0..num_docs {
doc_set.push(doc_id);
index_writer.add_document(doc!(id_field=>doc_id));
doc_id += 1;
}
index_writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
println!("#{} - {}", iteration, searcher.segment_readers().len());
check_index_content(&searcher, &doc_set)?;
}
Ok(())
}
#[test]
#[ignore]
fn test_functional_indexing() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED);
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
let schema = schema_builder.build();
let index = Index::create_from_tempdir(schema).unwrap();
let reader = index.reader().unwrap();
let index = Index::create_from_tempdir(schema)?;
let reader = index.reader()?;
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(3, 120_000_000)?;
let mut committed_docs: HashSet<u64> = HashSet::new();
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
for _ in 0..200 {
let random_val = rng.gen_range(0, 20);
let random_val = rng.gen_range(0..20);
if random_val == 0 {
index_writer.commit().expect("Commit failed");
index_writer.commit()?;
committed_docs.extend(&uncommitted_docs);
uncommitted_docs.clear();
reader.reload().unwrap();
reader.reload()?;
let searcher = reader.searcher();
// check that everything is correct.
check_index_content(&searcher, &committed_docs);
check_index_content(
&searcher,
&committed_docs.iter().cloned().collect::<Vec<u64>>(),
)?;
} else {
if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
let doc_id_term = Term::from_field_u64(id_field, random_val);
@@ -55,4 +104,5 @@ fn test_indexing() {
}
}
}
Ok(())
}

View File

@@ -8,7 +8,7 @@ const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
const DEFAULT_MIN_MERGE_SIZE: usize = 8;
const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
/// `LogMergePolicy` tries tries to merge segments that have a similar number of
/// `LogMergePolicy` tries to merge segments that have a similar number of
/// documents.
#[derive(Debug, Clone)]
pub struct LogMergePolicy {

View File

@@ -503,7 +503,6 @@ impl IndexMerger {
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
let mut field_term_streams = Vec::new();
let mut max_term_ords: Vec<TermOrdinal> = Vec::new();
let field_readers: Vec<Arc<InvertedIndexReader>> = self
@@ -512,6 +511,7 @@ impl IndexMerger {
.map(|reader| reader.inverted_index(indexed_field))
.collect::<crate::Result<Vec<_>>>()?;
let mut field_term_streams = Vec::new();
for field_reader in &field_readers {
let terms = field_reader.terms();
field_term_streams.push(terms.stream()?);

View File

@@ -174,7 +174,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
/// Index format version.
const INDEX_FORMAT_VERSION: u32 = 2;
const INDEX_FORMAT_VERSION: u32 = 3;
/// Structure version for the index.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -8,7 +8,7 @@ use std::io::{self, Write};
pub struct PositionSerializer<W: io::Write> {
bit_packer: BitPacker4x,
write_stream: CountingWriter<W>,
write_skiplist: W,
write_skip_index: W,
block: Vec<u32>,
buffer: Vec<u8>,
num_ints: u64,
@@ -16,11 +16,11 @@ pub struct PositionSerializer<W: io::Write> {
}
impl<W: io::Write> PositionSerializer<W> {
pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer<W> {
pub fn new(write_stream: W, write_skip_index: W) -> PositionSerializer<W> {
PositionSerializer {
bit_packer: BitPacker4x::new(),
write_stream: CountingWriter::wrap(write_stream),
write_skiplist,
write_skip_index,
block: Vec::with_capacity(128),
buffer: vec![0u8; 128 * 4],
num_ints: 0u64,
@@ -52,7 +52,7 @@ impl<W: io::Write> PositionSerializer<W> {
fn flush_block(&mut self) -> io::Result<()> {
let num_bits = self.bit_packer.num_bits(&self.block[..]);
self.write_skiplist.write_all(&[num_bits])?;
self.write_skip_index.write_all(&[num_bits])?;
let written_len = self
.bit_packer
.compress(&self.block[..], &mut self.buffer, num_bits);
@@ -70,10 +70,10 @@ impl<W: io::Write> PositionSerializer<W> {
self.flush_block()?;
}
for &long_skip in &self.long_skips {
long_skip.serialize(&mut self.write_skiplist)?;
long_skip.serialize(&mut self.write_skip_index)?;
}
(self.long_skips.len() as u32).serialize(&mut self.write_skiplist)?;
self.write_skiplist.flush()?;
(self.long_skips.len() as u32).serialize(&mut self.write_skip_index)?;
self.write_skip_index.flush()?;
self.write_stream.flush()?;
Ok(())
}

View File

@@ -54,7 +54,7 @@ pub mod tests {
use crate::DocId;
use crate::HasLen;
use crate::Score;
use std::iter;
use std::{iter, mem};
#[test]
pub fn test_position_write() -> crate::Result<()> {
@@ -71,6 +71,7 @@ pub mod tests {
field_serializer.write_doc(doc_id, 4, &delta_positions)?;
}
field_serializer.close_term()?;
mem::drop(field_serializer);
posting_serializer.close()?;
let read = segment.open_read(SegmentComponent::POSITIONS)?;
assert!(read.len() <= 140);
@@ -179,7 +180,7 @@ pub mod tests {
let inverted_index = segment_reader.inverted_index(text_field)?;
assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
assert_eq!(&bytes, b"hello");
}
{
@@ -191,7 +192,7 @@ pub mod tests {
let inverted_index = segment_reader.inverted_index(text_field)?;
assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
assert_eq!(&bytes[..], ok_token_text.as_bytes());
}
Ok(())

View File

@@ -1,32 +1,46 @@
use crate::common::{read_u32_vint_no_advance, serialize_vint_u32, BinarySerializable};
use std::convert::TryInto;
use crate::directory::OwnedBytes;
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::query::BM25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED};
#[inline(always)]
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
max_tf.min(u8::MAX as u32) as u8
}
#[inline(always)]
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
if max_tf_code == u8::MAX {
u32::MAX
} else {
max_tf_code as u32
}
}
#[inline(always)]
fn read_u32(data: &[u8]) -> u32 {
u32::from_le_bytes(data[..4].try_into().unwrap())
}
#[inline(always)]
fn write_u32(val: u32, buf: &mut Vec<u8>) {
buf.extend_from_slice(&val.to_le_bytes());
}
pub struct SkipSerializer {
buffer: Vec<u8>,
prev_doc: DocId,
}
impl SkipSerializer {
pub fn new() -> SkipSerializer {
SkipSerializer {
buffer: Vec::new(),
prev_doc: 0u32,
}
SkipSerializer { buffer: Vec::new() }
}
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
assert!(
last_doc > self.prev_doc,
"write_doc(...) called with non-increasing doc ids. \
Did you forget to call clear maybe?"
);
let delta_doc = last_doc - self.prev_doc;
self.prev_doc = last_doc;
delta_doc.serialize(&mut self.buffer).unwrap();
write_u32(last_doc, &mut self.buffer);
self.buffer.push(doc_num_bits);
}
@@ -35,16 +49,13 @@ impl SkipSerializer {
}
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
tf_sum
.serialize(&mut self.buffer)
.expect("Should never fail");
write_u32(tf_sum, &mut self.buffer);
}
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
self.buffer.push(fieldnorm_id);
let mut buf = [0u8; 8];
let bytes = serialize_vint_u32(term_freq, &mut buf);
self.buffer.extend_from_slice(bytes);
let block_wand_tf = encode_block_wand_max_tf(term_freq);
self.buffer
.extend_from_slice(&[fieldnorm_id, block_wand_tf]);
}
pub fn data(&self) -> &[u8] {
@@ -52,7 +63,6 @@ impl SkipSerializer {
}
pub fn clear(&mut self) {
self.prev_doc = 0u32;
self.buffer.clear();
}
}
@@ -159,18 +169,13 @@ impl SkipReader {
}
fn read_block_info(&mut self) {
let doc_delta = {
let bytes = self.owned_read.as_slice();
let mut buf = [0; 4];
buf.copy_from_slice(&bytes[..4]);
u32::from_le_bytes(buf)
};
self.last_doc_in_block += doc_delta as DocId;
let doc_num_bits = self.owned_read.as_slice()[4];
let bytes = self.owned_read.as_slice();
let advance_len: usize;
self.last_doc_in_block = read_u32(bytes);
let doc_num_bits = bytes[4];
match self.skip_info {
IndexRecordOption::Basic => {
self.owned_read.advance(5);
advance_len = 5;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits: 0,
@@ -180,11 +185,10 @@ impl SkipReader {
};
}
IndexRecordOption::WithFreqs => {
let bytes = self.owned_read.as_slice();
let tf_num_bits = bytes[5];
let block_wand_fieldnorm_id = bytes[6];
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[7..]);
self.owned_read.advance(7 + num_bytes);
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
advance_len = 8;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
@@ -194,16 +198,11 @@ impl SkipReader {
};
}
IndexRecordOption::WithFreqsAndPositions => {
let bytes = self.owned_read.as_slice();
let tf_num_bits = bytes[5];
let tf_sum = {
let mut buf = [0; 4];
buf.copy_from_slice(&bytes[6..10]);
u32::from_le_bytes(buf)
};
let tf_sum = read_u32(&bytes[6..10]);
let block_wand_fieldnorm_id = bytes[10];
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[11..]);
self.owned_read.advance(11 + num_bytes);
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
advance_len = 12;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
@@ -213,6 +212,7 @@ impl SkipReader {
};
}
}
self.owned_read.advance(advance_len);
}
pub fn block_info(&self) -> BlockInfo {
@@ -274,6 +274,24 @@ mod tests {
use crate::directory::OwnedBytes;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
#[test]
fn test_encode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
}
for &tf in &[255, 256, 1_000_000, u32::MAX] {
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
}
}
#[test]
fn test_decode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
}
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
}
#[test]
fn test_skip_with_freq() {
let buf = {

View File

@@ -20,6 +20,7 @@ pub struct AutomatonWeight<A> {
impl<A> AutomatonWeight<A>
where
A: Automaton + Send + Sync + 'static,
A::State: Clone,
{
/// Create a new AutomationWeight
pub fn new<IntoArcA: Into<Arc<A>>>(field: Field, automaton: IntoArcA) -> AutomatonWeight<A> {
@@ -42,6 +43,7 @@ where
impl<A> Weight for AutomatonWeight<A>
where
A: Automaton + Send + Sync + 'static,
A::State: Clone,
{
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let max_doc = reader.max_doc();
@@ -102,6 +104,7 @@ mod tests {
index
}
#[derive(Clone, Copy)]
enum State {
Start,
NotMatching,

View File

@@ -302,7 +302,7 @@ mod tests {
let mut rng = rand::thread_rng();
writer.set_merge_policy(Box::new(NoMergePolicy));
for _ in 0..3_000 {
let term_freq = rng.gen_range(1, 10000);
let term_freq = rng.gen_range(1..10000);
let words: Vec<&str> = std::iter::repeat("bbbb").take(term_freq).collect();
let text = words.join(" ");
writer.add_document(doc!(text_field=>text));

View File

@@ -233,6 +233,7 @@ mod tests {
assert_eq!(Facet::root(), Facet::from("/"));
assert_eq!(format!("{}", Facet::root()), "/");
assert!(Facet::root().is_root());
assert_eq!(Facet::root().encoded_str(), "");
}
#[test]

View File

@@ -1,5 +1,5 @@
use crate::schema::Value;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// Internal representation of a document used for JSON
@@ -8,5 +8,5 @@ use std::collections::BTreeMap;
/// A `NamedFieldDocument` is a simple representation of a document
/// as a `BTreeMap<String, Vec<Value>>`.
///
#[derive(Serialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct NamedFieldDocument(pub BTreeMap<String, Vec<Value>>);

View File

@@ -3,7 +3,7 @@ use std::io::{self, Read, Write};
/// Name of the compression scheme used in the doc store.
///
/// This name is appended to the version string of tantivy.
pub const COMPRESSION: &'static str = "lz4";
pub const COMPRESSION: &str = "lz4";
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear();

View File

@@ -43,6 +43,9 @@ impl CheckpointBlock {
/// Adding another checkpoint in the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
if let Some(prev_checkpoint) = self.checkpoints.last() {
assert!(checkpoint.follows(prev_checkpoint));
}
self.checkpoints.push(checkpoint);
}

View File

@@ -1,4 +1,4 @@
const CHECKPOINT_PERIOD: usize = 8;
const CHECKPOINT_PERIOD: usize = 2;
use std::fmt;
mod block;
@@ -26,6 +26,13 @@ pub struct Checkpoint {
pub end_offset: u64,
}
impl Checkpoint {
pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
(self.start_doc == other.end_doc) &&
(self.start_offset == other.end_offset)
}
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
@@ -39,13 +46,16 @@ impl fmt::Debug for Checkpoint {
#[cfg(test)]
mod tests {
use std::io;
use std::{io, iter};
use futures::executor::block_on;
use proptest::strategy::{BoxedStrategy, Strategy};
use crate::directory::OwnedBytes;
use crate::indexer::NoMergePolicy;
use crate::schema::{SchemaBuilder, STORED, STRING};
use crate::store::index::Checkpoint;
use crate::DocId;
use crate::{DocAddress, DocId, Index, Term};
use super::{SkipIndex, SkipIndexBuilder};
@@ -54,7 +64,7 @@ mod tests {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Ok(())
@@ -72,7 +82,7 @@ mod tests {
};
skip_index_builder.insert(checkpoint);
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
assert_eq!(skip_cursor.next(), None);
@@ -121,7 +131,7 @@ mod tests {
}
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
assert_eq!(
&skip_index.checkpoints().collect::<Vec<_>>()[..],
&checkpoints[..]
@@ -133,6 +143,40 @@ mod tests {
(doc as u64) * (doc as u64)
}
#[test]
fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::default();
let text = schema_builder.add_text_field("text", STORED | STRING);
let body = schema_builder.add_text_field("body", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz")
.take(1_000)
.collect();
for _ in 0..20 {
index_writer.add_document(doc!(body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.add_document(doc!(text=>"testb"));
for _ in 0..10 {
index_writer.add_document(doc!(text=>"testd", body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.delete_term(Term::from_field_text(text, "testb"));
index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
block_on(index_writer.merge(&segment_ids))?;
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 30);
for i in 0..searcher.num_docs() as u32 {
let _doc = searcher.doc(DocAddress(0u32, i))?;
}
Ok(())
}
#[test]
fn test_skip_index_long() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
@@ -150,7 +194,7 @@ mod tests {
}
skip_index_builder.write(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
.checkpoints()
.collect();
assert_eq!(&resulting_checkpoints, &checkpoints);
@@ -221,7 +265,7 @@ mod tests {
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
test_skip_index_aux(skip_index, &checkpoints[..]);

View File

@@ -59,6 +59,24 @@ pub struct SkipIndex {
}
impl SkipIndex {
pub fn open(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
let layer = Layer {
data: data.slice(start_offset as usize, end_offset as usize),
};
layers.push(layer);
start_offset = end_offset;
}
SkipIndex { layers }
}
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()
@@ -90,22 +108,3 @@ impl SkipIndex {
Some(cur_checkpoint)
}
}
impl From<OwnedBytes> for SkipIndex {
fn from(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
layers.push(Layer {
data: data.slice(start_offset as usize, end_offset as usize),
});
start_offset = end_offset;
}
SkipIndex { layers }
}
}

View File

@@ -28,18 +28,20 @@ impl LayerBuilder {
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
self.block.doc_interval().map(|(start_doc, end_doc)| {
if let Some((start_doc, end_doc)) = self.block.doc_interval() {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Checkpoint {
Some(Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
}
})
})
} else {
None
}
}
fn push(&mut self, checkpoint: Checkpoint) {
@@ -48,7 +50,7 @@ impl LayerBuilder {
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD;
if emit_skip_info {
self.flush_block()
} else {

View File

@@ -35,7 +35,7 @@ impl StoreReader {
let (data_file, offset_index_file) = split_file(store_file)?;
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::from(index_data);
let skip_index = SkipIndex::open(index_data);
Ok(StoreReader {
data: data_file,
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),

50
src/store/tests_store.rs Normal file
View File

@@ -0,0 +1,50 @@
use std::path::Path;
use crate::HasLen;
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, RAMDirectory};
use crate::fastfield::DeleteBitSet;
use super::{StoreReader, StoreWriter};
#[test]
fn test_toto2() -> crate::Result<()> {
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
let path = Path::new("b6029ade1b954ea1acad15b432eaacb9.store");
assert!(directory.validate_checksum(path)?);
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
let documents = store.documents();
// for doc in documents {
// println!("{:?}", doc);
// }
let doc= store.get(15_086)?;
Ok(())
}
#[test]
fn test_toto() -> crate::Result<()> {
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
assert!(directory.validate_checksum(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store"))?);
let store_file = directory.open_read(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store.patched"))?;
let store = StoreReader::open(store_file)?;
let doc= store.get(53)?;
println!("{:?}", doc);
// let documents = store.documents();
// let ram_directory = RAMDirectory::create();
// let path = Path::new("store");
// let store_wrt = ram_directory.open_write(path)?;
// let mut store_writer = StoreWriter::new(store_wrt);
// for doc in &documents {
// store_writer.store(doc)?;
// }
// store_writer.close()?;
// let store_data = ram_directory.open_read(path)?;
// let new_store = StoreReader::open(store_data)?;
// for doc in 0..59 {
// println!("{}", doc);
// let doc = new_store.get(doc)?;
// println!("{:?}", doc);
// }
Ok(())
}

View File

@@ -10,7 +10,7 @@ use crate::store::index::Checkpoint;
use crate::DocId;
use std::io::{self, Write};
const BLOCK_SIZE: usize = 16_384;
const BLOCK_SIZE: usize = 30;
/// Write tantivy's [`Store`](./index.html)
///
@@ -72,6 +72,7 @@ impl StoreWriter {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
}
assert_eq!(self.first_doc_in_block, self.doc);
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
@@ -86,12 +87,17 @@ impl StoreWriter {
checkpoint.end_doc += doc_shift;
checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.offset_index_writer.insert(checkpoint);
self.doc = checkpoint.end_doc;
self.register_checkpoint(checkpoint);
}
Ok(())
}
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint);
self.first_doc_in_block = checkpoint.end_doc;
self.doc = checkpoint.end_doc;
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
@@ -100,14 +106,13 @@ impl StoreWriter {
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes();
let end_doc = self.doc;
self.offset_index_writer.insert(Checkpoint {
self.register_checkpoint(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.current_block.clear();
self.first_doc_in_block = self.doc;
Ok(())
}

View File

@@ -80,7 +80,6 @@ where
.serialize(&mut counting_writer)?;
let footer_size = counting_writer.written_bytes();
(footer_size as u64).serialize(&mut counting_writer)?;
counting_writer.flush()?;
}
Ok(file)
}
@@ -152,7 +151,7 @@ impl TermDictionary {
///
/// Regardless of whether the term is found or not,
/// the buffer may be modified.
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool {
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
bytes.clear();
let fst = self.fst_index.as_fst();
let mut node = fst.root();
@@ -167,10 +166,10 @@ impl TermDictionary {
let new_node_addr = transition.addr;
node = fst.node(new_node_addr);
} else {
return false;
return Ok(false);
}
}
true
Ok(true)
}
/// Returns the number of terms in the dictionary.

View File

@@ -50,7 +50,7 @@ fn test_term_ordinals() -> crate::Result<()> {
for (term_ord, term) in COUNTRIES.iter().enumerate() {
assert_eq!(term_dict.term_ord(term)?, Some(term_ord as u64));
let mut bytes = vec![];
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes));
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes)?);
assert_eq!(bytes, term.as_bytes());
}
Ok(())
@@ -249,8 +249,7 @@ fn test_empty_string() -> crate::Result<()> {
Ok(())
}
#[test]
fn test_stream_range_boundaries() -> crate::Result<()> {
fn stream_range_test_dict() -> crate::Result<TermDictionary> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
for i in 0u8..10u8 {
@@ -260,84 +259,96 @@ fn test_stream_range_boundaries() -> crate::Result<()> {
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
TermDictionary::open(file)
}
let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
#[test]
fn test_stream_range_boundaries_forward() -> crate::Result<()> {
let term_dictionary = stream_range_test_dict()?;
let value_list = |mut streamer: TermStreamer<'_>| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
if backwards {
res.reverse();
}
res
};
{
let range = term_dictionary.range().backward().into_stream()?;
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).into_stream()?;
assert_eq!(
value_list(range, false),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
assert_eq!(
value_list(range, true),
value_list(range),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).into_stream()?;
assert_eq!(
value_list(range, false),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
assert_eq!(
value_list(range, true),
value_list(range),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).into_stream()?;
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]);
}
{
let range = term_dictionary.range().le([6u8]).into_stream()?;
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
assert_eq!(
value_list(range, true),
value_list(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
Ok(())
}
#[test]
fn test_stream_range_boundaries_backward() -> crate::Result<()> {
let term_dictionary = stream_range_test_dict()?;
let value_list_backward = |mut streamer: TermStreamer<'_>| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
res.reverse();
res
};
{
let range = term_dictionary.range().backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary
@@ -346,11 +357,38 @@ fn test_stream_range_boundaries() -> crate::Result<()> {
.lt([5u8])
.backward()
.into_stream()?;
assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
assert_eq!(
value_list_backward(range),
vec![0u32, 1u32, 2u32, 3u32, 4u32]
);
}
Ok(())
}
#[test]
fn test_ord_to_term() -> crate::Result<()> {
let termdict = stream_range_test_dict()?;
let mut bytes = vec![];
for b in 0u8..10u8 {
termdict.ord_to_term(b as u64, &mut bytes)?;
assert_eq!(&bytes, &[b]);
}
Ok(())
}
#[test]
fn test_stream_term_ord() -> crate::Result<()> {
let termdict = stream_range_test_dict()?;
let mut stream = termdict.stream()?;
for b in 0u8..10u8 {
assert!(stream.advance(), true);
assert_eq!(stream.term_ord(), b as u64);
assert_eq!(stream.key(), &[b]);
}
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_automaton_search() -> crate::Result<()> {
use crate::query::DFAWrapper;