Compare commits

..

20 Commits

Author SHA1 Message Date
Paul Masurel
c5d50e2138 fixing compilation 2020-12-09 17:14:41 +09:00
Paul Masurel
af2f067b69 Removed 'static in compression_lz4. 2020-12-09 16:57:01 +09:00
Paul Masurel
c056140d81 Reorganized and added termdict unit tests. 2020-12-09 16:57:01 +09:00
Paul Masurel
c6204f49d3 Minor changes
- Open{Write,Read}Error::wrap_io_error made public
- Arc<PathBuf> -> Arc<Path> in file_watcher.
2020-12-09 16:57:01 +09:00
Paul Masurel
738a1a0188 Small refactoring 2020-12-09 16:57:01 +09:00
Paul Masurel
294e8b4659 TermDictionary.finish does not flush 2020-12-09 16:57:01 +09:00
Paul Masurel
2f342257d3 Several TermDict operations now return an io::Result 2020-12-09 16:57:01 +09:00
Paul Masurel
e9e2984ac2 Simplified counting writer and removed flush 2020-12-09 16:57:01 +09:00
Paul Masurel
e1f9271be3 Moved the term merger 2020-12-09 16:57:01 +09:00
Paul Masurel
883eb92df9 Cargo fmt 2020-12-09 16:57:01 +09:00
Paul Masurel
590654ceb8 Isolated fst impl of termdictionary in a specific module. 2020-12-09 16:57:01 +09:00
Paul Masurel
7367eb5455 DocSet is send 2020-12-09 16:55:59 +09:00
Paul Masurel
e22330c7e0 Attempt to fix a bug that surfaces intermittently in tests.
Recently, `test_index_manual_policy_mmap` has been failing on Windows.

The idea addressed by this patch is that the current implementation of
atomic writes forgets to sync the parent directory.
This was done correctly when we were relying on the atomicwrites crate.

*crossing fingers*
2020-12-09 16:55:59 +09:00
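
A side note on the idea: on POSIX systems, a rename-based atomic write is only durable once the parent directory has itself been fsynced. A minimal sketch of that missing step, assuming Unix semantics (paths, names, and error handling here are illustrative, not tantivy's actual code):

```rust
use std::fs::File;
use std::io::{self, Write};
use std::path::Path;

/// Sketch of a rename-based atomic write that also syncs the parent
/// directory, as the commit message above suggests. Illustration only,
/// not tantivy's implementation.
fn atomic_write(dest: &Path, data: &[u8]) -> io::Result<()> {
    let tmp = dest.with_extension("tmp");
    let mut file = File::create(&tmp)?;
    file.write_all(data)?;
    file.sync_all()?; // flush the file contents themselves
    std::fs::rename(&tmp, dest)?;
    // The step the patch is about: fsync the parent directory so the
    // directory entry created by the rename survives a crash.
    if let Some(parent) = dest.parent() {
        File::open(parent)?.sync_all()?;
    }
    Ok(())
}
```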
Paul Masurel
ad4c2be21b Fix perf regression in the benchmark for the Count collector.
In order to reduce IO, we introduced a way to instantiate a dummy
constant FieldnormReader, which worked by allocating a buffer with
as many bytes as there are docs in the segment.

This allocation is not negligible by any means.

This PR works by offering two implementations of the
FieldnormReader.
The const fieldnorm reader simply returns the same value all of the
time, while the array-based one behaves like the current one.
2020-12-09 16:55:59 +09:00
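
Put differently, the reader described above gains a constant variant next to the array-backed one. A hypothetical sketch of the shape (the names are illustrative, not the actual tantivy types):

```rust
/// Hypothetical sketch of the two fieldnorm reader variants described
/// in the commit message above; not the actual tantivy types.
enum FieldnormReaderSketch {
    /// Returns the same fieldnorm id for every doc; no per-segment allocation.
    Const { fieldnorm_id: u8 },
    /// One byte per document, as in the previous implementation.
    Array { fieldnorm_ids: Vec<u8> },
}

impl FieldnormReaderSketch {
    fn fieldnorm_id(&self, doc: u32) -> u8 {
        match self {
            FieldnormReaderSketch::Const { fieldnorm_id } => *fieldnorm_id,
            FieldnormReaderSketch::Array { fieldnorm_ids } => fieldnorm_ids[doc as usize],
        }
    }
}
```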
Paul Masurel
412c83c336 Added specialized implementation for count/count_including... in &mut DocSet 2020-12-09 16:55:59 +09:00
Paul Masurel
3006db17c1 Avoid computing the BM25 weight if scoring is disabled 2020-12-09 16:55:59 +09:00
Paul Masurel
b4fc185dc5 Applied CR comments 2020-12-09 16:55:59 +09:00
Adrien Guillo
20a314093f Replace some Arc<Box<dyn... with Arc<dyn... 2020-12-09 16:55:59 +09:00
Paul Masurel
9b3eb59e9b No filelen problem. 2020-12-09 16:55:59 +09:00
Adrien Guillo
6d33ae307a Add helper methods for reading u8 and u64 to OwnedBytes 2020-12-09 16:55:59 +09:00
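
The helpers in this last commit are presumably thin cursor-style reads. A sketch of the general shape, assuming little-endian, advancing reads (the exact OwnedBytes API is an assumption):

```rust
use std::convert::TryInto;

/// Sketch of advancing read helpers in the spirit of the OwnedBytes
/// methods mentioned above (exact signatures are an assumption).
struct Cursor<'a> {
    data: &'a [u8],
}

impl<'a> Cursor<'a> {
    /// Reads one byte and advances past it.
    fn read_u8(&mut self) -> u8 {
        let b = self.data[0];
        self.data = &self.data[1..];
        b
    }

    /// Reads a little-endian u64 and advances past it.
    fn read_u64(&mut self) -> u64 {
        let (head, tail) = self.data.split_at(8);
        self.data = tail;
        u64::from_le_bytes(head.try_into().unwrap())
    }
}
```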
23 changed files with 175 additions and 4186 deletions

View File

@@ -9,10 +9,6 @@ Tantivy 0.14.0
- Bugfix in `Query::explain`
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)
- Simplified the encoding of the skip reader struct. BlockWAND max tf is now encoded over a single byte. (@pmasurel)
- `FilterCollector` now supports all Fast Field value types (@barrotsteindev)
This version breaks compatibility and requires users to reindex everything.
Tantivy 0.13.2
===================

View File

@@ -47,18 +47,16 @@ murmurhash32 = "0.2"
chrono = "0.4"
smallvec = "1"
rayon = "1"
env_logger = "0.8"
lru = "0.6"
[target.'cfg(windows)'.dependencies]
winapi = "0.3"
[dev-dependencies]
rand = "0.8"
rand = "0.7"
maplit = "1"
matches = "0.1.8"
proptest = "0.10"
criterion = "0.3"
[dev-dependencies.fail]
version = "0.4"
@@ -99,7 +97,3 @@ travis-ci = { repository = "tantivy-search/tantivy" }
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]
[[bench]]
name = "analyzer"
harness = false

File diff suppressed because it is too large

View File

@@ -1,22 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion};
use tantivy::tokenizer::TokenizerManager;
const ALICE_TXT: &'static str = include_str!("alice.txt");
pub fn criterion_benchmark(c: &mut Criterion) {
let tokenizer_manager = TokenizerManager::default();
let tokenizer = tokenizer_manager.get("default").unwrap();
c.bench_function("default-tokenize-alice", |b| {
b.iter(|| {
let mut word_count = 0;
let mut token_stream = tokenizer.token_stream(ALICE_TXT);
while token_stream.advance() {
word_count += 1;
}
assert_eq!(word_count, 30_731);
})
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -11,9 +11,9 @@
// Importing tantivy...
use std::marker::PhantomData;
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::{FastFieldReader, FastValue};
use crate::fastfield::{FastValue, FastFieldReader};
use crate::schema::Field;
use crate::collector::{Collector, SegmentCollector};
use crate::{Score, SegmentReader, TantivyError};
/// The `FilterCollector` collector filters docs using a u64 fast field value and a predicate.
@@ -22,20 +22,23 @@ use crate::{Score, SegmentReader, TantivyError};
/// ```rust
/// use tantivy::collector::{TopDocs, FilterCollector};
/// use tantivy::query::QueryParser;
/// use tantivy::schema::{Schema, TEXT, INDEXED, FAST};
/// use tantivy::schema::{Schema, FAST, TEXT};
/// use tantivy::DateTime;
/// use std::str::FromStr;
/// use tantivy::{doc, DocAddress, Index};
///
/// let mut schema_builder = Schema::builder();
/// let title = schema_builder.add_text_field("title", TEXT);
/// let price = schema_builder.add_u64_field("price", INDEXED | FAST);
/// let price = schema_builder.add_u64_field("price", FAST);
/// let date = schema_builder.add_date_field("date", FAST);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
/// index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64));
/// index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64));
/// index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64));
/// index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64));
/// index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64, date => DateTime::from_str("1898-04-09T00:00:00+00:00").unwrap()));
/// index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64, date => DateTime::from_str("2020-04-09T00:00:00+00:00").unwrap()));
/// index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64, date => DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()));
/// index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64, date => DateTime::from_str("2018-04-09T00:00:00+00:00").unwrap()));
/// assert!(index_writer.commit().is_ok());
///
/// let reader = index.reader().unwrap();
@@ -43,8 +46,8 @@ use crate::{Score, SegmentReader, TantivyError};
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary").unwrap();
/// let no_filter_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
/// let filter_some_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &filter_some_collector).unwrap();
///
/// assert_eq!(top_docs.len(), 1);
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
@@ -53,6 +56,17 @@ use crate::{Score, SegmentReader, TantivyError};
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_top_docs.len(), 0);
///
/// fn date_debug(value: DateTime) -> bool {
/// println!("date: {:?}", value);
/// assert_eq!(value, DateTime::from_str("1000-04-09T00:00:00+00:00").unwrap());
/// (value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
/// }
///
/// let filter_dates_collector = FilterCollector::new(date, &date_debug, TopDocs::with_limit(2));
/// let filtered_date_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_date_docs.len(), 5);
/// ```
pub struct FilterCollector<TCollector, TPredicate, TPredicateValue: FastValue>
where
@@ -111,34 +125,39 @@ where
field_entry.name()
)));
}
let requested_type = TPredicateValue::to_type();
let field_schema_type = field_entry.field_type().value_type();
if requested_type != field_schema_type {
let schema_type = TPredicateValue::to_type();
let requested_type = field_entry.field_type().value_type();
if schema_type != requested_type {
return Err(TantivyError::SchemaError(format!(
"Field {:?} is of type {:?}!={:?}",
field_entry.name(),
requested_type,
field_schema_type
schema_type,
requested_type
)));
}
let fast_field_reader = segment_reader
.fast_fields()
.typed_fast_field_reader(self.field)
.ok_or_else(|| {
TantivyError::SchemaError(format!(
"{:?} is not declared as a fast field in the schema.",
self.field
))
})?;
let err_closure = || {
let field_name = segment_reader.schema().get_field_name(self.field);
TantivyError::SchemaError(format!(
"Field {:?} is not a u64 fast field.",
field_name
))
};
let fast_fields = segment_reader.fast_fields();
let fast_value_type = TPredicateValue::to_type();
// TODO do a runtime check of `fast_value_type` against the schema.
let fast_field_reader_opt = fast_fields.typed_fast_field_reader(self.field);
let fast_field_reader = fast_field_reader_opt
.ok_or_else(|| TantivyError::SchemaError(format!("{:?} is not declared as a fast field in the schema.", self.field)))?;
let segment_collector = self
.collector
.for_segment(segment_local_id, segment_reader)?;
Ok(FilterSegmentCollector {
fast_field_reader,
segment_collector,
fast_field_reader ,
segment_collector: segment_collector,
predicate: self.predicate,
t_predicate_value: PhantomData,
})

View File

@@ -8,12 +8,12 @@ use crate::DocId;
use crate::Score;
use crate::SegmentLocalId;
use crate::collector::{FilterCollector, TopDocs};
use crate::collector::{TopDocs, FilterCollector};
use crate::query::QueryParser;
use crate::schema::{Schema, FAST, TEXT};
use crate::DateTime;
use crate::{doc, Index};
use std::str::FromStr;
use crate::{doc, Index};
pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
compute_score: true,
@@ -25,6 +25,7 @@ pub const TEST_COLLECTOR_WITHOUT_SCORE: TestCollector = TestCollector {
#[test]
pub fn test_filter_collector() {
let mut schema_builder = Schema::builder();
let title = schema_builder.add_text_field("title", TEXT);
let price = schema_builder.add_u64_field("price", FAST);
@@ -35,7 +36,6 @@ pub fn test_filter_collector() {
let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64, date => DateTime::from_str("1898-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64, date => DateTime::from_str("2020-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of Anne Frank", price => 18_240u64, date => DateTime::from_str("2019-04-20T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64, date => DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()));
index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64, date => DateTime::from_str("2018-04-09T00:00:00+00:00").unwrap()));
assert!(index_writer.commit().is_ok());
@@ -45,30 +45,27 @@ pub fn test_filter_collector() {
let query_parser = QueryParser::for_index(&index, vec![title]);
let query = query_parser.parse_query("diary").unwrap();
let filter_some_collector = FilterCollector::new(
price,
&|value: u64| value > 20_120u64,
TopDocs::with_limit(2),
);
let filter_some_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2));
let top_docs = searcher.search(&query, &filter_some_collector).unwrap();
assert_eq!(top_docs.len(), 1);
assert_eq!(top_docs[0].1, DocAddress(0, 1));
let filter_all_collector: FilterCollector<_, _, u64> =
FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
let filter_all_collector: FilterCollector<_, _, u64> = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
assert_eq!(filtered_top_docs.len(), 0);
fn date_filter(value: DateTime) -> bool {
(value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
fn date_debug(value: DateTime) -> bool {
println!("date: {:?}", value);
assert_eq!(value, DateTime::from_str("1000-04-09T00:00:00+00:00").unwrap());
(value - DateTime::from_str("2019-04-09T00:00:00+00:00").unwrap()).num_weeks() > 0
}
let filter_dates_collector = FilterCollector::new(date, &date_filter, TopDocs::with_limit(5));
let filter_dates_collector = FilterCollector::new(date, &date_debug, TopDocs::with_limit(2));
let filtered_date_docs = searcher.search(&query, &filter_dates_collector).unwrap();
assert_eq!(filtered_date_docs.len(), 2);
assert_eq!(filtered_date_docs.len(), 5);
}
/// Stores all of the doc ids.

View File

@@ -115,16 +115,11 @@ pub fn u64_to_i64(val: u64) -> i64 {
/// For simplicity, tantivy internally handles `f64` as `u64`.
/// The mapping is defined by this function.
///
/// Maps `f64` to `u64` in a monotonic manner, so that bytes lexical order is preserved.
/// Maps `f64` to `u64` so that lexical order is preserved.
///
/// This is more suited than simply casting (`val as u64`)
/// which would truncate the result
///
/// # Reference
///
/// Daniel Lemire's [blog post](https://lemire.me/blog/2020/12/14/converting-floating-point-numbers-to-integers-while-preserving-order/)
/// explains the mapping in a clear manner.
///
/// # See also
/// The [reverse mapping is `u64_to_f64`](./fn.u64_to_f64.html).
#[inline(always)]
@@ -153,7 +148,6 @@ pub(crate) mod test {
pub use super::minmax;
pub use super::serialize::test::fixed_size_test;
use super::{compute_num_bits, f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
use proptest::prelude::*;
use std::f64;
fn test_i64_converter_helper(val: i64) {
@@ -164,15 +158,6 @@ pub(crate) mod test {
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
}
proptest! {
#[test]
fn test_f64_converter_monotonicity_proptest((left, right) in (proptest::num::f64::NORMAL, proptest::num::f64::NORMAL)) {
let left_u64 = f64_to_u64(left);
let right_u64 = f64_to_u64(right);
assert_eq!(left_u64 < right_u64, left < right);
}
}
#[test]
fn test_i64_converter() {
assert_eq!(i64_to_u64(i64::min_value()), u64::min_value());
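
For reference, the order-preserving `f64`-to-`u64` mapping documented in the hunk above is the standard sign-bit trick. A sketch of both directions (tantivy's exact code may differ in details):

```rust
/// Maps f64 to u64 so that the u64 ordering matches the f64 ordering.
/// Standard sign-bit trick; a sketch, not necessarily tantivy's exact code.
fn f64_to_u64(val: f64) -> u64 {
    let bits = val.to_bits();
    if val.is_sign_positive() {
        // Positive floats: set the sign bit so they sort above all negatives.
        bits | (1u64 << 63)
    } else {
        // Negative floats: flip every bit, which both clears the sign bit
        // and reverses their ordering.
        !bits
    }
}

/// Inverse of `f64_to_u64`.
fn u64_to_f64(val: u64) -> f64 {
    f64::from_bits(if val & (1u64 << 63) != 0 {
        val ^ (1u64 << 63)
    } else {
        !val
    })
}
```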

View File

@@ -35,18 +35,12 @@ fn load_metas(
inventory: &SegmentMetaInventory,
) -> crate::Result<IndexMeta> {
let meta_data = directory.atomic_read(&META_FILEPATH)?;
let meta_string = String::from_utf8(meta_data)
.map_err(|utf8_err| {
DataCorruption::new(
META_FILEPATH.to_path_buf(),
format!("Meta file is not valid utf-8. {:?}", utf8_err)
)
})?;
let meta_string = String::from_utf8_lossy(&meta_data);
IndexMeta::deserialize(&meta_string, &inventory)
.map_err(|e| {
DataCorruption::new(
META_FILEPATH.to_path_buf(),
format!("Meta file cannot be deserialized. {:?}. content = {}", e, meta_string),
format!("Meta file cannot be deserialized. {:?}.", e),
)
})
.map_err(From::from)

View File

@@ -115,18 +115,6 @@ impl Footer {
}
Ok(())
}
VersionedFooter::V3 {
crc32: _crc,
store_compression,
} => {
if &library_version.store_compression != store_compression {
return Err(Incompatibility::CompressionMismatch {
library_compression_format: library_version.store_compression.to_string(),
index_compression_format: store_compression.to_string(),
});
}
Ok(())
}
VersionedFooter::UnknownVersion => Err(Incompatibility::IndexMismatch {
library_version: library_version.clone(),
index_version: self.version.clone(),
@@ -148,31 +136,24 @@ pub enum VersionedFooter {
crc32: CrcHashU32,
store_compression: String,
},
// Block wand max term freq on 1 byte
V3 {
crc32: CrcHashU32,
store_compression: String,
},
}
impl BinarySerializable for VersionedFooter {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
let mut buf = Vec::new();
match self {
VersionedFooter::V3 {
VersionedFooter::V2 {
crc32,
store_compression: compression,
} => {
// Serializes a valid `VersionedFooter` or panics if the version is unknown
// [ version | crc_hash | compression_mode ]
// [ 0..4 | 4..8 | variable ]
BinarySerializable::serialize(&3u32, &mut buf)?;
BinarySerializable::serialize(&2u32, &mut buf)?;
BinarySerializable::serialize(crc32, &mut buf)?;
BinarySerializable::serialize(compression, &mut buf)?;
}
VersionedFooter::V2 { .. }
| VersionedFooter::V1 { .. }
| VersionedFooter::UnknownVersion => {
VersionedFooter::V1 { .. } | VersionedFooter::UnknownVersion => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot serialize an unknown versioned footer ",
@@ -201,7 +182,7 @@ impl BinarySerializable for VersionedFooter {
reader.read_exact(&mut buf[..])?;
let mut cursor = &buf[..];
let version = u32::deserialize(&mut cursor)?;
if version > 3 {
if version != 1 && version != 2 {
return Ok(VersionedFooter::UnknownVersion);
}
let crc32 = u32::deserialize(&mut cursor)?;
@@ -211,14 +192,9 @@ impl BinarySerializable for VersionedFooter {
crc32,
store_compression,
}
} else if version == 2 {
VersionedFooter::V2 {
crc32,
store_compression,
}
} else {
assert_eq!(version, 3);
VersionedFooter::V3 {
assert_eq!(version, 2);
VersionedFooter::V2 {
crc32,
store_compression,
}
@@ -229,7 +205,6 @@ impl BinarySerializable for VersionedFooter {
impl VersionedFooter {
pub fn crc(&self) -> Option<CrcHashU32> {
match self {
VersionedFooter::V3 { crc32, .. } => Some(*crc32),
VersionedFooter::V2 { crc32, .. } => Some(*crc32),
VersionedFooter::V1 { crc32, .. } => Some(*crc32),
VersionedFooter::UnknownVersion { .. } => None,
@@ -268,7 +243,7 @@ impl<W: TerminatingWrite> Write for FooterProxy<W> {
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
let crc32 = self.hasher.take().unwrap().finalize();
let footer = Footer::new(VersionedFooter::V3 {
let footer = Footer::new(VersionedFooter::V2 {
crc32,
store_compression: crate::store::COMPRESSION.to_string(),
});
@@ -303,7 +278,7 @@ mod tests {
let footer = Footer::deserialize(&mut &vec[..]).unwrap();
assert!(matches!(
footer.versioned_footer,
VersionedFooter::V3 { store_compression, .. }
VersionedFooter::V2 { store_compression, .. }
if store_compression == crate::store::COMPRESSION
));
assert_eq!(&footer.version, crate::version());
@@ -313,7 +288,7 @@ mod tests {
fn test_serialize_deserialize_footer() {
let mut buffer = Vec::new();
let crc32 = 123456u32;
let footer: Footer = Footer::new(VersionedFooter::V3 {
let footer: Footer = Footer::new(VersionedFooter::V2 {
crc32,
store_compression: "lz4".to_string(),
});
@@ -325,7 +300,7 @@ mod tests {
#[test]
fn footer_length() {
let crc32 = 1111111u32;
let versioned_footer = VersionedFooter::V3 {
let versioned_footer = VersionedFooter::V2 {
crc32,
store_compression: "lz4".to_string(),
};
@@ -346,7 +321,7 @@ mod tests {
// versioned footer length
12 | 128,
// index format version
3,
2,
0,
0,
0,
@@ -365,7 +340,7 @@ mod tests {
let versioned_footer = VersionedFooter::deserialize(&mut cursor).unwrap();
assert!(cursor.is_empty());
let expected_crc: u32 = LittleEndian::read_u32(&v_footer_bytes[5..9]) as CrcHashU32;
let expected_versioned_footer: VersionedFooter = VersionedFooter::V3 {
let expected_versioned_footer: VersionedFooter = VersionedFooter::V2 {
crc32: expected_crc,
store_compression: "lz4".to_string(),
};

View File

@@ -1,6 +1,6 @@
use crate::common::CompositeFile;
use crate::fastfield::MultiValueIntFastFieldReader;
use crate::fastfield::{BytesFastFieldReader, FastValue};
use crate::fastfield::MultiValueIntFastFieldReader;
use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader};
use crate::schema::{Cardinality, Field, FieldType, Schema};
use crate::space_usage::PerFieldSpaceUsage;
@@ -201,12 +201,8 @@ impl FastFieldReaders {
None
}
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(
&self,
field: Field,
) -> Option<FastFieldReader<TFastValue>> {
self.u64_lenient(field)
.map(|fast_field_reader| fast_field_reader.cast())
pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(&self, field: Field) -> Option<FastFieldReader<TFastValue>> {
self.u64_lenient(field).map(|fast_field_reader| fast_field_reader.cast())
}
/// Returns the `i64` fast field reader reader associated to `field`.

View File

@@ -1,94 +1,45 @@
use crate::Index;
use crate::Searcher;
use crate::{doc, schema::*};
use rand::thread_rng;
use rand::Rng;
use std::collections::HashSet;
fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
use crate::schema::*;
use crate::Index;
use crate::Searcher;
use rand::Rng;
fn check_index_content(searcher: &Searcher, vals: &HashSet<u64>) {
assert!(searcher.segment_readers().len() < 20);
assert_eq!(searcher.num_docs() as usize, vals.len());
for segment_reader in searcher.segment_readers() {
let store_reader = segment_reader.get_store_reader()?;
for doc_id in 0..segment_reader.max_doc() {
let _doc = store_reader.get(doc_id)?;
}
}
Ok(())
}
#[test]
#[ignore]
fn test_functional_store() -> crate::Result<()> {
env_logger::init();
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED | STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let reader = index.reader()?;
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
let mut doc_set: Vec<u64> = Vec::new();
let mut doc_id = 0u64;
for iteration in 0.. {
let num_docs: usize = rng.gen_range(0..4);
if doc_set.len() >= 1 {
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
}
for _ in 0..num_docs {
doc_set.push(doc_id);
index_writer.add_document(doc!(id_field=>doc_id));
doc_id += 1;
}
index_writer.commit()?;
reader.reload()?;
let searcher = reader.searcher();
println!("#{} - {}", iteration, searcher.segment_readers().len());
check_index_content(&searcher, &doc_set)?;
}
Ok(())
}
#[test]
#[ignore]
fn test_functional_indexing() -> crate::Result<()> {
fn test_indexing() {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED);
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
let schema = schema_builder.build();
let index = Index::create_from_tempdir(schema)?;
let reader = index.reader()?;
let index = Index::create_from_tempdir(schema).unwrap();
let reader = index.reader().unwrap();
let mut rng = thread_rng();
let mut index_writer = index.writer_with_num_threads(3, 120_000_000)?;
let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap();
let mut committed_docs: HashSet<u64> = HashSet::new();
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
for _ in 0..200 {
let random_val = rng.gen_range(0..20);
let random_val = rng.gen_range(0, 20);
if random_val == 0 {
index_writer.commit()?;
index_writer.commit().expect("Commit failed");
committed_docs.extend(&uncommitted_docs);
uncommitted_docs.clear();
reader.reload()?;
reader.reload().unwrap();
let searcher = reader.searcher();
// check that everything is correct.
check_index_content(
&searcher,
&committed_docs.iter().cloned().collect::<Vec<u64>>(),
)?;
check_index_content(&searcher, &committed_docs);
} else {
if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
let doc_id_term = Term::from_field_u64(id_field, random_val);
@@ -104,5 +55,4 @@ fn test_functional_indexing() -> crate::Result<()> {
}
}
}
Ok(())
}

View File

@@ -8,7 +8,7 @@ const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
const DEFAULT_MIN_MERGE_SIZE: usize = 8;
const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
/// `LogMergePolicy` tries to merge segments that have a similar number of
/// `LogMergePolicy` tries tries to merge segments that have a similar number of
/// documents.
#[derive(Debug, Clone)]
pub struct LogMergePolicy {

View File

@@ -174,7 +174,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
/// Index format version.
const INDEX_FORMAT_VERSION: u32 = 3;
const INDEX_FORMAT_VERSION: u32 = 2;
/// Structure version for the index.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -1,46 +1,32 @@
use std::convert::TryInto;
use crate::common::{read_u32_vint_no_advance, serialize_vint_u32, BinarySerializable};
use crate::directory::OwnedBytes;
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::query::BM25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED};
#[inline(always)]
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
max_tf.min(u8::MAX as u32) as u8
}
#[inline(always)]
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
if max_tf_code == u8::MAX {
u32::MAX
} else {
max_tf_code as u32
}
}
#[inline(always)]
fn read_u32(data: &[u8]) -> u32 {
u32::from_le_bytes(data[..4].try_into().unwrap())
}
#[inline(always)]
fn write_u32(val: u32, buf: &mut Vec<u8>) {
buf.extend_from_slice(&val.to_le_bytes());
}
pub struct SkipSerializer {
buffer: Vec<u8>,
prev_doc: DocId,
}
impl SkipSerializer {
pub fn new() -> SkipSerializer {
SkipSerializer { buffer: Vec::new() }
SkipSerializer {
buffer: Vec::new(),
prev_doc: 0u32,
}
}
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
write_u32(last_doc, &mut self.buffer);
assert!(
last_doc > self.prev_doc,
"write_doc(...) called with non-increasing doc ids. \
Did you forget to call clear maybe?"
);
let delta_doc = last_doc - self.prev_doc;
self.prev_doc = last_doc;
delta_doc.serialize(&mut self.buffer).unwrap();
self.buffer.push(doc_num_bits);
}
@@ -49,13 +35,16 @@ impl SkipSerializer {
}
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
write_u32(tf_sum, &mut self.buffer);
tf_sum
.serialize(&mut self.buffer)
.expect("Should never fail");
}
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
let block_wand_tf = encode_block_wand_max_tf(term_freq);
self.buffer
.extend_from_slice(&[fieldnorm_id, block_wand_tf]);
self.buffer.push(fieldnorm_id);
let mut buf = [0u8; 8];
let bytes = serialize_vint_u32(term_freq, &mut buf);
self.buffer.extend_from_slice(bytes);
}
pub fn data(&self) -> &[u8] {
@@ -63,6 +52,7 @@ impl SkipSerializer {
}
pub fn clear(&mut self) {
self.prev_doc = 0u32;
self.buffer.clear();
}
}
@@ -169,13 +159,18 @@ impl SkipReader {
}
fn read_block_info(&mut self) {
let bytes = self.owned_read.as_slice();
let advance_len: usize;
self.last_doc_in_block = read_u32(bytes);
let doc_num_bits = bytes[4];
let doc_delta = {
let bytes = self.owned_read.as_slice();
let mut buf = [0; 4];
buf.copy_from_slice(&bytes[..4]);
u32::from_le_bytes(buf)
};
self.last_doc_in_block += doc_delta as DocId;
let doc_num_bits = self.owned_read.as_slice()[4];
match self.skip_info {
IndexRecordOption::Basic => {
advance_len = 5;
self.owned_read.advance(5);
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits: 0,
@@ -185,10 +180,11 @@ impl SkipReader {
};
}
IndexRecordOption::WithFreqs => {
let bytes = self.owned_read.as_slice();
let tf_num_bits = bytes[5];
let block_wand_fieldnorm_id = bytes[6];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
advance_len = 8;
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[7..]);
self.owned_read.advance(7 + num_bytes);
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
@@ -198,11 +194,16 @@ impl SkipReader {
};
}
IndexRecordOption::WithFreqsAndPositions => {
let bytes = self.owned_read.as_slice();
let tf_num_bits = bytes[5];
let tf_sum = read_u32(&bytes[6..10]);
let tf_sum = {
let mut buf = [0; 4];
buf.copy_from_slice(&bytes[6..10]);
u32::from_le_bytes(buf)
};
let block_wand_fieldnorm_id = bytes[10];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
advance_len = 12;
let (block_wand_term_freq, num_bytes) = read_u32_vint_no_advance(&bytes[11..]);
self.owned_read.advance(11 + num_bytes);
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
@@ -212,7 +213,6 @@ impl SkipReader {
};
}
}
self.owned_read.advance(advance_len);
}
pub fn block_info(&self) -> BlockInfo {
@@ -274,24 +274,6 @@ mod tests {
use crate::directory::OwnedBytes;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
#[test]
fn test_encode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
}
for &tf in &[255, 256, 1_000_000, u32::MAX] {
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
}
}
#[test]
fn test_decode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
}
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
}
#[test]
fn test_skip_with_freq() {
let buf = {

View File

@@ -302,7 +302,7 @@ mod tests {
let mut rng = rand::thread_rng();
writer.set_merge_policy(Box::new(NoMergePolicy));
for _ in 0..3_000 {
let term_freq = rng.gen_range(1..10000);
let term_freq = rng.gen_range(1, 10000);
let words: Vec<&str> = std::iter::repeat("bbbb").take(term_freq).collect();
let text = words.join(" ");
writer.add_document(doc!(text_field=>text));

View File

@@ -1,5 +1,5 @@
use crate::schema::Value;
use serde::{Deserialize, Serialize};
use serde::Serialize;
use std::collections::BTreeMap;
/// Internal representation of a document used for JSON
@@ -8,5 +8,5 @@ use std::collections::BTreeMap;
/// A `NamedFieldDocument` is a simple representation of a document
/// as a `BTreeMap<String, Vec<Value>>`.
///
#[derive(Debug, Deserialize, Serialize)]
#[derive(Serialize)]
pub struct NamedFieldDocument(pub BTreeMap<String, Vec<Value>>);

View File

@@ -43,9 +43,6 @@ impl CheckpointBlock {
/// Adding another checkpoint in the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
if let Some(prev_checkpoint) = self.checkpoints.last() {
assert!(checkpoint.follows(prev_checkpoint));
}
self.checkpoints.push(checkpoint);
}

View File

@@ -1,4 +1,4 @@
const CHECKPOINT_PERIOD: usize = 2;
const CHECKPOINT_PERIOD: usize = 8;
use std::fmt;
mod block;
@@ -26,13 +26,6 @@ pub struct Checkpoint {
pub end_offset: u64,
}
impl Checkpoint {
pub(crate) fn follows(&self, other: &Checkpoint) -> bool {
(self.start_doc == other.end_doc) &&
(self.start_offset == other.end_offset)
}
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
@@ -46,16 +39,13 @@ impl fmt::Debug for Checkpoint {
#[cfg(test)]
mod tests {
use std::{io, iter};
use std::io;
use futures::executor::block_on;
use proptest::strategy::{BoxedStrategy, Strategy};
use crate::directory::OwnedBytes;
use crate::indexer::NoMergePolicy;
use crate::schema::{SchemaBuilder, STORED, STRING};
use crate::store::index::Checkpoint;
use crate::{DocAddress, DocId, Index, Term};
use crate::DocId;
use super::{SkipIndex, SkipIndexBuilder};
@@ -64,7 +54,7 @@ mod tests {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Ok(())
@@ -82,7 +72,7 @@ mod tests {
};
skip_index_builder.insert(checkpoint);
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
assert_eq!(skip_cursor.next(), None);
@@ -131,7 +121,7 @@ mod tests {
}
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
assert_eq!(
&skip_index.checkpoints().collect::<Vec<_>>()[..],
&checkpoints[..]
@@ -143,40 +133,6 @@ mod tests {
(doc as u64) * (doc as u64)
}
#[test]
fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::default();
let text = schema_builder.add_text_field("text", STORED | STRING);
let body = schema_builder.add_text_field("body", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz")
.take(1_000)
.collect();
for _ in 0..20 {
index_writer.add_document(doc!(body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.add_document(doc!(text=>"testb"));
for _ in 0..10 {
index_writer.add_document(doc!(text=>"testd", body=>long_text.clone()));
}
index_writer.commit()?;
index_writer.delete_term(Term::from_field_text(text, "testb"));
index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
block_on(index_writer.merge(&segment_ids))?;
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 30);
for i in 0..searcher.num_docs() as u32 {
let _doc = searcher.doc(DocAddress(0u32, i))?;
}
Ok(())
}
#[test]
fn test_skip_index_long() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
@@ -194,7 +150,7 @@ mod tests {
}
skip_index_builder.write(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
.checkpoints()
.collect();
assert_eq!(&resulting_checkpoints, &checkpoints);
@@ -265,7 +221,7 @@ mod tests {
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
test_skip_index_aux(skip_index, &checkpoints[..]);

View File

@@ -59,24 +59,6 @@ pub struct SkipIndex {
}
impl SkipIndex {
pub fn open(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
let layer = Layer {
data: data.slice(start_offset as usize, end_offset as usize),
};
layers.push(layer);
start_offset = end_offset;
}
SkipIndex { layers }
}
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()
@@ -108,3 +90,22 @@ impl SkipIndex {
Some(cur_checkpoint)
}
}
impl From<OwnedBytes> for SkipIndex {
fn from(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
layers.push(Layer {
data: data.slice(start_offset as usize, end_offset as usize),
});
start_offset = end_offset;
}
SkipIndex { layers }
}
}

View File

@@ -28,20 +28,18 @@ impl LayerBuilder {
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
if let Some((start_doc, end_doc)) = self.block.doc_interval() {
self.block.doc_interval().map(|(start_doc, end_doc)| {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Some(Checkpoint {
Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
})
} else {
None
}
}
})
}
fn push(&mut self, checkpoint: Checkpoint) {
@@ -50,7 +48,7 @@ impl LayerBuilder {
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD;
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
if emit_skip_info {
self.flush_block()
} else {

View File

@@ -35,7 +35,7 @@ impl StoreReader {
let (data_file, offset_index_file) = split_file(store_file)?;
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::open(index_data);
let skip_index = SkipIndex::from(index_data);
Ok(StoreReader {
data: data_file,
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),

View File

@@ -1,50 +0,0 @@
use std::path::Path;
use crate::HasLen;
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, RAMDirectory};
use crate::fastfield::DeleteBitSet;
use super::{StoreReader, StoreWriter};
#[test]
fn test_toto2() -> crate::Result<()> {
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
let path = Path::new("b6029ade1b954ea1acad15b432eaacb9.store");
assert!(directory.validate_checksum(path)?);
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
let documents = store.documents();
// for doc in documents {
// println!("{:?}", doc);
// }
let doc= store.get(15_086)?;
Ok(())
}
#[test]
fn test_toto() -> crate::Result<()> {
let directory = ManagedDirectory::wrap(MmapDirectory::open("src/store/broken_seg")?)?;
assert!(directory.validate_checksum(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store"))?);
let store_file = directory.open_read(Path::new("e6ece22e5bca4e0dbe7ce3e4dcbd5bbf.store.patched"))?;
let store = StoreReader::open(store_file)?;
let doc= store.get(53)?;
println!("{:?}", doc);
// let documents = store.documents();
// let ram_directory = RAMDirectory::create();
// let path = Path::new("store");
// let store_wrt = ram_directory.open_write(path)?;
// let mut store_writer = StoreWriter::new(store_wrt);
// for doc in &documents {
// store_writer.store(doc)?;
// }
// store_writer.close()?;
// let store_data = ram_directory.open_read(path)?;
// let new_store = StoreReader::open(store_data)?;
// for doc in 0..59 {
// println!("{}", doc);
// let doc = new_store.get(doc)?;
// println!("{:?}", doc);
// }
Ok(())
}

View File

@@ -10,7 +10,7 @@ use crate::store::index::Checkpoint;
use crate::DocId;
use std::io::{self, Write};
const BLOCK_SIZE: usize = 30;
const BLOCK_SIZE: usize = 16_384;
/// Write tantivy's [`Store`](./index.html)
///
@@ -72,7 +72,6 @@ impl StoreWriter {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
}
assert_eq!(self.first_doc_in_block, self.doc);
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
@@ -87,17 +86,12 @@ impl StoreWriter {
checkpoint.end_doc += doc_shift;
checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.register_checkpoint(checkpoint);
self.offset_index_writer.insert(checkpoint);
self.doc = checkpoint.end_doc;
}
Ok(())
}
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint);
self.first_doc_in_block = checkpoint.end_doc;
self.doc = checkpoint.end_doc;
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
@@ -106,13 +100,14 @@ impl StoreWriter {
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes();
let end_doc = self.doc;
self.register_checkpoint(Checkpoint {
self.offset_index_writer.insert(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.current_block.clear();
self.first_doc_in_block = self.doc;
Ok(())
}