Compare commits

...

22 Commits

Author SHA1 Message Date
Paul Masurel
a7c579f5c9 Added method to convert named doc to doc 2019-08-06 08:00:32 +09:00
Paul Masurel
f2e546bdff Changes required for python binding 2019-08-01 17:23:49 +09:00
Paul Masurel
efd1af1325 Closes #544. (#607)
Prepare for release 0.10.1
2019-07-30 13:38:06 +09:00
fdb-hiroshima
c91eb7fba7 add to_path for Facet (#604)
fix #580
2019-07-27 17:58:43 +09:00
fdb-hiroshima
6eb4e08636 add support for float (#603)
* add basic support for float

as for i64, they are mapped to u64 for indexing
query parser don't work yet

* Update value.rs

* implement support for float in query parser

* Update README.md
2019-07-27 17:57:33 +09:00
Paul Masurel
c3231ca252 Added phrase query tests (#601) 2019-07-22 13:43:00 +09:00
Paul Masurel
7211df6719 Failrs (#600)
* Single thread tests

* Isolating fail tests into a different binary
2019-07-22 13:17:21 +09:00
Paul Masurel
f27ce6412c Made the SegmentMeta inventory out of static. 2019-07-21 10:38:00 +09:00
Paul Masurel
8197a9921f Small code cleaning 2019-07-20 07:10:12 +09:00
Paul Masurel
b0e23b5715 Merge branch 'master' of github.com:tantivy-search/tantivy 2019-07-18 10:16:49 +09:00
Paul Masurel
0167151f5b Disabling generating docs 2019-07-18 10:16:29 +09:00
Paul Masurel
0668949390 Disabling generating docs 2019-07-18 09:36:57 +09:00
Paul Masurel
94d0e52786 Using instead of u64. 2019-07-17 22:02:47 +09:00
Paul Masurel
818a0abbee Small refactoring 2019-07-17 21:55:59 +09:00
Luca Bruno
4e6dcf3cbe cargo: update to fail 0.3 (#593)
* cargo: update to fail 0.3

* tantivy: align failpoints feature naming

This aligns feature naming to use `failpoints` everywhere, like the
underlying library.
2019-07-17 18:51:38 +09:00
Paul Masurel
af7ea1422a using smallvec for operation batches (#599) 2019-07-17 13:20:02 +09:00
Paul Masurel
498057c5b7 Refactor deletes (#597)
* Refactor deletes

* Removing generation from SegmentUpdater. These have been obsolete for a long time

* Number literal clippy

* Removed clippy useless allow statement
2019-07-17 13:06:44 +09:00
Paul Masurel
5095e6b010 Introduce a small refactoring of the sgment writer. (#596) 2019-07-17 08:32:29 +09:00
Paul Masurel
1aebc87ee3 disabling caching (#595) 2019-07-16 19:05:22 +09:00
Paul Masurel
9fb5058b29 Fixed links (#592)
Closes #591
2019-07-15 19:35:44 +09:00
Paul Masurel
158e0a28ba Removed ilnk to master reference doc 2019-07-15 15:18:53 +09:00
Paul Masurel
3576a006f7 Updated example link 2019-07-15 15:17:53 +09:00
64 changed files with 1732 additions and 760 deletions

View File

@@ -38,9 +38,8 @@ matrix:
# Linux
#- env: TARGET=aarch64-unknown-linux-gnu
#- env: TARGET=i686-unknown-linux-gnu
- env: TARGET=x86_64-unknown-linux-gnu CODECOV=1 UPLOAD_DOCS=1
- env: TARGET=x86_64-unknown-linux-gnu CODECOV=1 #UPLOAD_DOCS=1
# - env: TARGET=x86_64-unknown-linux-musl CODECOV=1
# OSX
#- env: TARGET=x86_64-apple-darwin
# os: osx
@@ -70,15 +69,15 @@ after_success:
- if [[ -v GH_TOKEN ]]; then echo "GH TOKEN IS SET"; else echo "GH TOKEN NOT SET"; fi
- if [[ -v UPLOAD_DOCS ]]; then cargo doc; cargo doc-upload; else echo "doc upload disabled."; fi
cache: cargo
before_cache:
# Travis can't cache files that are not readable by "others"
- chmod -R a+r $HOME/.cargo
- find ./target/debug -type f -maxdepth 1 -delete
- rm -f ./target/.rustc_info.json
- rm -fr ./target/debug/{deps,.fingerprint}/tantivy*
- rm -r target/debug/examples/
- ls -1 examples/ | sed -e 's/\.rs$//' | xargs -I "{}" find target/* -name "*{}*" -type f -delete
#cache: cargo
#before_cache:
# # Travis can't cache files that are not readable by "others"
# - chmod -R a+r $HOME/.cargo
# - find ./target/debug -type f -maxdepth 1 -delete
# - rm -f ./target/.rustc_info.json
# - rm -fr ./target/debug/{deps,.fingerprint}/tantivy*
# - rm -r target/debug/examples/
# - ls -1 examples/ | sed -e 's/\.rs$//' | xargs -I "{}" find target/* -name "*{}*" -type f -delete
#branches:
# only:

View File

@@ -1,3 +1,16 @@
Tantivy 0.11.0
=====================
- Added f64 field. Internally reuse u64 code the same way i64 does (@fdb-hiroshima)
Tantivy 0.10.1
=====================
- Closes #544. A few users experienced problems with the directory watching system.
Avoid watching the mmap directory until someone effectively creates a reader that uses
this functionality.
Tantivy 0.10.0
=====================

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.10.0"
version = "0.10.1"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -49,10 +49,11 @@ fnv = "1.0.6"
owned-read = "0.4"
failure = "0.1"
htmlescape = "0.3.1"
fail = "0.2"
fail = "0.3"
scoped-pool = "1.0"
murmurhash32 = "0.2"
chrono = "0.4"
smallvec = "0.6"
[target.'cfg(windows)'.dependencies]
winapi = "0.3"
@@ -73,13 +74,28 @@ debug-assertions = true
overflow-checks = true
[features]
# by default no-fail is disabled. We manually enable it when running test.
default = ["mmap", "no_fail"]
default = ["mmap"]
mmap = ["atomicwrites", "fs2", "memmap", "notify"]
lz4-compression = ["lz4"]
no_fail = ["fail/no_fail"]
failpoints = ["fail/failpoints"]
unstable = [] # useful for benches.
wasm-bindgen = ["uuid/wasm-bindgen"]
[badges]
travis-ci = { repository = "tantivy-search/tantivy" }
[dev-dependencies.fail]
features = ["failpoints"]
# Following the "fail" crate best practises, we isolate
# tests that define specific behavior in fail check points
# in a different binary.
#
# We do that because, fail rely on a global definition of
# failpoints behavior and hence, it is incompatible with
# multithreading.
[[test]]
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]

View File

@@ -50,9 +50,9 @@ performance for different type of queries / collection.
- Multithreaded indexing (indexing English Wikipedia takes < 3 minutes on my desktop)
- Mmap directory
- SIMD integer compression when the platform/CPU includes the SSE2 instruction set.
- Single valued and multivalued u64 and i64 fast fields (equivalent of doc values in Lucene)
- Single valued and multivalued u64, i64 and f64 fast fields (equivalent of doc values in Lucene)
- `&[u8]` fast fields
- Text, i64, u64, dates and hierarchical facet fields
- Text, i64, u64, f64, dates and hierarchical facet fields
- LZ4 compressed document store
- Range queries
- Faceted search
@@ -71,14 +71,12 @@ Tantivy works on stable rust (>= 1.27) and supports Linux, MacOS and Windows.
# Getting started
- [tantivy's simple search example](http://fulmicoton.com/tantivy-examples/simple_search.html)
- [tantivy's simple search example](https://tantivy-search.github.io/examples/basic_search.html)
- [tantivy-cli and its tutorial](https://github.com/tantivy-search/tantivy-cli).
`tantivy-cli` is an actual command line interface that makes it easy for you to create a search engine,
index documents and search via the CLI or a small server with a REST API.
It will walk you through getting a wikipedia search engine up and running in a few minutes.
- [reference doc]
- [For the last released version](https://docs.rs/tantivy/)
- [For the last master branch](https://tantivy-search.github.io/tantivy/tantivy/index.html)
- [reference doc for the last released version](https://docs.rs/tantivy/)
# How can I support this project?

View File

@@ -18,5 +18,5 @@ install:
build: false
test_script:
- REM SET RUST_LOG=tantivy,test & cargo test --verbose --no-default-features --features mmap -- --test-threads 1
- REM SET RUST_LOG=tantivy,test & cargo test --verbose --no-default-features --features mmap
- REM SET RUST_BACKTRACE=1 & cargo build --examples

View File

@@ -1,2 +1,2 @@
#!/bin/bash
cargo test --no-default-features --features mmap -- --test-threads 1
cargo test

View File

@@ -82,6 +82,7 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let num_field_i64 = schema_builder.add_i64_field("num_i64", FAST);
let num_field_u64 = schema_builder.add_u64_field("num_u64", FAST);
let num_field_f64 = schema_builder.add_f64_field("num_f64", FAST);
let text_field = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
@@ -94,6 +95,7 @@ mod tests {
index_writer.add_document(doc!(
num_field_i64 => ((i as i64) % 3i64) as i64,
num_field_u64 => (i % 2u64) as u64,
num_field_f64 => (i % 4u64) as f64,
text_field => "text"
));
}
@@ -104,10 +106,11 @@ mod tests {
let searcher = index.reader().searcher();
let mut ffvf_i64: IntFacetCollector<I64FastFieldReader> = IntFacetCollector::new(num_field_i64);
let mut ffvf_u64: IntFacetCollector<U64FastFieldReader> = IntFacetCollector::new(num_field_u64);
let mut ffvf_f64: IntFacetCollector<F64FastFieldReader> = IntFacetCollector::new(num_field_f64);
{
// perform the query
let mut facet_collectors = chain().push(&mut ffvf_i64).push(&mut ffvf_u64);
let mut facet_collectors = chain().push(&mut ffvf_i64).push(&mut ffvf_u64).push(&mut ffvf_f64);
let mut query_parser = QueryParser::for_index(index, vec![text_field]);
let query = query_parser.parse_query("text:text").unwrap();
query.search(&searcher, &mut facet_collectors).unwrap();
@@ -117,6 +120,8 @@ mod tests {
assert_eq!(ffvf_u64.counters[&1], 5);
assert_eq!(ffvf_i64.counters[&0], 4);
assert_eq!(ffvf_i64.counters[&1], 3);
assert_eq!(ffvf_f64.counters[&0.0], 3);
assert_eq!(ffvf_f64.counters[&2.0], 2);
}
}

View File

@@ -8,13 +8,23 @@ use crate::DocId;
use crate::Score;
use crate::SegmentLocalId;
pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
compute_score: true,
};
pub const TEST_COLLECTOR_WITHOUT_SCORE: TestCollector = TestCollector {
compute_score: true,
};
/// Stores all of the doc ids.
/// This collector is only used for tests.
/// It is unusable in pr
///
/// actise, as it does not store
/// the segment ordinals
pub struct TestCollector;
pub struct TestCollector {
pub compute_score: bool,
}
pub struct TestSegmentCollector {
segment_id: SegmentLocalId,
@@ -32,7 +42,6 @@ impl TestFruit {
pub fn docs(&self) -> &[DocAddress] {
&self.docs[..]
}
pub fn scores(&self) -> &[Score] {
&self.scores[..]
}
@@ -54,7 +63,7 @@ impl Collector for TestCollector {
}
fn requires_scoring(&self) -> bool {
true
self.compute_score
}
fn merge_fruits(&self, mut children: Vec<TestFruit>) -> Result<TestFruit> {

View File

@@ -13,6 +13,7 @@ use crate::Result;
use crate::Score;
use crate::SegmentLocalId;
use crate::SegmentReader;
use std::fmt;
/// The Top Score Collector keeps track of the K documents
/// sorted by their score.
@@ -68,6 +69,12 @@ use crate::SegmentReader;
/// ```
pub struct TopDocs(TopCollector<Score>);
impl fmt::Debug for TopDocs {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "TopDocs({})", self.0.limit())
}
}
impl TopDocs {
/// Creates a top score collector, with a number of documents equal to "limit".
///
@@ -160,6 +167,7 @@ impl TopDocs {
.fast_fields()
.u64(field)
.expect("Field requested is not a i64/u64 fast field.");
//TODO error message missmatch actual behavior for i64
move |doc: DocId| ff_reader.get(doc)
})
}

View File

@@ -99,15 +99,54 @@ pub fn u64_to_i64(val: u64) -> i64 {
(val ^ HIGHEST_BIT) as i64
}
/// Maps a `f64` to `u64`
///
/// For simplicity, tantivy internally handles `f64` as `u64`.
/// The mapping is defined by this function.
///
/// Maps `f64` to `u64` so that lexical order is preserved.
///
/// This is more suited than simply casting (`val as u64`)
/// which would truncate the result
///
/// # See also
/// The [reverse mapping is `u64_to_f64`](./fn.u64_to_f64.html).
#[inline(always)]
pub fn f64_to_u64(val: f64) -> u64 {
let bits = val.to_bits();
if val.is_sign_positive() {
bits ^ HIGHEST_BIT
} else {
!bits
}
}
/// Reverse the mapping given by [`i64_to_u64`](./fn.i64_to_u64.html).
#[inline(always)]
pub fn u64_to_f64(val: u64) -> f64 {
f64::from_bits(
if val & HIGHEST_BIT != 0 {
val ^ HIGHEST_BIT
} else {
!val
}
)
}
#[cfg(test)]
pub(crate) mod test {
pub use super::serialize::test::fixed_size_test;
use super::{compute_num_bits, i64_to_u64, u64_to_i64};
use super::{compute_num_bits, i64_to_u64, u64_to_i64, f64_to_u64, u64_to_f64};
use std::f64;
fn test_i64_converter_helper(val: i64) {
assert_eq!(u64_to_i64(i64_to_u64(val)), val);
}
fn test_f64_converter_helper(val: f64) {
assert_eq!(u64_to_f64(f64_to_u64(val)), val);
}
#[test]
fn test_i64_converter() {
@@ -121,6 +160,28 @@ pub(crate) mod test {
}
}
#[test]
fn test_f64_converter() {
test_f64_converter_helper(f64::INFINITY);
test_f64_converter_helper(f64::NEG_INFINITY);
test_f64_converter_helper(0.0);
test_f64_converter_helper(-0.0);
test_f64_converter_helper(1.0);
test_f64_converter_helper(-1.0);
}
#[test]
fn test_f64_order() {
assert!(!(f64_to_u64(f64::NEG_INFINITY)..f64_to_u64(f64::INFINITY)).contains(&f64_to_u64(f64::NAN))); //nan is not a number
assert!(f64_to_u64(1.5) > f64_to_u64(1.0)); //same exponent, different mantissa
assert!(f64_to_u64(2.0) > f64_to_u64(1.0)); //same mantissa, different exponent
assert!(f64_to_u64(2.0) > f64_to_u64(1.5)); //different exponent and mantissa
assert!(f64_to_u64(1.0) > f64_to_u64(-1.0)); // pos > neg
assert!(f64_to_u64(-1.5) < f64_to_u64(-1.0));
assert!(f64_to_u64(-2.0) < f64_to_u64(1.0));
assert!(f64_to_u64(-2.0) < f64_to_u64(-1.5));
}
#[test]
fn test_compute_num_bits() {
assert_eq!(compute_num_bits(1), 1u8);

View File

@@ -102,6 +102,19 @@ impl FixedSize for i64 {
const SIZE_IN_BYTES: usize = 8;
}
impl BinarySerializable for f64 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_f64::<Endianness>(*self)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
reader.read_f64::<Endianness>()
}
}
impl FixedSize for f64 {
const SIZE_IN_BYTES: usize = 8;
}
impl BinarySerializable for u8 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_u8(*self)
@@ -172,6 +185,11 @@ pub mod test {
fixed_size_test::<i64>();
}
#[test]
fn test_serialize_f64() {
fixed_size_test::<f64>();
}
#[test]
fn test_serialize_u64() {
fixed_size_test::<u64>();

View File

@@ -4,6 +4,7 @@ use crate::core::Executor;
use crate::core::IndexMeta;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SegmentMetaInventory;
use crate::core::META_FILEPATH;
use crate::directory::ManagedDirectory;
#[cfg(feature = "mmap")]
@@ -12,7 +13,6 @@ use crate::directory::INDEX_WRITER_LOCK;
use crate::directory::{Directory, RAMDirectory};
use crate::error::DataCorruption;
use crate::error::TantivyError;
use crate::indexer::index_writer::open_index_writer;
use crate::indexer::index_writer::HEAP_SIZE_MIN;
use crate::indexer::segment_updater::save_new_metas;
use crate::reader::IndexReader;
@@ -25,17 +25,16 @@ use crate::tokenizer::TokenizerManager;
use crate::IndexWriter;
use crate::Result;
use num_cpus;
use serde_json;
use std::borrow::BorrowMut;
use std::fmt;
#[cfg(feature = "mmap")]
use std::path::Path;
use std::sync::Arc;
fn load_metas(directory: &dyn Directory) -> Result<IndexMeta> {
fn load_metas(directory: &dyn Directory, inventory: &SegmentMetaInventory) -> Result<IndexMeta> {
let meta_data = directory.atomic_read(&META_FILEPATH)?;
let meta_string = String::from_utf8_lossy(&meta_data);
serde_json::from_str(&meta_string)
IndexMeta::deserialize(&meta_string, &inventory)
.map_err(|e| {
DataCorruption::new(
META_FILEPATH.to_path_buf(),
@@ -52,6 +51,7 @@ pub struct Index {
schema: Schema,
executor: Arc<Executor>,
tokenizers: TokenizerManager,
inventory: SegmentMetaInventory,
}
impl Index {
@@ -148,19 +148,23 @@ impl Index {
fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> Result<Index> {
save_new_metas(schema.clone(), directory.borrow_mut())?;
let metas = IndexMeta::with_schema(schema);
Index::create_from_metas(directory, &metas)
Index::create_from_metas(directory, &metas, SegmentMetaInventory::default())
}
/// Creates a new index given a directory and an `IndexMeta`.
fn create_from_metas(directory: ManagedDirectory, metas: &IndexMeta) -> Result<Index> {
fn create_from_metas(
directory: ManagedDirectory,
metas: &IndexMeta,
inventory: SegmentMetaInventory,
) -> Result<Index> {
let schema = metas.schema.clone();
let index = Index {
Ok(Index {
directory,
schema,
tokenizers: TokenizerManager::default(),
executor: Arc::new(Executor::single_thread()),
};
Ok(index)
inventory,
})
}
/// Accessor for the tokenizer manager.
@@ -212,16 +216,21 @@ impl Index {
Index::open(mmap_directory)
}
pub(crate) fn inventory(&self) -> &SegmentMetaInventory {
&self.inventory
}
/// Open the index using the provided directory
pub fn open<D: Directory>(directory: D) -> Result<Index> {
let directory = ManagedDirectory::wrap(directory)?;
let metas = load_metas(&directory)?;
Index::create_from_metas(directory, &metas)
let inventory = SegmentMetaInventory::default();
let metas = load_metas(&directory, &inventory)?;
Index::create_from_metas(directory, &metas, inventory)
}
/// Reads the index meta file from the directory.
pub fn load_metas(&self) -> Result<IndexMeta> {
load_metas(self.directory())
load_metas(self.directory(), &self.inventory)
}
/// Open a new index writer. Attempts to acquire a lockfile.
@@ -265,7 +274,7 @@ impl Index {
)
})?;
let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
open_index_writer(
IndexWriter::new(
self,
num_threads,
heap_size_in_bytes_per_thread,
@@ -315,7 +324,9 @@ impl Index {
/// Creates a new segment.
pub fn new_segment(&self) -> Segment {
let segment_meta = SegmentMeta::new(SegmentId::generate_random(), 0);
let segment_meta = self
.inventory
.new_segment_meta(SegmentId::generate_random(), 0);
self.segment(segment_meta)
}

View File

@@ -1,8 +1,185 @@
use crate::core::SegmentMeta;
use super::SegmentComponent;
use crate::core::SegmentId;
use crate::schema::Schema;
use crate::Opstamp;
use census::{Inventory, TrackedObject};
use serde;
use serde_json;
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
num_deleted_docs: u32,
opstamp: Opstamp,
}
#[derive(Clone, Default)]
pub struct SegmentMetaInventory {
inventory: Inventory<InnerSegmentMeta>,
}
impl SegmentMetaInventory {
/// Lists all living `SegmentMeta` object at the time of the call.
pub fn all(&self) -> Vec<SegmentMeta> {
self.inventory
.list()
.into_iter()
.map(SegmentMeta::from)
.collect::<Vec<_>>()
}
#[doc(hidden)]
pub fn new_segment_meta(&self, segment_id: SegmentId, max_doc: u32) -> SegmentMeta {
let inner = InnerSegmentMeta {
segment_id,
max_doc,
deletes: None,
};
SegmentMeta::from(self.inventory.track(inner))
}
}
/// `SegmentMeta` contains simple meta information about a segment.
///
/// For instance the number of docs it contains,
/// how many are deleted, etc.
#[derive(Clone)]
pub struct SegmentMeta {
tracked: TrackedObject<InnerSegmentMeta>,
}
impl fmt::Debug for SegmentMeta {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
self.tracked.fmt(f)
}
}
impl serde::Serialize for SegmentMeta {
fn serialize<S>(
&self,
serializer: S,
) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
where
S: serde::Serializer,
{
self.tracked.serialize(serializer)
}
}
impl From<TrackedObject<InnerSegmentMeta>> for SegmentMeta {
fn from(tracked: TrackedObject<InnerSegmentMeta>) -> SegmentMeta {
SegmentMeta { tracked }
}
}
impl SegmentMeta {
// Creates a new `SegmentMeta` object.
/// Returns the segment id.
pub fn id(&self) -> SegmentId {
self.tracked.segment_id
}
/// Returns the number of deleted documents.
pub fn num_deleted_docs(&self) -> u32 {
self.tracked
.deletes
.as_ref()
.map(|delete_meta| delete_meta.num_deleted_docs)
.unwrap_or(0u32)
}
/// Returns the list of files that
/// are required for the segment meta.
///
/// This is useful as the way tantivy removes files
/// is by removing all files that have been created by tantivy
/// and are not used by any segment anymore.
pub fn list_files(&self) -> HashSet<PathBuf> {
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
}
/// Returns the relative path of a component of our segment.
///
/// It just joins the segment id with the extension
/// associated to a segment component.
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
let mut path = self.id().uuid_string();
path.push_str(&*match component {
SegmentComponent::POSTINGS => ".idx".to_string(),
SegmentComponent::POSITIONS => ".pos".to_string(),
SegmentComponent::POSITIONSSKIP => ".posidx".to_string(),
SegmentComponent::TERMS => ".term".to_string(),
SegmentComponent::STORE => ".store".to_string(),
SegmentComponent::FASTFIELDS => ".fast".to_string(),
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
});
PathBuf::from(path)
}
/// Return the highest doc id + 1
///
/// If there are no deletes, then num_docs = max_docs
/// and all the doc ids contains in this segment
/// are exactly (0..max_doc).
pub fn max_doc(&self) -> u32 {
self.tracked.max_doc
}
/// Return the number of documents in the segment.
pub fn num_docs(&self) -> u32 {
self.max_doc() - self.num_deleted_docs()
}
/// Returns the `Opstamp` of the last delete operation
/// taken in account in this segment.
pub fn delete_opstamp(&self) -> Option<Opstamp> {
self.tracked
.deletes
.as_ref()
.map(|delete_meta| delete_meta.opstamp)
}
/// Returns true iff the segment meta contains
/// delete information.
pub fn has_deletes(&self) -> bool {
self.num_deleted_docs() > 0
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
let delete_meta = DeleteMeta {
num_deleted_docs,
opstamp,
};
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc: inner_meta.max_doc,
deletes: Some(delete_meta),
});
SegmentMeta { tracked }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct InnerSegmentMeta {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
}
impl InnerSegmentMeta {
pub fn track(self, inventory: &SegmentMetaInventory) -> SegmentMeta {
SegmentMeta {
tracked: inventory.inventory.track(self),
}
}
}
/// Meta information about the `Index`.
///
@@ -12,7 +189,7 @@ use std::fmt;
/// * the index `docstamp`
/// * the schema
///
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize)]
pub struct IndexMeta {
/// List of `SegmentMeta` informations associated to each finalized segment of the index.
pub segments: Vec<SegmentMeta>,
@@ -29,6 +206,30 @@ pub struct IndexMeta {
pub payload: Option<String>,
}
#[derive(Deserialize)]
struct UntrackedIndexMeta {
pub segments: Vec<InnerSegmentMeta>,
pub schema: Schema,
pub opstamp: Opstamp,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
}
impl UntrackedIndexMeta {
pub fn track(self, inventory: &SegmentMetaInventory) -> IndexMeta {
IndexMeta {
segments: self
.segments
.into_iter()
.map(|inner_seg_meta| inner_seg_meta.track(inventory))
.collect::<Vec<SegmentMeta>>(),
schema: self.schema,
opstamp: self.opstamp,
payload: self.payload,
}
}
}
impl IndexMeta {
/// Create an `IndexMeta` object representing a brand new `Index`
/// with the given index.
@@ -43,6 +244,14 @@ impl IndexMeta {
payload: None,
}
}
pub(crate) fn deserialize(
meta_json: &str,
inventory: &SegmentMetaInventory,
) -> serde_json::Result<IndexMeta> {
let untracked_meta_json: UntrackedIndexMeta = serde_json::from_str(meta_json)?;
Ok(untracked_meta_json.track(inventory))
}
}
impl fmt::Debug for IndexMeta {

View File

@@ -32,7 +32,7 @@ pub struct InvertedIndexReader {
}
impl InvertedIndexReader {
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))] // for symetry
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))] // for symmetry
pub(crate) fn new(
termdict: TermDictionary,
postings_source: ReadOnlySource,

View File

@@ -6,19 +6,17 @@ pub mod searcher;
mod segment;
mod segment_component;
mod segment_id;
mod segment_meta;
mod segment_reader;
pub use self::executor::Executor;
pub use self::index::Index;
pub use self::index_meta::IndexMeta;
pub use self::index_meta::{IndexMeta, SegmentMeta, SegmentMetaInventory};
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::searcher::Searcher;
pub use self::segment::Segment;
pub use self::segment::SerializableSegment;
pub use self::segment_component::SegmentComponent;
pub use self::segment_id::SegmentId;
pub use self::segment_meta::SegmentMeta;
pub use self::segment_reader::SegmentReader;
use once_cell::sync::Lazy;

View File

@@ -1,174 +0,0 @@
use super::SegmentComponent;
use crate::core::SegmentId;
use crate::Opstamp;
use census::{Inventory, TrackedObject};
use once_cell::sync::Lazy;
use serde;
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
static INVENTORY: Lazy<Inventory<InnerSegmentMeta>> = Lazy::new(Inventory::new);
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
num_deleted_docs: u32,
opstamp: Opstamp,
}
/// `SegmentMeta` contains simple meta information about a segment.
///
/// For instance the number of docs it contains,
/// how many are deleted, etc.
#[derive(Clone)]
pub struct SegmentMeta {
tracked: TrackedObject<InnerSegmentMeta>,
}
impl fmt::Debug for SegmentMeta {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
self.tracked.fmt(f)
}
}
impl serde::Serialize for SegmentMeta {
fn serialize<S>(
&self,
serializer: S,
) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
where
S: serde::Serializer,
{
self.tracked.serialize(serializer)
}
}
impl<'a> serde::Deserialize<'a> for SegmentMeta {
fn deserialize<D>(deserializer: D) -> Result<Self, <D as serde::Deserializer<'a>>::Error>
where
D: serde::Deserializer<'a>,
{
let inner = InnerSegmentMeta::deserialize(deserializer)?;
let tracked = INVENTORY.track(inner);
Ok(SegmentMeta { tracked })
}
}
impl SegmentMeta {
/// Lists all living `SegmentMeta` object at the time of the call.
pub fn all() -> Vec<SegmentMeta> {
INVENTORY
.list()
.into_iter()
.map(|inner| SegmentMeta { tracked: inner })
.collect::<Vec<_>>()
}
/// Creates a new `SegmentMeta` object.
#[doc(hidden)]
pub fn new(segment_id: SegmentId, max_doc: u32) -> SegmentMeta {
let inner = InnerSegmentMeta {
segment_id,
max_doc,
deletes: None,
};
SegmentMeta {
tracked: INVENTORY.track(inner),
}
}
/// Returns the segment id.
pub fn id(&self) -> SegmentId {
self.tracked.segment_id
}
/// Returns the number of deleted documents.
pub fn num_deleted_docs(&self) -> u32 {
self.tracked
.deletes
.as_ref()
.map(|delete_meta| delete_meta.num_deleted_docs)
.unwrap_or(0u32)
}
/// Returns the list of files that
/// are required for the segment meta.
///
/// This is useful as the way tantivy removes files
/// is by removing all files that have been created by tantivy
/// and are not used by any segment anymore.
pub fn list_files(&self) -> HashSet<PathBuf> {
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
}
/// Returns the relative path of a component of our segment.
///
/// It just joins the segment id with the extension
/// associated to a segment component.
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
let mut path = self.id().uuid_string();
path.push_str(&*match component {
SegmentComponent::POSTINGS => ".idx".to_string(),
SegmentComponent::POSITIONS => ".pos".to_string(),
SegmentComponent::POSITIONSSKIP => ".posidx".to_string(),
SegmentComponent::TERMS => ".term".to_string(),
SegmentComponent::STORE => ".store".to_string(),
SegmentComponent::FASTFIELDS => ".fast".to_string(),
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
});
PathBuf::from(path)
}
/// Return the highest doc id + 1
///
/// If there are no deletes, then num_docs = max_docs
/// and all the doc ids contains in this segment
/// are exactly (0..max_doc).
pub fn max_doc(&self) -> u32 {
self.tracked.max_doc
}
/// Return the number of documents in the segment.
pub fn num_docs(&self) -> u32 {
self.max_doc() - self.num_deleted_docs()
}
/// Returns the `Opstamp` of the last delete operation
/// taken in account in this segment.
pub fn delete_opstamp(&self) -> Option<Opstamp> {
self.tracked
.deletes
.as_ref()
.map(|delete_meta| delete_meta.opstamp)
}
/// Returns true iff the segment meta contains
/// delete information.
pub fn has_deletes(&self) -> bool {
self.num_deleted_docs() > 0
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
let delete_meta = DeleteMeta {
num_deleted_docs,
opstamp,
};
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc: inner_meta.max_doc,
deletes: Some(delete_meta),
});
SegmentMeta { tracked }
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct InnerSegmentMeta {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<DeleteMeta>,
}

View File

@@ -204,7 +204,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// Internally, tantivy only uses this API to detect new commits to implement the
/// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
/// `OnCommit` `ReloadPolicy` to work properly.
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle;
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>;
}
/// DirectoryClone

View File

@@ -135,28 +135,28 @@ impl ManagedDirectory {
files_to_delete.push(managed_path.clone());
}
}
} else {
error!("Failed to acquire lock for GC");
}
}
let mut deleted_files = vec![];
{
for file_to_delete in files_to_delete {
match self.delete(&file_to_delete) {
Ok(_) => {
info!("Deleted {:?}", file_to_delete);
deleted_files.push(file_to_delete);
}
Err(file_error) => {
match file_error {
DeleteError::FileDoesNotExist(_) => {
deleted_files.push(file_to_delete);
}
DeleteError::IOError(_) => {
if !cfg!(target_os = "windows") {
// On windows, delete is expected to fail if the file
// is mmapped.
error!("Failed to delete {:?}", file_to_delete);
}
for file_to_delete in files_to_delete {
match self.delete(&file_to_delete) {
Ok(_) => {
info!("Deleted {:?}", file_to_delete);
deleted_files.push(file_to_delete);
}
Err(file_error) => {
match file_error {
DeleteError::FileDoesNotExist(_) => {
deleted_files.push(file_to_delete);
}
DeleteError::IOError(_) => {
if !cfg!(target_os = "windows") {
// On windows, delete is expected to fail if the file
// is mmapped.
error!("Failed to delete {:?}", file_to_delete);
}
}
}
@@ -171,11 +171,9 @@ impl ManagedDirectory {
.meta_informations
.write()
.expect("Managed directory wlock poisoned (2).");
{
let managed_paths_write = &mut meta_informations_wlock.managed_paths;
for delete_file in &deleted_files {
managed_paths_write.remove(delete_file);
}
let managed_paths_write = &mut meta_informations_wlock.managed_paths;
for delete_file in &deleted_files {
managed_paths_write.remove(delete_file);
}
if save_managed_paths(self.directory.as_mut(), &meta_informations_wlock).is_err() {
error!("Failed to save the list of managed files.");
@@ -243,7 +241,7 @@ impl Directory for ManagedDirectory {
self.directory.acquire_lock(lock)
}
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
self.directory.watch(watch_callback)
}
}
@@ -257,100 +255,80 @@ impl Clone for ManagedDirectory {
}
}
#[cfg(feature = "mmap")]
#[cfg(test)]
mod tests {
mod tests_mmap_specific {
#[cfg(feature = "mmap")]
mod mmap_specific {
use crate::directory::{Directory, ManagedDirectory, MmapDirectory};
use std::collections::HashSet;
use std::io::Write;
use std::path::{Path, PathBuf};
use tempdir::TempDir;
use super::super::*;
use once_cell::sync::Lazy;
use std::path::Path;
use tempdir::TempDir;
static TEST_PATH1: Lazy<&'static Path> = Lazy::new(|| Path::new("some_path_for_test"));
static TEST_PATH2: Lazy<&'static Path> = Lazy::new(|| Path::new("some_path_for_test2"));
use crate::directory::MmapDirectory;
use std::io::Write;
#[test]
fn test_managed_directory() {
let tempdir = TempDir::new("index").unwrap();
let tempdir_path = PathBuf::from(tempdir.path());
{
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
{
let mut write_file = managed_directory.open_write(*TEST_PATH1).unwrap();
write_file.flush().unwrap();
}
{
managed_directory
.atomic_write(*TEST_PATH2, &vec![0u8, 1u8])
.unwrap();
}
{
assert!(managed_directory.exists(*TEST_PATH1));
assert!(managed_directory.exists(*TEST_PATH2));
}
{
let living_files: HashSet<PathBuf> =
[TEST_PATH1.to_owned()].into_iter().cloned().collect();
managed_directory.garbage_collect(|| living_files);
}
{
assert!(managed_directory.exists(*TEST_PATH1));
assert!(!managed_directory.exists(*TEST_PATH2));
}
}
{
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
{
assert!(managed_directory.exists(*TEST_PATH1));
assert!(!managed_directory.exists(*TEST_PATH2));
}
{
let living_files: HashSet<PathBuf> = HashSet::new();
managed_directory.garbage_collect(|| living_files);
}
{
assert!(!managed_directory.exists(*TEST_PATH1));
assert!(!managed_directory.exists(*TEST_PATH2));
}
}
}
#[test]
fn test_managed_directory_gc_while_mmapped() {
let tempdir = TempDir::new("index").unwrap();
let tempdir_path = PathBuf::from(tempdir.path());
let living_files = HashSet::new();
#[test]
fn test_managed_directory() {
let tempdir = TempDir::new("tantivy-test").unwrap();
let tempdir_path = PathBuf::from(tempdir.path());
let test_path1: &'static Path = Path::new("some_path_for_test");
let test_path2: &'static Path = Path::new("some_path_for_test_2");
{
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
let mut write_file = managed_directory.open_write(test_path1).unwrap();
write_file.flush().unwrap();
managed_directory
.atomic_write(*TEST_PATH1, &vec![0u8, 1u8])
.atomic_write(test_path2, &[0u8, 1u8])
.unwrap();
assert!(managed_directory.exists(*TEST_PATH1));
let _mmap_read = managed_directory.open_read(*TEST_PATH1).unwrap();
managed_directory.garbage_collect(|| living_files.clone());
if cfg!(target_os = "windows") {
// On Windows, gc should try and fail the file as it is mmapped.
assert!(managed_directory.exists(*TEST_PATH1));
// unmap should happen here.
drop(_mmap_read);
// The file should still be in the list of managed file and
// eventually be deleted once mmap is released.
managed_directory.garbage_collect(|| living_files);
assert!(!managed_directory.exists(*TEST_PATH1));
} else {
assert!(!managed_directory.exists(*TEST_PATH1));
}
assert!(managed_directory.exists(test_path1));
assert!(managed_directory.exists(test_path2));
let living_files: HashSet<PathBuf> =
[test_path1.to_owned()].into_iter().cloned().collect();
managed_directory.garbage_collect(|| living_files);
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
}
{
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
let living_files: HashSet<PathBuf> = HashSet::new();
managed_directory.garbage_collect(|| living_files);
assert!(!managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
}
}
#[test]
fn test_managed_directory_gc_while_mmapped() {
let test_path1: &'static Path = Path::new("some_path_for_test");
let tempdir = TempDir::new("index").unwrap();
let tempdir_path = PathBuf::from(tempdir.path());
let living_files = HashSet::new();
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
managed_directory
.atomic_write(test_path1, &vec![0u8, 1u8])
.unwrap();
assert!(managed_directory.exists(test_path1));
let _mmap_read = managed_directory.open_read(test_path1).unwrap();
managed_directory.garbage_collect(|| living_files.clone());
if cfg!(target_os = "windows") {
// On Windows, gc should try and fail the file as it is mmapped.
assert!(managed_directory.exists(test_path1));
// unmap should happen here.
drop(_mmap_read);
// The file should still be in the list of managed file and
// eventually be deleted once mmap is released.
managed_directory.garbage_collect(|| living_files);
assert!(!managed_directory.exists(test_path1));
} else {
assert!(!managed_directory.exists(test_path1));
}
}
}

View File

@@ -161,7 +161,7 @@ impl InnerWatcherWrapper {
}
#[derive(Clone)]
pub(crate) struct WatcherWrapper {
struct WatcherWrapper {
inner: Arc<InnerWatcherWrapper>,
}
@@ -231,7 +231,7 @@ struct MmapDirectoryInner {
root_path: PathBuf,
mmap_cache: RwLock<MmapCache>,
_temp_directory: Option<TempDir>,
watcher: RwLock<WatcherWrapper>,
watcher: RwLock<Option<WatcherWrapper>>,
}
impl MmapDirectoryInner {
@@ -239,19 +239,36 @@ impl MmapDirectoryInner {
root_path: PathBuf,
temp_directory: Option<TempDir>,
) -> Result<MmapDirectoryInner, OpenDirectoryError> {
let watch_wrapper = WatcherWrapper::new(&root_path)?;
let mmap_directory_inner = MmapDirectoryInner {
root_path,
mmap_cache: Default::default(),
_temp_directory: temp_directory,
watcher: RwLock::new(watch_wrapper),
watcher: RwLock::new(None),
};
Ok(mmap_directory_inner)
}
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
let mut wlock = self.watcher.write().unwrap();
wlock.watch(watch_callback)
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
// a lot of juggling here, to ensure we don't do anything that panics
// while the rwlock is held. That way we ensure that the rwlock cannot
// be poisoned.
//
// The downside is that we might create a watch wrapper that is not useful.
let need_initialization = self.watcher.read().unwrap().is_none();
if need_initialization {
let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
let mut watch_wlock = self.watcher.write().unwrap();
// the watcher could have been initialized when we released the lock, and
// we do not want to lose the watched files that were set.
if watch_wlock.is_none() {
*watch_wlock = Some(watch_wrapper);
}
}
if let Some(watch_wrapper) = self.watcher.write().unwrap().as_mut() {
return Ok(watch_wrapper.watch(watch_callback));
} else {
unreachable!("At this point, watch wrapper is supposed to be initialized");
}
}
}
@@ -417,7 +434,6 @@ impl Directory for MmapDirectory {
/// Any entry associated to the path in the mmap will be
/// removed before the file is deleted.
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
debug!("Deleting file {:?}", path);
let full_path = self.resolve_path(path);
match fs::remove_file(&full_path) {
Ok(_) => self
@@ -515,7 +531,7 @@ impl Directory for MmapDirectory {
})))
}
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
self.inner.watch(watch_callback)
}
}

View File

@@ -29,7 +29,7 @@ use std::io::{BufWriter, Write};
#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;
pub(crate) use self::managed_directory::ManagedDirectory;
pub use self::managed_directory::ManagedDirectory;
/// Write object for Directory.
///

View File

@@ -145,6 +145,11 @@ impl Directory for RAMDirectory {
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
fail_point!("RAMDirectory::delete", |_| {
use crate::directory::error::IOError;
let io_error = IOError::from(io::Error::from(io::ErrorKind::Other));
Err(DeleteError::from(io_error))
});
self.fs.write().unwrap().delete(path)
}
@@ -188,7 +193,7 @@ impl Directory for RAMDirectory {
Ok(())
}
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
self.fs.write().unwrap().watch(watch_callback)
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.fs.write().unwrap().watch(watch_callback))
}
}

View File

@@ -1,5 +1,4 @@
use super::*;
use once_cell::sync::Lazy;
use std::io::Write;
use std::mem;
use std::path::{Path, PathBuf};
@@ -10,8 +9,6 @@ use std::thread;
use std::time;
use std::time::Duration;
static TEST_PATH: Lazy<&'static Path> = Lazy::new(|| Path::new("some_path_for_test"));
#[test]
fn test_ram_directory() {
let mut ram_directory = RAMDirectory::create();
@@ -28,76 +25,78 @@ fn test_mmap_directory() {
#[test]
#[should_panic]
fn ram_directory_panics_if_flush_forgotten() {
let test_path: &'static Path = Path::new("some_path_for_test");
let mut ram_directory = RAMDirectory::create();
let mut write_file = ram_directory.open_write(*TEST_PATH).unwrap();
let mut write_file = ram_directory.open_write(test_path).unwrap();
assert!(write_file.write_all(&[4]).is_ok());
}
fn test_simple(directory: &mut dyn Directory) {
let test_path: &'static Path = Path::new("some_path_for_test");
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
let mut write_file = directory.open_write(test_path).unwrap();
assert!(directory.exists(test_path));
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7, 3, 5]).unwrap();
write_file.flush().unwrap();
}
{
let read_file = directory.open_read(*TEST_PATH).unwrap();
let read_file = directory.open_read(test_path).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]);
}
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(!directory.exists(*TEST_PATH));
assert!(directory.delete(test_path).is_ok());
assert!(!directory.exists(test_path));
}
fn test_rewrite_forbidden(directory: &mut dyn Directory) {
let test_path: &'static Path = Path::new("some_path_for_test");
{
directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
directory.open_write(test_path).unwrap();
assert!(directory.exists(test_path));
}
{
assert!(directory.open_write(*TEST_PATH).is_err());
assert!(directory.open_write(test_path).is_err());
}
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(directory.delete(test_path).is_ok());
}
fn test_write_create_the_file(directory: &mut dyn Directory) {
let test_path: &'static Path = Path::new("some_path_for_test");
{
assert!(directory.open_read(*TEST_PATH).is_err());
let _w = directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
assert!(directory.open_read(*TEST_PATH).is_ok());
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(directory.open_read(test_path).is_err());
let _w = directory.open_write(test_path).unwrap();
assert!(directory.exists(test_path));
assert!(directory.open_read(test_path).is_ok());
assert!(directory.delete(test_path).is_ok());
}
}
fn test_directory_delete(directory: &mut dyn Directory) {
assert!(directory.open_read(*TEST_PATH).is_err());
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
let test_path: &'static Path = Path::new("some_path_for_test");
assert!(directory.open_read(test_path).is_err());
let mut write_file = directory.open_write(&test_path).unwrap();
write_file.write_all(&[1, 2, 3, 4]).unwrap();
write_file.flush().unwrap();
{
let read_handle = directory.open_read(*TEST_PATH).unwrap();
{
let read_handle = directory.open_read(&test_path).unwrap();
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
// Mapped files can't be deleted on Windows
if !cfg!(windows) {
assert!(directory.delete(&test_path).is_ok());
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
// Mapped files can't be deleted on Windows
if !cfg!(windows) {
assert!(directory.delete(*TEST_PATH).is_ok());
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
}
assert!(directory.delete(Path::new("SomeOtherPath")).is_err());
}
assert!(directory.delete(Path::new("SomeOtherPath")).is_err());
}
if cfg!(windows) {
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(directory.delete(&test_path).is_ok());
}
assert!(directory.open_read(*TEST_PATH).is_err());
assert!(directory.delete(*TEST_PATH).is_err());
assert!(directory.open_read(&test_path).is_err());
assert!(directory.delete(&test_path).is_err());
}
fn test_directory(directory: &mut dyn Directory) {
@@ -122,7 +121,7 @@ fn test_watch(directory: &mut dyn Directory) {
thread::sleep(Duration::new(0, 10_000));
assert_eq!(0, counter.load(Ordering::SeqCst));
let watch_handle = directory.watch(watch_callback);
let watch_handle = directory.watch(watch_callback).unwrap();
for i in 0..10 {
assert_eq!(i, counter.load(Ordering::SeqCst));
assert!(directory

View File

@@ -48,7 +48,7 @@ mod readers;
mod serializer;
mod writer;
/// Trait for types that are allowed for fast fields: (u64 or i64).
/// Trait for types that are allowed for fast fields: (u64, i64 and f64).
pub trait FastValue: Default + Clone + Copy + Send + Sync + PartialOrd {
/// Converts a value from u64
///
@@ -114,11 +114,33 @@ impl FastValue for i64 {
}
}
impl FastValue for f64 {
fn from_u64(val: u64) -> Self {
common::u64_to_f64(val)
}
fn to_u64(&self) -> u64 {
common::f64_to_u64(*self)
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::F64(ref integer_options) => integer_options.get_fastfield_cardinality(),
_ => None,
}
}
fn as_u64(&self) -> u64 {
self.to_bits()
}
}
fn value_to_u64(value: &Value) -> u64 {
match *value {
Value::U64(ref val) => *val,
Value::I64(ref val) => common::i64_to_u64(*val),
_ => panic!("Expected a u64/i64 field, got {:?} ", value),
Value::F64(ref val) => common::f64_to_u64(*val),
_ => panic!("Expected a u64/i64/f64 field, got {:?} ", value),
}
}

View File

@@ -14,8 +14,10 @@ use std::collections::HashMap;
pub struct FastFieldReaders {
fast_field_i64: HashMap<Field, FastFieldReader<i64>>,
fast_field_u64: HashMap<Field, FastFieldReader<u64>>,
fast_field_f64: HashMap<Field, FastFieldReader<f64>>,
fast_field_i64s: HashMap<Field, MultiValueIntFastFieldReader<i64>>,
fast_field_u64s: HashMap<Field, MultiValueIntFastFieldReader<u64>>,
fast_field_f64s: HashMap<Field, MultiValueIntFastFieldReader<f64>>,
fast_bytes: HashMap<Field, BytesFastFieldReader>,
fast_fields_composite: CompositeFile,
}
@@ -23,6 +25,7 @@ pub struct FastFieldReaders {
enum FastType {
I64,
U64,
F64,
}
fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, Cardinality)> {
@@ -33,6 +36,9 @@ fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, Cardinality
FieldType::I64(options) => options
.get_fastfield_cardinality()
.map(|cardinality| (FastType::I64, cardinality)),
FieldType::F64(options) => options
.get_fastfield_cardinality()
.map(|cardinality| (FastType::F64, cardinality)),
FieldType::HierarchicalFacet => Some((FastType::U64, Cardinality::MultiValues)),
_ => None,
}
@@ -46,8 +52,10 @@ impl FastFieldReaders {
let mut fast_field_readers = FastFieldReaders {
fast_field_i64: Default::default(),
fast_field_u64: Default::default(),
fast_field_f64: Default::default(),
fast_field_i64s: Default::default(),
fast_field_u64s: Default::default(),
fast_field_f64s: Default::default(),
fast_bytes: Default::default(),
fast_fields_composite: fast_fields_composite.clone(),
};
@@ -82,6 +90,12 @@ impl FastFieldReaders {
FastFieldReader::open(fast_field_data.clone()),
);
}
FastType::F64 => {
fast_field_readers.fast_field_f64.insert(
field,
FastFieldReader::open(fast_field_data.clone()),
);
}
}
} else {
return Err(From::from(FastFieldNotAvailableError::new(field_entry)));
@@ -109,6 +123,14 @@ impl FastFieldReaders {
.fast_field_u64s
.insert(field, multivalued_int_fast_field);
}
FastType::F64 => {
let vals_reader = FastFieldReader::open(fast_field_data);
let multivalued_int_fast_field =
MultiValueIntFastFieldReader::open(idx_reader, vals_reader);
fast_field_readers
.fast_field_f64s
.insert(field, multivalued_int_fast_field);
}
}
} else {
return Err(From::from(FastFieldNotAvailableError::new(field_entry)));
@@ -135,6 +157,8 @@ impl FastFieldReaders {
/// If the field is a i64-fast field, return the associated u64 reader. Values are
/// mapped from i64 to u64 using a (well the, it is unique) monotonic mapping. ///
///
///TODO should it also be lenient with f64?
///
/// This method is useful when merging segment reader.
pub(crate) fn u64_lenient(&self, field: Field) -> Option<FastFieldReader<u64>> {
if let Some(u64_ff_reader) = self.u64(field) {
@@ -153,6 +177,13 @@ impl FastFieldReaders {
self.fast_field_i64.get(&field).cloned()
}
/// Returns the `f64` fast field reader reader associated to `field`.
///
/// If `field` is not a f64 fast field, this method returns `None`.
pub fn f64(&self, field: Field) -> Option<FastFieldReader<f64>> {
self.fast_field_f64.get(&field).cloned()
}
/// Returns a `u64s` multi-valued fast field reader reader associated to `field`.
///
/// If `field` is not a u64 multi-valued fast field, this method returns `None`.
@@ -182,6 +213,13 @@ impl FastFieldReaders {
self.fast_field_i64s.get(&field).cloned()
}
/// Returns a `f64s` multi-valued fast field reader reader associated to `field`.
///
/// If `field` is not a f64 multi-valued fast field, this method returns `None`.
pub fn f64s(&self, field: Field) -> Option<MultiValueIntFastFieldReader<f64>> {
self.fast_field_f64s.get(&field).cloned()
}
/// Returns the `bytes` fast field reader associated to `field`.
///
/// If `field` is not a bytes fast field, returns `None`.

View File

@@ -25,13 +25,13 @@ impl FastFieldsWriter {
for (field_id, field_entry) in schema.fields().iter().enumerate() {
let field = Field(field_id as u32);
let default_value = if let FieldType::I64(_) = *field_entry.field_type() {
common::i64_to_u64(0i64)
} else {
0u64
let default_value = match *field_entry.field_type() {
FieldType::I64(_) => common::i64_to_u64(0i64),
FieldType::F64(_) => common::f64_to_u64(0.0f64),
_ => 0u64,
};
match *field_entry.field_type() {
FieldType::I64(ref int_options) | FieldType::U64(ref int_options) => {
FieldType::I64(ref int_options) | FieldType::U64(ref int_options) | FieldType::F64(ref int_options) => {
match int_options.get_fastfield_cardinality() {
Some(Cardinality::SingleValue) => {
let mut fast_field_writer = IntFastFieldWriter::new(field);
@@ -142,9 +142,9 @@ impl FastFieldsWriter {
/// bitpacked and the number of bits required for bitpacking
/// can only been known once we have seen all of the values.
///
/// Both u64, and i64 use the same writer.
/// i64 are just remapped to the `0..2^64 - 1`
/// using `common::i64_to_u64`.
/// Both u64, i64 and f64 use the same writer.
/// i64 and f64 are just remapped to the `0..2^64 - 1`
/// using `common::i64_to_u64` and `common::f64_to_u64`.
pub struct IntFastFieldWriter {
field: Field,
vals: Vec<u8>,
@@ -203,8 +203,8 @@ impl IntFastFieldWriter {
/// Extract the value associated to the fast field for
/// this document.
///
/// i64 are remapped to u64 using the logic
/// in `common::i64_to_u64`.
/// i64 and f64 are remapped to u64 using the logic
/// in `common::i64_to_u64` and `common::f64_to_u64`.
///
/// If the value is missing, then the default value is used
/// instead.

View File

@@ -10,28 +10,263 @@ pub fn fieldnorm_to_id(fieldnorm: u32) -> u8 {
.unwrap_or_else(|idx| idx - 1) as u8
}
#[cfg_attr(feature = "cargo-clippy", allow(clippy::unreadable_literal))]
pub const FIELD_NORMS_TABLE: [u32; 256] = [
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25,
26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 42, 44, 46, 48, 50, 52, 54, 56, 60,
64, 68, 72, 76, 80, 84, 88, 96, 104, 112, 120, 128, 136, 144, 152, 168, 184, 200, 216, 232,
248, 264, 280, 312, 344, 376, 408, 440, 472, 504, 536, 600, 664, 728, 792, 856, 920, 984,
1_048, 1176, 1304, 1432, 1560, 1688, 1816, 1944, 2072, 2328, 2584, 2840, 3096, 3352, 3608,
3864, 4120, 4632, 5144, 5656, 6168, 6680, 7192, 7704, 8216, 9240, 10264, 11288, 12312, 13336,
14360, 15384, 16408, 18456, 20504, 22552, 24600, 26648, 28696, 30744, 32792, 36888, 40984,
45080, 49176, 53272, 57368, 61464, 65560, 73752, 81944, 90136, 98328, 106520, 114712, 122904,
131096, 147480, 163864, 180248, 196632, 213016, 229400, 245784, 262168, 294936, 327704, 360472,
393240, 426008, 458776, 491544, 524312, 589848, 655384, 720920, 786456, 851992, 917528, 983064,
1048600, 1179672, 1310744, 1441816, 1572888, 1703960, 1835032, 1966104, 2097176, 2359320,
2621464, 2883608, 3145752, 3407896, 3670040, 3932184, 4194328, 4718616, 5242904, 5767192,
6291480, 6815768, 7340056, 7864344, 8388632, 9437208, 10485784, 11534360, 12582936, 13631512,
14680088, 15728664, 16777240, 18874392, 20971544, 23068696, 25165848, 27263000, 29360152,
31457304, 33554456, 37748760, 41943064, 46137368, 50331672, 54525976, 58720280, 62914584,
67108888, 75497496, 83886104, 92274712, 100663320, 109051928, 117440536, 125829144, 134217752,
150994968, 167772184, 184549400, 201326616, 218103832, 234881048, 251658264, 268435480,
301989912, 335544344, 369098776, 402653208, 436207640, 469762072, 503316504, 536870936,
603979800, 671088664, 738197528, 805306392, 872415256, 939524120, 1006632984, 1073741848,
1207959576, 1342177304, 1476395032, 1610612760, 1744830488, 1879048216, 2013265944,
0,
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
11,
12,
13,
14,
15,
16,
17,
18,
19,
20,
21,
22,
23,
24,
25,
26,
27,
28,
29,
30,
31,
32,
33,
34,
35,
36,
37,
38,
39,
40,
42,
44,
46,
48,
50,
52,
54,
56,
60,
64,
68,
72,
76,
80,
84,
88,
96,
104,
112,
120,
128,
136,
144,
152,
168,
184,
200,
216,
232,
248,
264,
280,
312,
344,
376,
408,
440,
472,
504,
536,
600,
664,
728,
792,
856,
920,
984,
1_048,
1_176,
1_304,
1_432,
1_560,
1_688,
1_816,
1_944,
2_072,
2_328,
2_584,
2_840,
3_096,
3_352,
3_608,
3_864,
4_120,
4_632,
5_144,
5_656,
6_168,
6_680,
7_192,
7_704,
8_216,
9_240,
10_264,
11_288,
12_312,
13_336,
14_360,
15_384,
16_408,
18_456,
20_504,
22_552,
24_600,
26_648,
28_696,
30_744,
32_792,
36_888,
40_984,
45_080,
49_176,
53_272,
57_368,
61_464,
65_560,
73_752,
81_944,
90_136,
98_328,
106_520,
114_712,
122_904,
131_096,
147_480,
163_864,
180_248,
196_632,
213_016,
229_400,
245_784,
262_168,
294_936,
327_704,
360_472,
393_240,
426_008,
458_776,
491_544,
524_312,
589_848,
655_384,
720_920,
786_456,
851_992,
917_528,
983_064,
1_048_600,
1_179_672,
1_310_744,
1_441_816,
1_572_888,
1_703_960,
1_835_032,
1_966_104,
2_097_176,
2_359_320,
2_621_464,
2_883_608,
3_145_752,
3_407_896,
3_670_040,
3_932_184,
4_194_328,
4_718_616,
5_242_904,
5_767_192,
6_291_480,
6_815_768,
7_340_056,
7_864_344,
8_388_632,
9_437_208,
10_485_784,
11_534_360,
12_582_936,
13_631_512,
14_680_088,
15_728_664,
16_777_240,
18_874_392,
20_971_544,
23_068_696,
25_165_848,
27_263_000,
29_360_152,
31_457_304,
33_554_456,
37_748_760,
41_943_064,
46_137_368,
50_331_672,
54_525_976,
58_720_280,
62_914_584,
67_108_888,
75_497_496,
83_886_104,
92_274_712,
100_663_320,
109_051_928,
117_440_536,
125_829_144,
134_217_752,
150_994_968,
167_772_184,
184_549_400,
201_326_616,
218_103_832,
234_881_048,
251_658_264,
268_435_480,
301_989_912,
335_544_344,
369_098_776,
402_653_208,
436_207_640,
469_762_072,
503_316_504,
536_870_936,
603_979_800,
671_088_664,
738_197_528,
805_306_392,
872_415_256,
939_524_120,
1_006_632_984,
1_073_741_848,
1_207_959_576,
1_342_177_304,
1_476_395_032,
1_610_612_760,
1_744_830_488,
1_879_048_216,
2_013_265_944,
];
#[cfg(test)]

View File

@@ -24,7 +24,7 @@ struct InnerDeleteQueue {
last_block: Option<Arc<Block>>,
}
#[derive(Clone, Default)]
#[derive(Clone)]
pub struct DeleteQueue {
inner: Arc<RwLock<InnerDeleteQueue>>,
}
@@ -37,6 +37,7 @@ impl DeleteQueue {
};
let next_block = NextBlock::from(delete_queue.clone());
{
let mut delete_queue_wlock = delete_queue.inner.write().unwrap();
delete_queue_wlock.last_block = Some(Arc::new(Block {

View File

@@ -1,6 +1,5 @@
use crate::DocId;
use crate::Opstamp;
use std::sync::Arc;
// Doc to opstamp is used to identify which
// document should be deleted.
@@ -18,18 +17,18 @@ use std::sync::Arc;
// This mapping is (for the moment) stricly increasing
// because of the way document id are allocated.
#[derive(Clone)]
pub enum DocToOpstampMapping {
WithMap(Arc<Vec<u64>>),
pub enum DocToOpstampMapping<'a> {
WithMap(&'a [Opstamp]),
None,
}
impl From<Vec<u64>> for DocToOpstampMapping {
fn from(opstamps: Vec<Opstamp>) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(Arc::new(opstamps))
impl<'a> From<&'a [u64]> for DocToOpstampMapping<'a> {
fn from(opstamps: &[Opstamp]) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(opstamps)
}
}
impl DocToOpstampMapping {
impl<'a> DocToOpstampMapping<'a> {
/// Given an opstamp return the limit doc id L
/// such that all doc id D such that
// D >= L iff opstamp(D) >= than `target_opstamp`.
@@ -65,17 +64,18 @@ mod tests {
#[test]
fn test_doc_to_opstamp_mapping_complex() {
{
let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec![]);
let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[][..]);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 0);
}
{
let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec![1u64]);
let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[1u64][..]);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 1);
}
{
let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec![1u64, 12u64, 17u64, 23u64]);
let doc_to_opstamp_mapping =
DocToOpstampMapping::from(&[1u64, 12u64, 17u64, 23u64][..]);
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
for i in 2u64..13u64 {
assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 1);

View File

@@ -18,7 +18,6 @@ use crate::indexer::stamper::Stamper;
use crate::indexer::MergePolicy;
use crate::indexer::SegmentEntry;
use crate::indexer::SegmentWriter;
use crate::postings::compute_table_size;
use crate::schema::Document;
use crate::schema::IndexRecordOption;
use crate::schema::Term;
@@ -27,6 +26,8 @@ use crate::Result;
use bit_set::BitSet;
use crossbeam::channel;
use futures::{Canceled, Future};
use smallvec::smallvec;
use smallvec::SmallVec;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
@@ -45,29 +46,15 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
// reaches `PIPELINE_MAX_SIZE_IN_DOCS`
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
type OperationSender = channel::Sender<Vec<AddOperation>>;
type OperationReceiver = channel::Receiver<Vec<AddOperation>>;
/// Split the thread memory budget into
/// - the heap size
/// - the hash table "table" itself.
///
/// Returns (the heap size in bytes, the hash table size in number of bits)
fn initial_table_size(per_thread_memory_budget: usize) -> usize {
assert!(per_thread_memory_budget > 1_000);
let table_size_limit: usize = per_thread_memory_budget / 3;
if let Some(limit) = (1..)
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
.last()
{
limit.min(19) // we cap it at 2^19 = 512K.
} else {
unreachable!(
"Per thread memory is too small: {}",
per_thread_memory_budget
);
}
}
// Group of operations.
// Most of the time, users will send operation one-by-one, but it can be useful to
// send them as a small block to ensure that
// - all docs in the operation will happen on the same segment and continuous docids.
// - all operations in the group are committed at the same time, making the group
// atomic.
type OperationGroup = SmallVec<[AddOperation; 4]>;
type OperationSender = channel::Sender<OperationGroup>;
type OperationReceiver = channel::Receiver<OperationGroup>;
/// `IndexWriter` is the user entry-point to add document to an index.
///
@@ -95,85 +82,13 @@ pub struct IndexWriter {
num_threads: usize,
generation: usize,
delete_queue: DeleteQueue,
stamper: Stamper,
committed_opstamp: Opstamp,
}
/// Open a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// `num_threads` specifies the number of indexing workers that
/// should work at the same time.
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// # Panics
/// If the heap size per thread is too small, panics.
pub fn open_index_writer(
index: &Index,
num_threads: usize,
heap_size_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> Result<IndexWriter> {
if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
let err_msg = format!(
"The heap size per thread needs to be at least {}.",
HEAP_SIZE_MIN
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX {
let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::new();
let current_opstamp = index.load_metas()?.opstamp;
let stamper = Stamper::new(current_opstamp);
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
heap_size_in_bytes_per_thread,
index: index.clone(),
operation_receiver: document_receiver,
operation_sender: document_sender,
segment_updater,
workers_join_handle: vec![],
num_threads,
delete_queue,
committed_opstamp: current_opstamp,
stamper,
generation: 0,
worker_id: 0,
};
index_writer.start_workers()?;
Ok(index_writer)
}
pub fn compute_deleted_bitset(
fn compute_deleted_bitset(
delete_bitset: &mut BitSet,
segment_reader: &SegmentReader,
delete_cursor: &mut DeleteCursor,
@@ -181,35 +96,30 @@ pub fn compute_deleted_bitset(
target_opstamp: Opstamp,
) -> Result<bool> {
let mut might_have_changed = false;
#[cfg_attr(feature = "cargo-clippy", allow(clippy::while_let_loop))]
loop {
if let Some(delete_op) = delete_cursor.get() {
if delete_op.opstamp > target_opstamp {
break;
} else {
// A delete operation should only affect
// document that were inserted after it.
//
// Limit doc helps identify the first document
// that may be affected by the delete operation.
let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
let inverted_index = segment_reader.inverted_index(delete_op.term.field());
if let Some(mut docset) =
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)
{
while docset.advance() {
let deleted_doc = docset.doc();
if deleted_doc < limit_doc {
delete_bitset.insert(deleted_doc as usize);
might_have_changed = true;
}
}
}
}
} else {
while let Some(delete_op) = delete_cursor.get() {
if delete_op.opstamp > target_opstamp {
break;
}
// A delete operation should only affect
// document that were inserted after it.
//
// Limit doc helps identify the first document
// that may be affected by the delete operation.
let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
let inverted_index = segment_reader.inverted_index(delete_op.term.field());
if let Some(mut docset) =
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)
{
while docset.advance() {
let deleted_doc = docset.doc();
if deleted_doc < limit_doc {
delete_bitset.insert(deleted_doc as usize);
might_have_changed = true;
}
}
}
delete_cursor.advance();
}
Ok(might_have_changed)
@@ -217,7 +127,7 @@ pub fn compute_deleted_bitset(
/// Advance delete for the given segment up
/// to the target opstamp.
pub fn advance_deletes(
pub(crate) fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
@@ -229,8 +139,8 @@ pub fn advance_deletes(
}
let segment_reader = SegmentReader::open(&segment)?;
let max_doc = segment_reader.max_doc();
let max_doc = segment_reader.max_doc();
let mut delete_bitset: BitSet = match segment_entry.delete_bitset() {
Some(previous_delete_bitset) => (*previous_delete_bitset).clone(),
None => BitSet::with_capacity(max_doc as usize),
@@ -267,17 +177,15 @@ pub fn advance_deletes(
fn index_documents(
memory_budget: usize,
segment: &Segment,
generation: usize,
document_iterator: &mut dyn Iterator<Item = Vec<AddOperation>>,
grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor,
) -> Result<bool> {
let schema = segment.schema();
let segment_id = segment.id();
let table_size = initial_table_size(memory_budget);
let mut segment_writer = SegmentWriter::for_segment(table_size, segment.clone(), &schema)?;
for documents in document_iterator {
for doc in documents {
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
for document_group in grouped_document_iterator {
for doc in document_group {
segment_writer.add_document(doc, &schema)?;
}
let mem_usage = segment_writer.mem_usage();
@@ -301,37 +209,117 @@ fn index_documents(
assert!(num_docs > 0);
let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
let segment_meta = SegmentMeta::new(segment_id, num_docs);
let segment_meta = segment
.index()
.inventory()
.new_segment_meta(segment_id, num_docs);
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
let delete_bitset_opt = if delete_cursor.get().is_some() {
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
let segment_reader = SegmentReader::open(segment)?;
let mut deleted_bitset = BitSet::with_capacity(num_docs as usize);
let may_have_deletes = compute_deleted_bitset(
&mut deleted_bitset,
&segment_reader,
&mut delete_cursor,
&doc_to_opstamps,
last_docstamp,
)?;
if may_have_deletes {
Some(deleted_bitset)
} else {
None
}
} else {
let delete_bitset_opt =
apply_deletes(&segment, &mut delete_cursor, &doc_opstamps, last_docstamp)?;
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
Ok(segment_updater.add_segment(segment_entry))
}
fn apply_deletes(
segment: &Segment,
mut delete_cursor: &mut DeleteCursor,
doc_opstamps: &[Opstamp],
last_docstamp: Opstamp,
) -> Result<Option<BitSet<u32>>> {
if delete_cursor.get().is_none() {
// if there are no delete operation in the queue, no need
// to even open the segment.
return Ok(None);
}
let segment_reader = SegmentReader::open(segment)?;
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
let mut deleted_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
let may_have_deletes = compute_deleted_bitset(
&mut deleted_bitset,
&segment_reader,
&mut delete_cursor,
&doc_to_opstamps,
last_docstamp,
)?;
Ok(if may_have_deletes {
Some(deleted_bitset)
} else {
None
};
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
Ok(segment_updater.add_segment(generation, segment_entry))
})
}
impl IndexWriter {
/// Create a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// `num_threads` specifies the number of indexing workers that
/// should work at the same time.
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// # Panics
/// If the heap size per thread is too small, panics.
pub(crate) fn new(
index: &Index,
num_threads: usize,
heap_size_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> Result<IndexWriter> {
if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
let err_msg = format!(
"The heap size per thread needs to be at least {}.",
HEAP_SIZE_MIN
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX {
let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::new();
let current_opstamp = index.load_metas()?.opstamp;
let stamper = Stamper::new(current_opstamp);
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
heap_size_in_bytes_per_thread,
index: index.clone(),
operation_receiver: document_receiver,
operation_sender: document_sender,
segment_updater,
workers_join_handle: vec![],
num_threads,
delete_queue,
committed_opstamp: current_opstamp,
stamper,
worker_id: 0,
};
index_writer.start_workers()?;
Ok(index_writer)
}
/// If there are some merging threads, blocks until they all finish their work and
/// then drop the `IndexWriter`.
pub fn wait_merging_threads(mut self) -> Result<()> {
@@ -366,8 +354,7 @@ impl IndexWriter {
pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
self.segment_updater
.add_segment(self.generation, segment_entry);
self.segment_updater.add_segment(segment_entry);
}
/// Creates a new segment.
@@ -388,17 +375,12 @@ impl IndexWriter {
let document_receiver_clone = self.operation_receiver.clone();
let mut segment_updater = self.segment_updater.clone();
let generation = self.generation;
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.heap_size_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<Result<()>> = thread::Builder::new()
.name(format!(
"thrd-tantivy-index{}-gen{}",
self.worker_id, generation
))
.name(format!("thrd-tantivy-index{}", self.worker_id))
.spawn(move || {
loop {
let mut document_iterator =
@@ -427,7 +409,6 @@ impl IndexWriter {
index_documents(
mem_budget,
&segment,
generation,
&mut document_iterator,
&mut segment_updater,
delete_cursor.clone(),
@@ -459,7 +440,7 @@ impl IndexWriter {
/// Detects and removes the files that
/// are not used by the index anymore.
pub fn garbage_collect_files(&mut self) -> Result<()> {
self.segment_updater.garbage_collect_files()
self.segment_updater.garbage_collect_files().wait()
}
/// Deletes all documents from the index
@@ -559,7 +540,7 @@ impl IndexWriter {
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
let new_index_writer: IndexWriter = open_index_writer(
let new_index_writer: IndexWriter = IndexWriter::new(
&self.index,
self.num_threads,
self.heap_size_in_bytes_per_thread,
@@ -577,7 +558,7 @@ impl IndexWriter {
//
// This will reach an end as the only document_sender
// was dropped with the index_writer.
for _ in document_receiver.clone() {}
for _ in document_receiver {}
Ok(self.committed_opstamp)
}
@@ -608,10 +589,10 @@ impl IndexWriter {
// all of the segment update for this commit have been
// sent.
//
// No document belonging to the next generation have been
// No document belonging to the next commit have been
// pushed too, because add_document can only happen
// on this thread.
//
// This will move uncommitted segments to the state of
// committed segments.
info!("Preparing commit");
@@ -627,7 +608,6 @@ impl IndexWriter {
.join()
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
indexing_worker_result?;
// add a new worker for the next generation.
self.add_indexing_worker()?;
}
@@ -698,7 +678,7 @@ impl IndexWriter {
pub fn add_document(&self, document: Document) -> Opstamp {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };
let send_result = self.operation_sender.send(vec![add_operation]);
let send_result = self.operation_sender.send(smallvec![add_operation]);
if let Err(e) = send_result {
panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
}
@@ -745,7 +725,7 @@ impl IndexWriter {
}
let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
let mut adds: Vec<AddOperation> = Vec::new();
let mut adds = OperationGroup::default();
for (user_op, opstamp) in user_operations.into_iter().zip(stamps) {
match user_op {
@@ -772,7 +752,6 @@ impl IndexWriter {
mod tests {
use super::super::operation::UserOperation;
use super::initial_table_size;
use crate::collector::TopDocs;
use crate::directory::error::LockError;
use crate::error::*;
@@ -782,6 +761,7 @@ mod tests {
use crate::Index;
use crate::ReloadPolicy;
use crate::Term;
use fail;
#[test]
fn test_operations_group() {
@@ -1064,41 +1044,6 @@ mod tests {
assert_eq!(num_docs_containing("b"), 100);
}
#[test]
fn test_hashmap_size() {
assert_eq!(initial_table_size(100_000), 11);
assert_eq!(initial_table_size(1_000_000), 14);
assert_eq!(initial_table_size(10_000_000), 17);
assert_eq!(initial_table_size(1_000_000_000), 19);
}
#[cfg(not(feature = "no_fail"))]
#[test]
fn test_write_commit_fails() {
use fail;
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
for _ in 0..100 {
index_writer.add_document(doc!(text_field => "a"));
}
index_writer.commit().unwrap();
fail::cfg("RAMDirectory::atomic_write", "return(error_write_failed)").unwrap();
for _ in 0..100 {
index_writer.add_document(doc!(text_field => "b"));
}
assert!(index_writer.commit().is_err());
let num_docs_containing = |s: &str| {
let term_a = Term::from_field_text(text_field, s);
index.reader().unwrap().searcher().doc_freq(&term_a)
};
assert_eq!(num_docs_containing("a"), 100);
assert_eq!(num_docs_containing("b"), 0);
fail::cfg("RAMDirectory::atomic_write", "off").unwrap();
}
#[test]
fn test_add_then_delete_all_documents() {
let mut schema_builder = schema::Schema::builder();

View File

@@ -95,8 +95,11 @@ impl Default for LogMergePolicy {
#[cfg(test)]
mod tests {
use super::*;
use crate::core::{SegmentId, SegmentMeta};
use crate::core::{SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::merge_policy::MergePolicy;
use once_cell::sync::Lazy;
static INVENTORY: Lazy<SegmentMetaInventory> = Lazy::new(SegmentMetaInventory::default);
fn test_merge_policy() -> LogMergePolicy {
let mut log_merge_policy = LogMergePolicy::default();
@@ -113,7 +116,7 @@ mod tests {
}
fn create_random_segment_meta(num_docs: u32) -> SegmentMeta {
SegmentMeta::new(SegmentId::generate_random(), num_docs)
INVENTORY.new_segment_meta(SegmentId::generate_random(), num_docs)
}
#[test]

View File

@@ -207,6 +207,7 @@ impl IndexMerger {
}
FieldType::U64(ref options)
| FieldType::I64(ref options)
| FieldType::F64(ref options)
| FieldType::Date(ref options) => match options.get_fastfield_cardinality() {
Some(Cardinality::SingleValue) => {
self.write_single_fast_field(field, fast_field_serializer)?;
@@ -692,7 +693,7 @@ impl SerializableSegment for IndexMerger {
#[cfg(test)]
mod tests {
use crate::collector::tests::TestCollector;
use crate::collector::tests::TEST_COLLECTOR_WITH_SCORE;
use crate::collector::tests::{BytesFastFieldTestCollector, FastFieldTestCollector};
use crate::collector::{Count, FacetCollector};
use crate::core::Index;
@@ -807,7 +808,7 @@ mod tests {
let searcher = reader.searcher();
let get_doc_ids = |terms: Vec<Term>| {
let query = BooleanQuery::new_multiterms_query(terms);
let top_docs = searcher.search(&query, &TestCollector).unwrap();
let top_docs = searcher.search(&query, &TEST_COLLECTOR_WITH_SCORE).unwrap();
top_docs.docs().to_vec()
};
{

View File

@@ -1,14 +1,12 @@
use super::segment_register::SegmentRegister;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::META_FILEPATH;
use crate::error::TantivyError;
use crate::indexer::delete_queue::DeleteCursor;
use crate::indexer::SegmentEntry;
use crate::Result as TantivyResult;
use std::collections::hash_set::HashSet;
use std::fmt::{self, Debug, Formatter};
use std::path::PathBuf;
use std::sync::RwLock;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
@@ -75,19 +73,6 @@ impl SegmentManager {
segment_entries
}
/// List the files that are useful to the index.
///
/// This does not include lock files, or files that are obsolete
/// but have not yet been deleted by the garbage collector.
pub fn list_files(&self) -> HashSet<PathBuf> {
let mut files = HashSet::new();
files.insert(META_FILEPATH.to_path_buf());
for segment_meta in SegmentMeta::all() {
files.extend(segment_meta.list_files());
}
files
}
// Lock poisoning should never happen :
// The lock is acquired and released within this class,
// and the operations cannot panic.

View File

@@ -93,8 +93,7 @@ impl SegmentRegister {
#[cfg(test)]
mod tests {
use super::*;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::{SegmentId, SegmentMetaInventory};
use crate::indexer::delete_queue::*;
fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {
@@ -107,6 +106,7 @@ mod tests {
#[test]
fn test_segment_register() {
let inventory = SegmentMetaInventory::default();
let delete_queue = DeleteQueue::new();
let mut segment_register = SegmentRegister::default();
@@ -115,20 +115,20 @@ mod tests {
let segment_id_merged = SegmentId::generate_random();
{
let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
let segment_meta = inventory.new_segment_meta(segment_id_a, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
{
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
let segment_meta = inventory.new_segment_meta(segment_id_b, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
let segment_meta_merged = inventory.new_segment_meta(segment_id_merged, 0u32);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}

View File

@@ -33,6 +33,7 @@ use std::collections::HashSet;
use std::io::Write;
use std::mem;
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
@@ -125,7 +126,9 @@ fn perform_merge(
let num_docs = merger.write(segment_serializer)?;
let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs);
let segment_meta = index
.inventory()
.new_segment_meta(merged_segment.id(), num_docs);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
Ok(after_merge_segment_entry)
@@ -145,7 +148,6 @@ struct InnerSegmentUpdater {
merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>,
merging_thread_id: AtomicUsize,
merging_threads: RwLock<HashMap<usize, JoinHandle<Result<()>>>>,
generation: AtomicUsize,
killed: AtomicBool,
stamper: Stamper,
merge_operations: MergeOperationInventory,
@@ -172,7 +174,6 @@ impl SegmentUpdater {
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
merging_thread_id: AtomicUsize::default(),
merging_threads: RwLock::new(HashMap::new()),
generation: AtomicUsize::default(),
killed: AtomicBool::new(false),
stamper,
merge_operations: Default::default(),
@@ -200,18 +201,14 @@ impl SegmentUpdater {
self.0.pool.spawn_fn(move || Ok(f(me_clone)))
}
pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> bool {
if generation >= self.0.generation.load(Ordering::Acquire) {
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
})
.forget();
pub fn add_segment(&self, segment_entry: SegmentEntry) -> bool {
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
} else {
false
}
})
.forget();
true
}
/// Orders `SegmentManager` to remove all segments
@@ -272,19 +269,29 @@ impl SegmentUpdater {
}
}
pub fn garbage_collect_files(&self) -> Result<()> {
pub fn garbage_collect_files(&self) -> CpuFuture<(), TantivyError> {
self.run_async(move |segment_updater| {
segment_updater.garbage_collect_files_exec();
})
.wait()
}
/// List the files that are useful to the index.
///
/// This does not include lock files, or files that are obsolete
/// but have not yet been deleted by the garbage collector.
fn list_files(&self) -> HashSet<PathBuf> {
let mut files = HashSet::new();
files.insert(META_FILEPATH.to_path_buf());
for segment_meta in self.0.index.inventory().all() {
files.extend(segment_meta.list_files());
}
files
}
fn garbage_collect_files_exec(&self) {
info!("Running garbage collection");
let mut index = self.0.index.clone();
index
.directory_mut()
.garbage_collect(|| self.0.segment_manager.list_files());
index.directory_mut().garbage_collect(|| self.list_files());
}
pub fn commit(&self, opstamp: Opstamp, payload: Option<String>) -> Result<()> {

View File

@@ -4,6 +4,7 @@ use crate::core::SerializableSegment;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::FieldNormsWriter;
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::postings::compute_table_size;
use crate::postings::MultiFieldPostingsWriter;
use crate::schema::FieldEntry;
use crate::schema::FieldType;
@@ -16,9 +17,26 @@ use crate::tokenizer::{TokenStream, Tokenizer};
use crate::DocId;
use crate::Opstamp;
use crate::Result;
use crate::TantivyError;
use std::io;
use std::str;
/// Computes the initial size of the hash table.
///
/// Returns a number of bit `b`, such that the recommended initial table size is 2^b.
fn initial_table_size(per_thread_memory_budget: usize) -> Result<usize> {
let table_memory_upper_bound = per_thread_memory_budget / 3;
if let Some(limit) = (10..)
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_memory_upper_bound)
.last()
{
Ok(limit.min(19)) // we cap it at 2^19 = 512K.
} else {
Err(TantivyError::InvalidArgument(
format!("per thread memory budget (={}) is too small. Raise the memory budget or lower the number of threads.", per_thread_memory_budget)))
}
}
/// A `SegmentWriter` is in charge of creating segment index from a
/// set of documents.
///
@@ -45,12 +63,13 @@ impl SegmentWriter {
/// - segment: The segment being written
/// - schema
pub fn for_segment(
table_bits: usize,
memory_budget: usize,
mut segment: Segment,
schema: &Schema,
) -> Result<SegmentWriter> {
let table_num_bits = initial_table_size(memory_budget)?;
let segment_serializer = SegmentSerializer::for_segment(&mut segment)?;
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_bits);
let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
let tokenizers =
schema
.fields()
@@ -195,6 +214,17 @@ impl SegmentWriter {
}
}
}
FieldType::F64(ref int_option) => {
if int_option.is_indexed() {
for field_value in field_values {
let term = Term::from_field_f64(
field_value.field(),
field_value.value().f64_value(),
);
self.multifield_postings.subscribe(doc_id, &term);
}
}
}
FieldType::Bytes => {
// Do nothing. Bytes only supports fast fields.
}
@@ -254,3 +284,17 @@ impl SerializableSegment for SegmentWriter {
Ok(max_doc)
}
}
#[cfg(test)]
mod tests {
use super::initial_table_size;
#[test]
fn test_hashmap_size() {
assert_eq!(initial_table_size(100_000).unwrap(), 11);
assert_eq!(initial_table_size(1_000_000).unwrap(), 14);
assert_eq!(initial_table_size(10_000_000).unwrap(), 17);
assert_eq!(initial_table_size(1_000_000_000).unwrap(), 19);
}
}

View File

@@ -105,8 +105,8 @@
//!
//! A good place for you to get started is to check out
//! the example code (
//! [literate programming](http://fulmicoton.com/tantivy-examples/simple_search.html) /
//! [source code](https://github.com/fulmicoton/tantivy/blob/master/examples/simple_search.rs))
//! [literate programming](https://tantivy-search.github.io/examples/basic_search.html) /
//! [source code](https://github.com/tantivy-search/tantivy/blob/master/examples/basic_search.rs))
#[macro_use]
extern crate serde_derive;
@@ -171,16 +171,16 @@ pub use self::snippet::{Snippet, SnippetGenerator};
mod docset;
pub use self::docset::{DocSet, SkipResult};
pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
pub use crate::core::SegmentComponent;
pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{InvertedIndexReader, SegmentReader};
pub use crate::directory::Directory;
pub use crate::indexer::IndexWriter;
pub use crate::postings::Postings;
pub use crate::reader::LeasedItem;
pub use crate::schema::{Document, Term};
pub use crate::common::{i64_to_u64, u64_to_i64};
/// Expose the current version of tantivy, as well
/// whether it was compiled with the simd compression.
pub fn version() -> &'static str {
@@ -250,7 +250,7 @@ pub struct DocAddress(pub SegmentLocalId, pub DocId);
#[cfg(test)]
mod tests {
use crate::collector::tests::TestCollector;
use crate::collector::tests::TEST_COLLECTOR_WITH_SCORE;
use crate::core::SegmentReader;
use crate::docset::DocSet;
use crate::query::BooleanQuery;
@@ -625,6 +625,30 @@ mod tests {
assert!(!postings.advance());
}
#[test]
fn test_indexed_f64() {
let mut schema_builder = Schema::builder();
let value_field = schema_builder.add_f64_field("value", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let val = std::f64::consts::PI;
index_writer.add_document(doc!(value_field => val));
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let term = Term::from_field_f64(value_field, val);
let mut postings = searcher
.segment_reader(0)
.inverted_index(term.field())
.read_postings(&term, IndexRecordOption::Basic)
.unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 0);
assert!(!postings.advance());
}
#[test]
fn test_indexedfield_not_in_documents() {
let mut schema_builder = Schema::builder();
@@ -737,7 +761,7 @@ mod tests {
let searcher = reader.searcher();
let get_doc_ids = |terms: Vec<Term>| {
let query = BooleanQuery::new_multiterms_query(terms);
let topdocs = searcher.search(&query, &TestCollector).unwrap();
let topdocs = searcher.search(&query, &TEST_COLLECTOR_WITH_SCORE).unwrap();
topdocs.docs().to_vec()
};
assert_eq!(
@@ -817,6 +841,7 @@ mod tests {
let mut schema_builder = Schema::builder();
let fast_field_unsigned = schema_builder.add_u64_field("unsigned", FAST);
let fast_field_signed = schema_builder.add_i64_field("signed", FAST);
let fast_field_float = schema_builder.add_f64_field("float", FAST);
let text_field = schema_builder.add_text_field("text", TEXT);
let stored_int_field = schema_builder.add_u64_field("text", STORED);
let schema = schema_builder.build();
@@ -824,7 +849,8 @@ mod tests {
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 50_000_000).unwrap();
{
let document = doc!(fast_field_unsigned => 4u64, fast_field_signed=>4i64);
let document =
doc!(fast_field_unsigned => 4u64, fast_field_signed=>4i64, fast_field_float=>4f64);
index_writer.add_document(document);
index_writer.commit().unwrap();
}
@@ -844,10 +870,14 @@ mod tests {
assert!(fast_field_reader_opt.is_none());
}
{
let fast_field_reader_opt = segment_reader.fast_fields().i64(fast_field_signed);
let fast_field_reader_opt = segment_reader.fast_fields().u64(fast_field_float);
assert!(fast_field_reader_opt.is_none());
}
{
let fast_field_reader_opt = segment_reader.fast_fields().u64(fast_field_unsigned);
assert!(fast_field_reader_opt.is_some());
let fast_field_reader = fast_field_reader_opt.unwrap();
assert_eq!(fast_field_reader.get(0), 4i64)
assert_eq!(fast_field_reader.get(0), 4u64)
}
{
@@ -856,5 +886,12 @@ mod tests {
let fast_field_reader = fast_field_reader_opt.unwrap();
assert_eq!(fast_field_reader.get(0), 4i64)
}
{
let fast_field_reader_opt = segment_reader.fast_fields().f64(fast_field_float);
assert!(fast_field_reader_opt.is_some());
let fast_field_reader = fast_field_reader_opt.unwrap();
assert_eq!(fast_field_reader.get(0), 4f64)
}
}
}

View File

@@ -220,7 +220,7 @@ pub mod tests {
{
let mut segment_writer =
SegmentWriter::for_segment(18, segment.clone(), &schema).unwrap();
SegmentWriter::for_segment(3_000_000, segment.clone(), &schema).unwrap();
{
let mut doc = Document::default();
// checking that position works if the field has two values

View File

@@ -35,6 +35,7 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<dyn PostingsWriter>
.unwrap_or_else(|| SpecializedPostingsWriter::<NothingRecorder>::new_boxed()),
FieldType::U64(_)
| FieldType::I64(_)
| FieldType::F64(_)
| FieldType::Date(_)
| FieldType::HierarchicalFacet => SpecializedPostingsWriter::<NothingRecorder>::new_boxed(),
FieldType::Bytes => {
@@ -154,7 +155,7 @@ impl MultiFieldPostingsWriter {
.collect();
unordered_term_mappings.insert(field, mapping);
}
FieldType::U64(_) | FieldType::I64(_) | FieldType::Date(_) => {}
FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) | FieldType::Date(_) => {}
FieldType::Bytes => {}
}

View File

@@ -7,7 +7,7 @@ pub use self::boolean_query::BooleanQuery;
mod tests {
use super::*;
use crate::collector::tests::TestCollector;
use crate::collector::tests::TEST_COLLECTOR_WITH_SCORE;
use crate::query::score_combiner::SumWithCoordsCombiner;
use crate::query::term_query::TermScorer;
use crate::query::Intersection;
@@ -134,7 +134,7 @@ mod tests {
let matching_docs = |boolean_query: &dyn Query| {
reader
.searcher()
.search(boolean_query, &TestCollector)
.search(boolean_query, &TEST_COLLECTOR_WITH_SCORE)
.unwrap()
.docs()
.iter()
@@ -195,7 +195,7 @@ mod tests {
let score_docs = |boolean_query: &dyn Query| {
let fruit = reader
.searcher()
.search(boolean_query, &TestCollector)
.search(boolean_query, &TEST_COLLECTOR_WITH_SCORE)
.unwrap();
fruit.scores().to_vec()
};

View File

@@ -91,7 +91,6 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> Intersection<TDocSet, TOtherDocSet>
}
impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOtherDocSet> {
#[cfg_attr(feature = "cargo-clippy", allow(clippy::never_loop))]
fn advance(&mut self) -> bool {
let (left, right) = (&mut self.left, &mut self.right);

View File

@@ -10,13 +10,13 @@ pub use self::phrase_weight::PhraseWeight;
mod tests {
use super::*;
use crate::collector::tests::TestCollector;
use crate::collector::tests::{TEST_COLLECTOR_WITHOUT_SCORE, TEST_COLLECTOR_WITH_SCORE};
use crate::core::Index;
use crate::error::TantivyError;
use crate::schema::{Schema, Term, TEXT};
use crate::tests::assert_nearly_equals;
use crate::DocAddress;
use crate::DocId;
use crate::{DocAddress, DocSet};
fn create_index(texts: &[&'static str]) -> Index {
let mut schema_builder = Schema::builder();
@@ -53,7 +53,7 @@ mod tests {
.collect();
let phrase_query = PhraseQuery::new(terms);
let test_fruits = searcher
.search(&phrase_query, &TestCollector)
.search(&phrase_query, &TEST_COLLECTOR_WITH_SCORE)
.expect("search should succeed");
test_fruits
.docs()
@@ -68,6 +68,64 @@ mod tests {
assert!(test_query(vec!["g", "a"]).is_empty());
}
#[test]
pub fn test_phrase_query_no_score() {
let index = create_index(&[
"b b b d c g c",
"a b b d c g c",
"a b a b c",
"c a b a d ga a",
"a b c",
]);
let schema = index.schema();
let text_field = schema.get_field("text").unwrap();
let searcher = index.reader().unwrap().searcher();
let test_query = |texts: Vec<&str>| {
let terms: Vec<Term> = texts
.iter()
.map(|text| Term::from_field_text(text_field, text))
.collect();
let phrase_query = PhraseQuery::new(terms);
let test_fruits = searcher
.search(&phrase_query, &TEST_COLLECTOR_WITHOUT_SCORE)
.expect("search should succeed");
test_fruits
.docs()
.iter()
.map(|docaddr| docaddr.1)
.collect::<Vec<_>>()
};
assert_eq!(test_query(vec!["a", "b", "c"]), vec![2, 4]);
assert_eq!(test_query(vec!["a", "b"]), vec![1, 2, 3, 4]);
assert_eq!(test_query(vec!["b", "b"]), vec![0, 1]);
assert!(test_query(vec!["g", "ewrwer"]).is_empty());
assert!(test_query(vec!["g", "a"]).is_empty());
}
#[test]
pub fn test_phrase_count() {
let index = create_index(&["a c", "a a b d a b c", " a b"]);
let schema = index.schema();
let text_field = schema.get_field("text").unwrap();
let searcher = index.reader().unwrap().searcher();
let phrase_query = PhraseQuery::new(vec![
Term::from_field_text(text_field, "a"),
Term::from_field_text(text_field, "b"),
]);
let phrase_weight = phrase_query.phrase_weight(&searcher, true).unwrap();
let mut phrase_scorer = phrase_weight
.phrase_scorer(searcher.segment_reader(0u32))
.unwrap()
.unwrap();
assert!(phrase_scorer.advance());
assert_eq!(phrase_scorer.doc(), 1);
assert_eq!(phrase_scorer.phrase_count(), 2);
assert!(phrase_scorer.advance());
assert_eq!(phrase_scorer.doc(), 2);
assert_eq!(phrase_scorer.phrase_count(), 1);
assert!(!phrase_scorer.advance());
}
#[test]
pub fn test_phrase_query_no_positions() {
let mut schema_builder = Schema::builder();
@@ -93,17 +151,20 @@ mod tests {
Term::from_field_text(text_field, "a"),
Term::from_field_text(text_field, "b"),
]);
if let TantivyError::SchemaError(ref msg) = searcher
.search(&phrase_query, &TestCollector)
match searcher
.search(&phrase_query, &TEST_COLLECTOR_WITH_SCORE)
.map(|_| ())
.unwrap_err()
{
assert_eq!(
"Applied phrase query on field \"text\", which does not have positions indexed",
msg.as_str()
);
} else {
panic!("Should have returned an error");
TantivyError::SchemaError(ref msg) => {
assert_eq!(
"Applied phrase query on field \"text\", which does not have positions indexed",
msg.as_str()
);
}
_ => {
panic!("Should have returned an error");
}
}
}
@@ -120,7 +181,7 @@ mod tests {
.collect();
let phrase_query = PhraseQuery::new(terms);
searcher
.search(&phrase_query, &TestCollector)
.search(&phrase_query, &TEST_COLLECTOR_WITH_SCORE)
.expect("search should succeed")
.scores()
.to_vec()
@@ -152,7 +213,7 @@ mod tests {
.collect();
let phrase_query = PhraseQuery::new(terms);
searcher
.search(&phrase_query, &TestCollector)
.search(&phrase_query, &TEST_COLLECTOR_WITH_SCORE)
.expect("search should succeed")
.docs()
.to_vec()
@@ -180,7 +241,7 @@ mod tests {
.collect();
let phrase_query = PhraseQuery::new_with_offset(terms);
searcher
.search(&phrase_query, &TestCollector)
.search(&phrase_query, &TEST_COLLECTOR_WITH_SCORE)
.expect("search should succeed")
.docs()
.iter()

View File

@@ -72,13 +72,16 @@ impl PhraseQuery {
.map(|(_, term)| term.clone())
.collect::<Vec<Term>>()
}
}
impl Query for PhraseQuery {
/// Create the weight associated to a query.
/// Returns the `PhraseWeight` for the given phrase query given a specific `searcher`.
///
/// See [`Weight`](./trait.Weight.html).
fn weight(&self, searcher: &Searcher, scoring_enabled: bool) -> Result<Box<dyn Weight>> {
/// This function is the same as `.weight(...)` except it returns
/// a specialized type `PhraseWeight` instead of a Boxed trait.
pub(crate) fn phrase_weight(
&self,
searcher: &Searcher,
scoring_enabled: bool,
) -> Result<PhraseWeight> {
let schema = searcher.schema();
let field_entry = schema.get_field_entry(self.field);
let has_positions = field_entry
@@ -95,9 +98,20 @@ impl Query for PhraseQuery {
}
let terms = self.phrase_terms();
let bm25_weight = BM25Weight::for_terms(searcher, &terms);
Ok(PhraseWeight::new(
self.phrase_terms.clone(),
bm25_weight,
scoring_enabled,
))
}
}
let phrase_weight: PhraseWeight =
PhraseWeight::new(self.phrase_terms.clone(), bm25_weight, scoring_enabled);
impl Query for PhraseQuery {
/// Create the weight associated to a query.
///
/// See [`Weight`](./trait.Weight.html).
fn weight(&self, searcher: &Searcher, scoring_enabled: bool) -> Result<Box<dyn Weight>> {
let phrase_weight = self.phrase_weight(searcher, scoring_enabled)?;
Ok(Box::new(phrase_weight))
}

View File

@@ -163,11 +163,9 @@ impl<TPostings: Postings> PhraseScorer<TPostings> {
}
fn phrase_exists(&mut self) -> bool {
{
self.intersection_docset
.docset_mut_specialized(0)
.positions(&mut self.left);
}
self.intersection_docset
.docset_mut_specialized(0)
.positions(&mut self.left);
let mut intersection_len = self.left.len();
for i in 1..self.num_terms - 1 {
{

View File

@@ -37,7 +37,7 @@ impl PhraseWeight {
reader.get_fieldnorms_reader(field)
}
fn phrase_scorer(
pub fn phrase_scorer(
&self,
reader: &SegmentReader,
) -> Result<Option<PhraseScorer<SegmentPostings>>> {

View File

@@ -1,6 +1,3 @@
#![cfg_attr(feature = "cargo-clippy", allow(clippy::unneeded_field_pattern))]
#![cfg_attr(feature = "cargo-clippy", allow(clippy::toplevel_ref_arg))]
use super::query_grammar;
use super::user_input_ast::*;
use crate::query::occur::Occur;
@@ -23,7 +20,7 @@ parser! {
parser! {
fn word[I]()(I) -> String
where [I: Stream<Item = char>] {
many1(satisfy(char::is_alphanumeric))
many1(satisfy(|c: char| c.is_alphanumeric() || c=='.'))
.and_then(|s: String| {
match s.as_str() {
"OR" => Err(StreamErrorFor::<I>::unexpected_static_message("OR")),
@@ -269,6 +266,7 @@ mod test {
test_parse_query_to_ast_helper("(+a)", "+(\"a\")");
test_parse_query_to_ast_helper("(+a +b)", "(+(\"a\") +(\"b\"))");
test_parse_query_to_ast_helper("abc:toto", "abc:\"toto\"");
test_parse_query_to_ast_helper("abc:1.1", "abc:\"1.1\"");
test_parse_query_to_ast_helper("+abc:toto", "+(abc:\"toto\")");
test_parse_query_to_ast_helper("(+abc:toto -titi)", "(+(abc:\"toto\") -(\"titi\"))");
test_parse_query_to_ast_helper("-abc:toto", "-(abc:\"toto\")");
@@ -280,6 +278,7 @@ mod test {
test_parse_query_to_ast_helper("foo:[1 TO toto}", "foo:[\"1\" TO \"toto\"}");
test_parse_query_to_ast_helper("foo:[* TO toto}", "foo:[\"*\" TO \"toto\"}");
test_parse_query_to_ast_helper("foo:[1 TO *}", "foo:[\"1\" TO \"*\"}");
test_parse_query_to_ast_helper("foo:[1.1 TO *}", "foo:[\"1.1\" TO \"*\"}");
test_is_parse_err("abc + ");
}
}

View File

@@ -18,39 +18,56 @@ use crate::schema::{FieldType, Term};
use crate::tokenizer::TokenizerManager;
use combine::Parser;
use std::borrow::Cow;
use std::num::ParseIntError;
use std::num::{ParseFloatError, ParseIntError};
use std::ops::Bound;
use std::str::FromStr;
/// Possible error that may happen when parsing a query.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Fail)]
pub enum QueryParserError {
/// Error in the query syntax
#[fail(display = "Syntax Error")]
SyntaxError,
/// `FieldDoesNotExist(field_name: String)`
/// The query references a field that is not in the schema
#[fail(display = "File does not exists: '{:?}'", _0)]
FieldDoesNotExist(String),
/// The query contains a term for a `u64`-field, but the value
/// is not a u64.
/// The query contains a term for a `u64` or `i64`-field, but the value
/// is neither.
#[fail(display = "Expected a valid integer: '{:?}'", _0)]
ExpectedInt(ParseIntError),
/// The query contains a term for a `f64`-field, but the value
/// is not a f64.
#[fail(display = "Invalid query: Only excluding terms given")]
ExpectedFloat(ParseFloatError),
/// It is forbidden queries that are only "excluding". (e.g. -title:pop)
#[fail(display = "Invalid query: Only excluding terms given")]
AllButQueryForbidden,
/// If no default field is declared, running a query without any
/// field specified is forbbidden.
#[fail(display = "No default field declared and no field specified in query")]
NoDefaultFieldDeclared,
/// The field searched for is not declared
/// as indexed in the schema.
#[fail(display = "The field '{:?}' is not declared as indexed", _0)]
FieldNotIndexed(String),
/// A phrase query was requested for a field that does not
/// have any positions indexed.
#[fail(display = "The field '{:?}' does not have positions indexed", _0)]
FieldDoesNotHavePositionsIndexed(String),
/// The tokenizer for the given field is unknown
/// The two argument strings are the name of the field, the name of the tokenizer
#[fail(
display = "The tokenizer '{:?}' for the field '{:?}' is unknown",
_0, _1
)]
UnknownTokenizer(String, String),
/// The query contains a range query with a phrase as one of the bounds.
/// Only terms can be used as bounds.
#[fail(display = "A range query cannot have a phrase as one of the bounds")]
RangeMustNotHavePhrase,
/// The format for the date field is not RFC 3339 compliant.
#[fail(display = "The date field has an invalid format")]
DateFormatError(chrono::ParseError),
}
@@ -60,6 +77,12 @@ impl From<ParseIntError> for QueryParserError {
}
}
impl From<ParseFloatError> for QueryParserError {
fn from(err: ParseFloatError) -> QueryParserError {
QueryParserError::ExpectedFloat(err)
}
}
impl From<chrono::ParseError> for QueryParserError {
fn from(err: chrono::ParseError) -> QueryParserError {
QueryParserError::DateFormatError(err)
@@ -239,6 +262,11 @@ impl QueryParser {
let term = Term::from_field_i64(field, val);
Ok(vec![(0, term)])
}
FieldType::F64(_) => {
let val: f64 = f64::from_str(phrase)?;
let term = Term::from_field_f64(field, val);
Ok(vec![(0, term)])
}
FieldType::Date(_) => match chrono::DateTime::parse_from_rfc3339(phrase) {
Ok(x) => Ok(vec![(
0,
@@ -529,6 +557,7 @@ mod test {
schema_builder.add_text_field("nottokenized", STRING);
schema_builder.add_text_field("with_stop_words", text_options);
schema_builder.add_date_field("date", INDEXED);
schema_builder.add_f64_field("float", INDEXED);
let schema = schema_builder.build();
let default_fields = vec![title, text];
let tokenizer_manager = TokenizerManager::default();
@@ -634,6 +663,13 @@ mod test {
assert!(query_parser
.parse_query("unsigned:\"18446744073709551615\"")
.is_ok());
assert!(query_parser.parse_query("float:\"3.1\"").is_ok());
assert!(query_parser.parse_query("float:\"-2.4\"").is_ok());
assert!(query_parser.parse_query("float:\"2.1.2\"").is_err());
assert!(query_parser.parse_query("float:\"2.1a\"").is_err());
assert!(query_parser
.parse_query("float:\"18446744073709551615.0\"")
.is_ok());
test_parse_query_to_logical_ast_helper(
"unsigned:2324",
"Term([0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 9, 20])",
@@ -645,6 +681,12 @@ mod test {
&format!("{:?}", Term::from_field_i64(Field(2u32), -2324)),
false,
);
test_parse_query_to_logical_ast_helper(
"float:2.5",
&format!("{:?}", Term::from_field_f64(Field(10u32), 2.5)),
false,
);
}
#[test]
@@ -786,6 +828,11 @@ mod test {
query_parser.parse_query("signed:18b"),
Err(QueryParserError::ExpectedInt(_))
);
assert!(query_parser.parse_query("float:\"1.8\"").is_ok());
assert_matches!(
query_parser.parse_query("float:1.8a"),
Err(QueryParserError::ExpectedFloat(_))
);
}
#[test]

View File

@@ -142,6 +142,39 @@ impl RangeQuery {
}
}
/// Creates a new `RangeQuery` over a `f64` field.
///
/// If the field is not of the type `f64`, tantivy
/// will panic when the `Weight` object is created.
pub fn new_f64(field: Field, range: Range<f64>) -> RangeQuery {
RangeQuery::new_f64_bounds(
field,
Bound::Included(range.start),
Bound::Excluded(range.end),
)
}
/// Create a new `RangeQuery` over a `f64` field.
///
/// The two `Bound` arguments make it possible to create more complex
/// ranges than semi-inclusive range.
///
/// If the field is not of the type `f64`, tantivy
/// will panic when the `Weight` object is created.
pub fn new_f64_bounds(
field: Field,
left_bound: Bound<f64>,
right_bound: Bound<f64>,
) -> RangeQuery {
let make_term_val = |val: &f64| Term::from_field_f64(field, *val).value_bytes().to_owned();
RangeQuery {
field,
value_type: Type::F64,
left_bound: map_bound(&left_bound, &make_term_val),
right_bound: map_bound(&right_bound, &make_term_val),
}
}
/// Create a new `RangeQuery` over a `u64` field.
///
/// The two `Bound` arguments make it possible to create more complex
@@ -397,4 +430,61 @@ mod tests {
);
}
#[test]
fn test_range_float() {
let float_field: Field;
let schema = {
let mut schema_builder = Schema::builder();
float_field = schema_builder.add_f64_field("floatfield", INDEXED);
schema_builder.build()
};
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_with_num_threads(2, 6_000_000).unwrap();
for i in 1..100 {
let mut doc = Document::new();
for j in 1..100 {
if i % j == 0 {
doc.add_f64(float_field, j as f64);
}
}
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
}
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let count_multiples =
|range_query: RangeQuery| searcher.search(&range_query, &Count).unwrap();
assert_eq!(count_multiples(RangeQuery::new_f64(float_field, 10.0..11.0)), 9);
assert_eq!(
count_multiples(RangeQuery::new_f64_bounds(
float_field,
Bound::Included(10.0),
Bound::Included(11.0)
)),
18
);
assert_eq!(
count_multiples(RangeQuery::new_f64_bounds(
float_field,
Bound::Excluded(9.0),
Bound::Included(10.0)
)),
9
);
assert_eq!(
count_multiples(RangeQuery::new_f64_bounds(
float_field,
Bound::Included(9.0),
Bound::Unbounded
)),
91
);
}
}

View File

@@ -1,6 +1,7 @@
mod pool;
use self::pool::{LeasedItem, Pool};
pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle;
@@ -85,7 +86,10 @@ impl IndexReaderBuilder {
);
}
};
let watch_handle = inner_reader_arc.index.directory().watch(Box::new(callback));
let watch_handle = inner_reader_arc
.index
.directory()
.watch(Box::new(callback))?;
watch_handle_opt = Some(watch_handle);
}
}

View File

@@ -123,6 +123,10 @@ impl<T> Pool<T> {
}
}
/// A LeasedItem holds an object borrowed from a Pool.
///
/// Upon drop, the object is automatically returned
/// into the pool.
pub struct LeasedItem<T> {
gen_item: Option<GenerationItem<T>>,
recycle_queue: Arc<Queue<GenerationItem<T>>>,

View File

@@ -88,6 +88,11 @@ impl Document {
self.add(FieldValue::new(field, Value::I64(value)));
}
/// Add a f64 field
pub fn add_f64(&mut self, field: Field, value: f64) {
self.add(FieldValue::new(field, Value::F64(value)));
}
/// Add a date field
pub fn add_date(&mut self, field: Field, value: &DateTime) {
self.add(FieldValue::new(field, Value::Date(*value)));

View File

@@ -117,6 +117,13 @@ impl Facet {
&& other_str.starts_with(self_str)
&& other_str.as_bytes()[self_str.len()] == FACET_SEP_BYTE
}
/// Extract path from the `Facet`.
pub fn to_path(&self) -> Vec<&str> {
self.encoded_str()
.split(|c| c == FACET_SEP_CHAR)
.collect()
}
}
impl Borrow<str> for Facet {
@@ -254,4 +261,10 @@ mod tests {
assert_eq!(format!("{:?}", facet), "Facet(/first/second/third)");
}
#[test]
fn test_to_path() {
let v = ["first", "second", "third\\/not_fourth"];
let facet = Facet::from_path(v.iter());
assert_eq!(facet.to_path(), v);
}
}

View File

@@ -48,6 +48,15 @@ impl FieldEntry {
}
}
/// Creates a new f64 field entry in the schema, given
/// a name, and some options.
pub fn new_f64(field_name: String, field_type: IntOptions) -> FieldEntry {
FieldEntry {
name: field_name,
field_type: FieldType::F64(field_type),
}
}
/// Creates a new date field entry in the schema, given
/// a name, and some options.
pub fn new_date(field_name: String, field_type: IntOptions) -> FieldEntry {
@@ -89,6 +98,7 @@ impl FieldEntry {
FieldType::Str(ref options) => options.get_indexing_options().is_some(),
FieldType::U64(ref options)
| FieldType::I64(ref options)
| FieldType::F64(ref options)
| FieldType::Date(ref options) => options.is_indexed(),
FieldType::HierarchicalFacet => true,
FieldType::Bytes => false,
@@ -98,7 +108,7 @@ impl FieldEntry {
/// Returns true iff the field is a int (signed or unsigned) fast field
pub fn is_int_fast(&self) -> bool {
match self.field_type {
FieldType::U64(ref options) | FieldType::I64(ref options) => options.is_fast(),
FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::F64(ref options) => options.is_fast(),
_ => false,
}
}
@@ -108,6 +118,7 @@ impl FieldEntry {
match self.field_type {
FieldType::U64(ref options)
| FieldType::I64(ref options)
| FieldType::F64(ref options)
| FieldType::Date(ref options) => options.is_stored(),
FieldType::Str(ref options) => options.is_stored(),
// TODO make stored hierarchical facet optional
@@ -138,6 +149,10 @@ impl Serialize for FieldEntry {
s.serialize_field("type", "i64")?;
s.serialize_field("options", options)?;
}
FieldType::F64(ref options) => {
s.serialize_field("type", "f64")?;
s.serialize_field("options", options)?;
}
FieldType::Date(ref options) => {
s.serialize_field("type", "date")?;
s.serialize_field("options", options)?;
@@ -205,7 +220,7 @@ impl<'de> Deserialize<'de> for FieldEntry {
"bytes" => {
field_type = Some(FieldType::Bytes);
}
"text" | "u64" | "i64" | "date" => {
"text" | "u64" | "i64" | "f64" | "date" => {
// These types require additional options to create a field_type
}
_ => panic!("unhandled type"),
@@ -222,6 +237,7 @@ impl<'de> Deserialize<'de> for FieldEntry {
"text" => field_type = Some(FieldType::Str(map.next_value()?)),
"u64" => field_type = Some(FieldType::U64(map.next_value()?)),
"i64" => field_type = Some(FieldType::I64(map.next_value()?)),
"f64" => field_type = Some(FieldType::F64(map.next_value()?)),
"date" => field_type = Some(FieldType::Date(map.next_value()?)),
_ => {
let msg = format!("Unrecognised type {}", ty);

View File

@@ -35,6 +35,8 @@ pub enum Type {
U64,
/// `i64`
I64,
/// `f64`
F64,
/// `date(i64) timestamp`
Date,
/// `tantivy::schema::Facet`. Passed as a string in JSON.
@@ -53,6 +55,8 @@ pub enum FieldType {
U64(IntOptions),
/// Signed 64-bits integers 64 field type configuration
I64(IntOptions),
/// 64-bits float 64 field type configuration
F64(IntOptions),
/// Signed 64-bits Date 64 field type configuration,
Date(IntOptions),
/// Hierachical Facet
@@ -68,6 +72,7 @@ impl FieldType {
FieldType::Str(_) => Type::Str,
FieldType::U64(_) => Type::U64,
FieldType::I64(_) => Type::I64,
FieldType::F64(_) => Type::F64,
FieldType::Date(_) => Type::Date,
FieldType::HierarchicalFacet => Type::HierarchicalFacet,
FieldType::Bytes => Type::Bytes,
@@ -78,7 +83,7 @@ impl FieldType {
pub fn is_indexed(&self) -> bool {
match *self {
FieldType::Str(ref text_options) => text_options.get_indexing_options().is_some(),
FieldType::U64(ref int_options) | FieldType::I64(ref int_options) => {
FieldType::U64(ref int_options) | FieldType::I64(ref int_options) | FieldType::F64(ref int_options) => {
int_options.is_indexed()
}
FieldType::Date(ref date_options) => date_options.is_indexed(),
@@ -98,6 +103,7 @@ impl FieldType {
.map(TextFieldIndexing::index_option),
FieldType::U64(ref int_options)
| FieldType::I64(ref int_options)
| FieldType::F64(ref int_options)
| FieldType::Date(ref int_options) => {
if int_options.is_indexed() {
Some(IndexRecordOption::Basic)
@@ -119,7 +125,7 @@ impl FieldType {
match *json {
JsonValue::String(ref field_text) => match *self {
FieldType::Str(_) => Ok(Value::Str(field_text.clone())),
FieldType::U64(_) | FieldType::I64(_) | FieldType::Date(_) => Err(
FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) | FieldType::Date(_) => Err(
ValueParsingError::TypeError(format!("Expected an integer, got {:?}", json)),
),
FieldType::HierarchicalFacet => Ok(Value::Facet(Facet::from(field_text))),
@@ -146,6 +152,14 @@ impl FieldType {
let msg = format!("Expected a u64 int, got {:?}", json);
Err(ValueParsingError::OverflowError(msg))
}
},
FieldType::F64(_) => {
if let Some(field_val_f64) = field_val_num.as_f64() {
Ok(Value::F64(field_val_f64))
} else {
let msg = format!("Expected a f64 int, got {:?}", json);
Err(ValueParsingError::OverflowError(msg))
}
}
FieldType::Str(_) | FieldType::HierarchicalFacet | FieldType::Bytes => {
let msg = format!("Expected a string, got {:?}", json);

View File

@@ -22,7 +22,7 @@ pub const STORED: SchemaFlagList<StoredFlag, ()> = SchemaFlagList {
pub struct IndexedFlag;
/// Flag to mark the field as indexed.
///
/// The `INDEXED` flag can only be used when building `IntOptions` (`u64` and `i64` fields)
/// The `INDEXED` flag can only be used when building `IntOptions` (`u64`, `i64` and `f64` fields)
/// Of course, text fields can also be indexed... But this is expressed by using either the
/// `STRING` (untokenized) or `TEXT` (tokenized with the english tokenizer) flags.
pub const INDEXED: SchemaFlagList<IndexedFlag, ()> = SchemaFlagList {
@@ -36,7 +36,7 @@ pub struct FastFlag;
///
/// Fast fields can be random-accessed rapidly. Fields useful for scoring, filtering
/// or collection should be mark as fast fields.
/// The `FAST` flag can only be used when building `IntOptions` (`u64` and `i64` fields)
/// The `FAST` flag can only be used when building `IntOptions` (`u64`, `i64` and `f64` fields)
pub const FAST: SchemaFlagList<FastFlag, ()> = SchemaFlagList {
head: FastFlag,
tail: (),

View File

@@ -54,7 +54,7 @@ On the other hand setting the field as stored or not determines whether the fiel
when [`searcher.doc(doc_address)`](../struct.Searcher.html#method.doc) is called.
## Setting a u64 or a i64 field
## Setting a u64, a i64 or a f64 field
### Example

View File

@@ -82,6 +82,26 @@ impl SchemaBuilder {
self.add_field(field_entry)
}
/// Adds a new f64 field.
/// Returns the associated field handle
///
/// # Caution
///
/// Appending two fields with the same name
/// will result in the shadowing of the first
/// by the second one.
/// The first field will get a field id
/// but only the second one will be indexed
pub fn add_f64_field<T: Into<IntOptions>>(
&mut self,
field_name_str: &str,
field_options: T,
) -> Field {
let field_name = String::from(field_name_str);
let field_entry = FieldEntry::new_f64(field_name, field_options.into());
self.add_field(field_entry)
}
/// Adds a new date field.
/// Returns the associated field handle
/// Internally, Tantivy simply stores dates as i64 UTC timestamps,
@@ -241,6 +261,24 @@ impl Schema {
NamedFieldDocument(field_map)
}
/// Converts a named doc into a document.
pub fn from_named_doc(
&self,
named_doc: NamedFieldDocument,
) -> Result<Document, DocParsingError> {
let mut doc = Document::default();
for (field_name, field_values) in named_doc.0 {
if let Some(field) = self.get_field(&field_name) {
for field_value in field_values {
doc.add(FieldValue::new(field, field_value));
}
} else {
return Err(DocParsingError::NoSuchFieldInSchema(field_name.clone()));
}
}
Ok(doc)
}
/// Encode the schema in JSON.
///
/// Encoding a document cannot fail.
@@ -259,7 +297,6 @@ impl Schema {
};
DocParsingError::NotJSON(doc_json_sample)
})?;
let mut doc = Document::default();
for (field_name, json_value) in json_obj.iter() {
match self.get_field(field_name) {
@@ -340,13 +377,16 @@ impl<'de> Deserialize<'de> for Schema {
/// Error that may happen when deserializing
/// a document from JSON.
#[derive(Debug)]
#[derive(Debug, Fail)]
pub enum DocParsingError {
/// The payload given is not valid JSON.
#[fail(display = "The provided string is not valid JSON")]
NotJSON(String),
/// One of the value node could not be parsed.
#[fail(display = "The field '{:?}' could not be parsed: {:?}", _0, _1)]
ValueError(String, ValueParsingError),
/// The json-document contains a field that is not declared in the schema.
#[fail(display = "The json-document contains an unknown field: {:?}", _0)]
NoSuchFieldInSchema(String),
}
@@ -376,10 +416,14 @@ mod tests {
let popularity_options = IntOptions::default()
.set_stored()
.set_fast(Cardinality::SingleValue);
let score_options = IntOptions::default()
.set_indexed()
.set_fast(Cardinality::SingleValue);
schema_builder.add_text_field("title", TEXT);
schema_builder.add_text_field("author", STRING);
schema_builder.add_u64_field("count", count_options);
schema_builder.add_i64_field("popularity", popularity_options);
schema_builder.add_f64_field("score", score_options);
let schema = schema_builder.build();
let schema_json = serde_json::to_string_pretty(&schema).unwrap();
let expected = r#"[
@@ -422,6 +466,15 @@ mod tests {
"fast": "single",
"stored": true
}
},
{
"name": "score",
"type": "f64",
"options": {
"indexed": true,
"fast": "single",
"stored": false
}
}
]"#;
assert_eq!(schema_json, expected);
@@ -434,6 +487,8 @@ mod tests {
assert_eq!("author", fields.next().unwrap().name());
assert_eq!("count", fields.next().unwrap().name());
assert_eq!("popularity", fields.next().unwrap().name());
assert_eq!("score", fields.next().unwrap().name());
assert!(fields.next().is_none());
}
#[test]
@@ -466,10 +521,14 @@ mod tests {
let popularity_options = IntOptions::default()
.set_stored()
.set_fast(Cardinality::SingleValue);
let score_options = IntOptions::default()
.set_indexed()
.set_fast(Cardinality::SingleValue);
let title_field = schema_builder.add_text_field("title", TEXT);
let author_field = schema_builder.add_text_field("author", STRING);
let count_field = schema_builder.add_u64_field("count", count_options);
let popularity_field = schema_builder.add_i64_field("popularity", popularity_options);
let score_field = schema_builder.add_f64_field("score", score_options);
let schema = schema_builder.build();
{
let doc = schema.parse_document("{}").unwrap();
@@ -482,7 +541,8 @@ mod tests {
"title": "my title",
"author": "fulmicoton",
"count": 4,
"popularity": 10
"popularity": 10,
"score": 80.5
}"#,
)
.unwrap();
@@ -493,6 +553,7 @@ mod tests {
);
assert_eq!(doc.get_first(count_field).unwrap().u64_value(), 4);
assert_eq!(doc.get_first(popularity_field).unwrap().i64_value(), 10);
assert_eq!(doc.get_first(score_field).unwrap().f64_value(), 80.5);
}
{
let json_err = schema.parse_document(
@@ -501,6 +562,7 @@ mod tests {
"author": "fulmicoton",
"count": 4,
"popularity": 10,
"score": 80.5,
"jambon": "bayonne"
}"#,
);
@@ -513,6 +575,7 @@ mod tests {
"author": "fulmicoton",
"count": "5",
"popularity": "10",
"score": "80.5",
"jambon": "bayonne"
}"#,
);
@@ -527,7 +590,8 @@ mod tests {
"title": "my title",
"author": "fulmicoton",
"count": -5,
"popularity": 10
"popularity": 10,
"score": 80.5
}"#,
);
assert_matches!(
@@ -541,7 +605,8 @@ mod tests {
"title": "my title",
"author": "fulmicoton",
"count": 9223372036854775808,
"popularity": 10
"popularity": 10,
"score": 80.5
}"#,
);
assert!(!matches!(
@@ -555,7 +620,8 @@ mod tests {
"title": "my title",
"author": "fulmicoton",
"count": 50,
"popularity": 9223372036854775808
"popularity": 9223372036854775808,
"score": 80.5
}"#,
);
assert_matches!(

View File

@@ -19,9 +19,9 @@ where
B: AsRef<[u8]>;
impl Term {
/// Builds a term given a field, and a u64-value
/// Builds a term given a field, and a i64-value
///
/// Assuming the term has a field id of 1, and a u64 value of 3234,
/// Assuming the term has a field id of 1, and a i64 value of 3234,
/// the Term will have 8 bytes.
///
/// The first four byte are dedicated to storing the field id as a u64.
@@ -31,6 +31,18 @@ impl Term {
Term::from_field_u64(field, val_u64)
}
/// Builds a term given a field, and a f64-value
///
/// Assuming the term has a field id of 1, and a u64 value of 3234,
/// the Term will have 8 bytes. <= this is wrong
///
/// The first four byte are dedicated to storing the field id as a u64.
/// The 4 following bytes are encoding the u64 value.
pub fn from_field_f64(field: Field, val: f64) -> Term {
let val_u64: u64 = common::f64_to_u64(val);
Term::from_field_u64(field, val_u64)
}
/// Builds a term given a field, and a DateTime value
///
/// Assuming the term has a field id of 1, and a timestamp i64 value of 3234,
@@ -112,6 +124,11 @@ impl Term {
self.set_u64(common::i64_to_u64(val));
}
/// Sets a `f64` value in the term.
pub fn set_f64(&mut self, val: f64) {
self.set_u64(common::f64_to_u64(val));
}
fn set_bytes(&mut self, bytes: &[u8]) {
self.0.resize(4, 0u8);
self.0.extend(bytes);
@@ -161,6 +178,15 @@ where
common::u64_to_i64(BigEndian::read_u64(&self.0.as_ref()[4..]))
}
/// Returns the `f64` value stored in a term.
///
/// # Panics
/// ... or returns an invalid value
/// if the term is not a `i64` field.
pub fn get_f64(&self) -> f64 {
common::u64_to_f64(BigEndian::read_u64(&self.0.as_ref()[4..]))
}
/// Returns the text associated with the term.
///
/// # Panics

View File

@@ -2,11 +2,11 @@ use crate::schema::Facet;
use crate::DateTime;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
use std::{fmt, cmp::Ordering};
/// Value represents the value of a any field.
/// It is an enum over all over all of the possible field type.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub enum Value {
/// The str type is used for any text information.
Str(String),
@@ -14,6 +14,8 @@ pub enum Value {
U64(u64),
/// Signed 64-bits Integer `i64`
I64(i64),
/// 64-bits Float `f64`
F64(f64),
/// Signed 64-bits Date time stamp `date`
Date(DateTime),
/// Hierarchical Facet
@@ -22,6 +24,40 @@ pub enum Value {
Bytes(Vec<u8>),
}
impl Eq for Value {}
impl Ord for Value {
fn cmp(&self, other: &Self) -> Ordering {
match (self,other) {
(Value::Str(l), Value::Str(r)) => l.cmp(r),
(Value::U64(l), Value::U64(r)) => l.cmp(r),
(Value::I64(l), Value::I64(r)) => l.cmp(r),
(Value::Date(l), Value::Date(r)) => l.cmp(r),
(Value::Facet(l), Value::Facet(r)) => l.cmp(r),
(Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
(Value::F64(l), Value::F64(r)) => {
match (l.is_nan(),r.is_nan()) {
(false, false) => l.partial_cmp(r).unwrap(), // only fail on NaN
(true, true) => Ordering::Equal,
(true, false) => Ordering::Less, // we define NaN as less than -∞
(false, true) => Ordering::Greater,
}
}
(Value::Str(_), _) => Ordering::Less,
(_, Value::Str(_)) => Ordering::Greater,
(Value::U64(_), _) => Ordering::Less,
(_, Value::U64(_)) => Ordering::Greater,
(Value::I64(_), _) => Ordering::Less,
(_, Value::I64(_)) => Ordering::Greater,
(Value::F64(_), _) => Ordering::Less,
(_, Value::F64(_)) => Ordering::Greater,
(Value::Date(_), _) => Ordering::Less,
(_, Value::Date(_)) => Ordering::Greater,
(Value::Facet(_), _) => Ordering::Less,
(_, Value::Facet(_)) => Ordering::Greater,
}
}
}
impl Serialize for Value {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -31,6 +67,7 @@ impl Serialize for Value {
Value::Str(ref v) => serializer.serialize_str(v),
Value::U64(u) => serializer.serialize_u64(u),
Value::I64(u) => serializer.serialize_i64(u),
Value::F64(u) => serializer.serialize_f64(u),
Value::Date(ref date) => serializer.serialize_i64(date.timestamp()),
Value::Facet(ref facet) => facet.serialize(serializer),
Value::Bytes(ref bytes) => serializer.serialize_bytes(bytes),
@@ -60,6 +97,10 @@ impl<'de> Deserialize<'de> for Value {
Ok(Value::I64(v))
}
fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> {
Ok(Value::F64(v))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> {
Ok(Value::Str(v.to_owned()))
}
@@ -75,9 +116,7 @@ impl<'de> Deserialize<'de> for Value {
impl Value {
/// Returns the text value, provided the value is of the `Str` type.
///
/// # Panics
/// If the value is not of type `Str`
/// (Returns None if the value is not of the `Str` type).
pub fn text(&self) -> Option<&str> {
match *self {
Value::Str(ref text) => Some(text),
@@ -92,7 +131,7 @@ impl Value {
pub fn u64_value(&self) -> u64 {
match *self {
Value::U64(ref value) => *value,
_ => panic!("This is not a text field."),
_ => panic!("This is not a u64 field."),
}
}
@@ -103,10 +142,21 @@ impl Value {
pub fn i64_value(&self) -> i64 {
match *self {
Value::I64(ref value) => *value,
_ => panic!("This is not a text field."),
_ => panic!("This is not a i64 field."),
}
}
/// Returns the f64-value, provided the value is of the `F64` type.
///
/// # Panics
/// If the value is not of type `F64`
pub fn f64_value(&self) -> f64 {
match *self {
Value::F64(ref value) => *value,
_ => panic!("This is not a f64 field."),
}
}
/// Returns the Date-value, provided the value is of the `Date` type.
///
/// # Panics
@@ -137,6 +187,12 @@ impl From<i64> for Value {
}
}
impl From<f64> for Value {
fn from(v: f64) -> Value {
Value::F64(v)
}
}
impl From<DateTime> for Value {
fn from(date_time: DateTime) -> Value {
Value::Date(date_time)
@@ -163,7 +219,7 @@ impl From<Vec<u8>> for Value {
mod binary_serialize {
use super::Value;
use crate::common::BinarySerializable;
use crate::common::{BinarySerializable, f64_to_u64, u64_to_f64};
use crate::schema::Facet;
use chrono::{TimeZone, Utc};
use std::io::{self, Read, Write};
@@ -174,6 +230,7 @@ mod binary_serialize {
const HIERARCHICAL_FACET_CODE: u8 = 3;
const BYTES_CODE: u8 = 4;
const DATE_CODE: u8 = 5;
const F64_CODE: u8 = 6;
impl BinarySerializable for Value {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
@@ -190,6 +247,10 @@ mod binary_serialize {
I64_CODE.serialize(writer)?;
val.serialize(writer)
}
Value::F64(ref val) => {
F64_CODE.serialize(writer)?;
f64_to_u64(*val).serialize(writer)
}
Value::Date(ref val) => {
DATE_CODE.serialize(writer)?;
val.timestamp().serialize(writer)
@@ -219,6 +280,10 @@ mod binary_serialize {
let value = i64::deserialize(reader)?;
Ok(Value::I64(value))
}
F64_CODE => {
let value = u64_to_f64(u64::deserialize(reader)?);
Ok(Value::F64(value))
}
DATE_CODE => {
let timestamp = i64::deserialize(reader)?;
Ok(Value::Date(Utc.timestamp(timestamp, 0)))

View File

@@ -95,7 +95,6 @@ impl StoreReader {
}
}
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
fn split_source(data: ReadOnlySource) -> (ReadOnlySource, ReadOnlySource, DocId) {
let data_len = data.len();
let footer_offset = data_len - size_of::<u64>() - size_of::<u32>();

View File

@@ -81,19 +81,14 @@ impl<'a> TermMerger<'a> {
/// Advance the term iterator to the next term.
/// Returns true if there is indeed another term
/// False if there is none.
#[cfg_attr(feature = "cargo-clippy", allow(clippy::while_let_loop))]
pub fn advance(&mut self) -> bool {
self.advance_segments();
if let Some(head) = self.heap.pop() {
self.current_streamers.push(head);
loop {
if let Some(next_streamer) = self.heap.peek() {
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
break;
}
} else {
while let Some(next_streamer) = self.heap.peek() {
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
break;
} // no more streamer.
}
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.current_streamers.push(next_heap_it);
}

View File

@@ -14,6 +14,9 @@ lexicographical order matches the natural order of integers.
`i64`-terms are transformed to `u64` using a continuous mapping `val ⟶ val - i64::min_value()`
and then treated as a `u64`.
`f64`-terms are transformed to `u64` using a mapping that preserve order, and are then treated
as `u64`.
A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/

View File

@@ -1,5 +1,3 @@
#![cfg_attr(feature = "cargo-clippy", allow(clippy::new_without_default))]
use super::{Token, TokenFilter, TokenStream};
use rust_stemmers::{self, Algorithm};

67
tests/failpoints/mod.rs Normal file
View File

@@ -0,0 +1,67 @@
use fail;
use std::io::Write;
use std::path::Path;
use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory};
use tantivy::doc;
use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, Term};
#[test]
fn test_failpoints_managed_directory_gc_if_delete_fails() {
let scenario = fail::FailScenario::setup();
let test_path: &'static Path = Path::new("some_path_for_test");
let ram_directory = RAMDirectory::create();
let mut managed_directory = ManagedDirectory::wrap(ram_directory).unwrap();
managed_directory
.open_write(test_path)
.unwrap()
.flush()
.unwrap();
assert!(managed_directory.exists(test_path));
// triggering gc and setting the delete operation to fail.
//
// We are checking that the gc operation is not removing the
// file from managed.json to ensure that the file will be removed
// in the next gc.
//
// The initial 1*off is there to allow for the removal of the
// lock file.
fail::cfg("RAMDirectory::delete", "1*off->1*return").unwrap();
managed_directory.garbage_collect(Default::default);
assert!(managed_directory.exists(test_path));
// running the gc a second time should remove the file.
managed_directory.garbage_collect(Default::default);
assert!(
!managed_directory.exists(test_path),
"The file should have been deleted"
);
}
#[test]
fn test_write_commit_fails() {
let _fail_scenario_guard = fail::FailScenario::setup();
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
for _ in 0..100 {
index_writer.add_document(doc!(text_field => "a"));
}
index_writer.commit().unwrap();
fail::cfg("RAMDirectory::atomic_write", "return(error_write_failed)").unwrap();
for _ in 0..100 {
index_writer.add_document(doc!(text_field => "b"));
}
assert!(index_writer.commit().is_err());
let num_docs_containing = |s: &str| {
let term_a = Term::from_field_text(text_field, s);
index.reader().unwrap().searcher().doc_freq(&term_a)
};
assert_eq!(num_docs_containing("a"), 100);
assert_eq!(num_docs_containing("b"), 0);
}

1
tests/mod.rs Normal file
View File

@@ -0,0 +1 @@
mod failpoints;