diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3bec8fbff..d692de690 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@ Tantivy 0.9.0
=====================
*0.9.0 index format is not compatible with the
previous index format.*
+- MAJOR BUGFIX:
+ Some `Mmap` objects were being leaked, and would never get released. (@fulmicoton)
- Removed most unsafe (@fulmicoton)
- Indexer memory footprint improved (VInt compression, inlining the first block). (@fulmicoton)
- Stemming in other languages now possible (@pentlander)
@@ -12,6 +14,7 @@ previous index format.*
- Removed `INT_STORED` and `INT_INDEXED`. It is now possible to use `STORED` and `INDEXED`
for int fields. (@fulmicoton)
- Added DateTime field (@barrotsteindev)
+- Added IndexReader. By default, the index is reloaded automatically upon new commits (@fulmicoton)
Tantivy 0.8.2
diff --git a/Cargo.toml b/Cargo.toml
index 450f03e35..1e04cf4fa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,6 +32,7 @@ num_cpus = "1.2"
fs2={version="0.4", optional=true}
itertools = "0.8"
levenshtein_automata = {version="0.1", features=["fst_automaton"]}
+notify = {version="4", optional=true}
bit-set = "0.5"
uuid = { version = "0.7.2", features = ["v4", "serde"] }
crossbeam = "0.5"
@@ -73,7 +74,7 @@ overflow-checks = true
[features]
# by default no-fail is disabled. We manually enable it when running test.
default = ["mmap", "no_fail"]
-mmap = ["atomicwrites", "fs2", "memmap"]
+mmap = ["atomicwrites", "fs2", "memmap", "notify"]
lz4-compression = ["lz4"]
no_fail = ["fail/no_fail"]
unstable = [] # useful for benches.
diff --git a/examples/basic_search.rs b/examples/basic_search.rs
index 3a0a71e2f..78c2c2d3b 100644
--- a/examples/basic_search.rs
+++ b/examples/basic_search.rs
@@ -20,6 +20,7 @@ use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::schema::*;
use tantivy::Index;
+use tantivy::ReloadPolicy;
use tempdir::TempDir;
fn main() -> tantivy::Result<()> {
@@ -170,24 +171,33 @@ fn main() -> tantivy::Result<()> {
//
// ### Searcher
//
- // Let's search our index. Start by reloading
- // searchers in the index. This should be done
- // after every `commit()`.
- index.load_searchers()?;
+ // A reader is required to search the index.
+ // It acts as a `Searcher` pool that reloads itself,
+ // depending on a `ReloadPolicy`.
+ //
+ // For a search server you will typically create one reader for the entire lifetime of your
+ // program, and acquire a new searcher for every single request.
+ //
+ // In the code below, we rely on the `OnCommit` policy: the reader
+ // will reload the index automatically after each commit.
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::OnCommit)
+ .try_into()?;
// We now need to acquire a searcher.
- // Some search experience might require more than
- // one query.
//
- // The searcher ensure that we get to work
- // with a consistent version of the index.
+ // A searcher points to a snapshotted, immutable version of the index.
+ //
+ // Some search experiences might require more than
+ // one query. Using the same searcher ensures that all of these queries will run on the
+ // same version of the index.
//
// Acquiring a `searcher` is very cheap.
//
- // You should acquire a searcher every time you
- // start processing a request and
+ // You should acquire a searcher every time you start processing a request
// and release it right after your query is finished.
- let searcher = index.searcher();
+ let searcher = reader.searcher();
// ### Query
@@ -224,7 +234,6 @@ fn main() -> tantivy::Result<()> {
// Since the body field was not configured as stored,
// the document returned will only contain
// a title.
-
for (_score, doc_address) in top_docs {
let retrieved_doc = searcher.doc(doc_address)?;
println!("{}", schema.to_json(&retrieved_doc));
diff --git a/examples/custom_collector.rs b/examples/custom_collector.rs
index b508d6e7c..40a1d1a0d 100644
--- a/examples/custom_collector.rs
+++ b/examples/custom_collector.rs
@@ -170,9 +170,9 @@ fn main() -> tantivy::Result<()> {
price => 5_200u64
));
index_writer.commit()?;
- index.load_searchers()?;
- let searcher = index.searcher();
+ let reader = index.reader()?;
+ let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![product_name, product_description]);
// here we want to get a hit on the 'ken' in Frankenstein
diff --git a/examples/custom_tokenizer.rs b/examples/custom_tokenizer.rs
index 72b69184d..5730adb10 100644
--- a/examples/custom_tokenizer.rs
+++ b/examples/custom_tokenizer.rs
@@ -91,9 +91,9 @@ fn main() -> tantivy::Result<()> {
increasing confidence in the success of my undertaking."#
));
index_writer.commit()?;
- index.load_searchers()?;
- let searcher = index.searcher();
+ let reader = index.reader()?;
+ let searcher = reader.searcher();
// The query parser can interpret human queries.
// Here, if the user does not specify which
diff --git a/examples/deleting_updating_documents.rs b/examples/deleting_updating_documents.rs
index ed59fa8c8..82fdd90b2 100644
--- a/examples/deleting_updating_documents.rs
+++ b/examples/deleting_updating_documents.rs
@@ -14,12 +14,16 @@ use tantivy::collector::TopDocs;
use tantivy::query::TermQuery;
use tantivy::schema::*;
use tantivy::Index;
+use tantivy::IndexReader;
// A simple helper function to fetch a single document
// given its id from our index.
// It will be helpful to check our work.
-fn extract_doc_given_isbn(index: &Index, isbn_term: &Term) -> tantivy::Result<Option<Document>> {
- let searcher = index.searcher();
+fn extract_doc_given_isbn(
+ reader: &IndexReader,
+ isbn_term: &Term,
+) -> tantivy::Result<Option<Document>> {
+ let searcher = reader.searcher();
// This is the simplest query you can think of.
// It matches all of the documents containing a specific term.
@@ -85,12 +89,12 @@ fn main() -> tantivy::Result<()> {
isbn => "978-9176370711",
));
index_writer.commit()?;
- index.load_searchers()?;
+ let reader = index.reader()?;
let frankenstein_isbn = Term::from_field_text(isbn, "978-9176370711");
// Oops, our frankenstein doc seems misspelled
- let frankenstein_doc_misspelled = extract_doc_given_isbn(&index, &frankenstein_isbn)?.unwrap();
+ let frankenstein_doc_misspelled = extract_doc_given_isbn(&reader, &frankenstein_isbn)?.unwrap();
assert_eq!(
schema.to_json(&frankenstein_doc_misspelled),
r#"{"isbn":["978-9176370711"],"title":["Frankentein"]}"#,
@@ -129,10 +133,10 @@ fn main() -> tantivy::Result<()> {
// Everything happened as if the document was updated.
index_writer.commit()?;
// We reload our searcher to make our change available to clients.
- index.load_searchers()?;
+ reader.reload()?;
// No more typo!
- let frankenstein_new_doc = extract_doc_given_isbn(&index, &frankenstein_isbn)?.unwrap();
+ let frankenstein_new_doc = extract_doc_given_isbn(&reader, &frankenstein_isbn)?.unwrap();
assert_eq!(
schema.to_json(&frankenstein_new_doc),
r#"{"isbn":["978-9176370711"],"title":["Frankenstein"]}"#,
diff --git a/examples/faceted_search.rs b/examples/faceted_search.rs
index 9d68f2a4e..0a99c7131 100644
--- a/examples/faceted_search.rs
+++ b/examples/faceted_search.rs
@@ -55,9 +55,9 @@ fn main() -> tantivy::Result<()> {
index_writer.commit()?;
- index.load_searchers()?;
+ let reader = index.reader()?;
- let searcher = index.searcher();
+ let searcher = reader.searcher();
let mut facet_collector = FacetCollector::for_field(tags);
facet_collector.add_facet("/pools");
diff --git a/examples/integer_range_search.rs b/examples/integer_range_search.rs
index 2aac51873..dea3145b6 100644
--- a/examples/integer_range_search.rs
+++ b/examples/integer_range_search.rs
@@ -19,6 +19,7 @@ fn run() -> Result<()> {
let year_field = schema_builder.add_u64_field("year", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
+ let reader = index.reader()?;
{
let mut index_writer = index.writer_with_num_threads(1, 6_000_000)?;
for year in 1950u64..2019u64 {
@@ -27,8 +28,8 @@ fn run() -> Result<()> {
index_writer.commit()?;
// The index will be a range of years
}
- index.load_searchers()?;
- let searcher = index.searcher();
+ reader.reload()?;
+ let searcher = reader.searcher();
// The end is excluded i.e. here we are searching up to 1969
let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960..1970);
// Uses a Count collector to sum the total number of docs in the range
diff --git a/examples/iterating_docs_and_positions.rs b/examples/iterating_docs_and_positions.rs
index 62513ea7a..4668de3c0 100644
--- a/examples/iterating_docs_and_positions.rs
+++ b/examples/iterating_docs_and_positions.rs
@@ -33,9 +33,9 @@ fn main() -> tantivy::Result<()> {
index_writer.add_document(doc!(title => "The modern Promotheus"));
index_writer.commit()?;
- index.load_searchers()?;
+ let reader = index.reader()?;
- let searcher = index.searcher();
+ let searcher = reader.searcher();
// A tantivy index is actually a collection of segments.
// Similarly, a searcher just wraps a list of `segment_reader`s.
diff --git a/examples/snippet.rs b/examples/snippet.rs
index 35ba07557..b79ede83b 100644
--- a/examples/snippet.rs
+++ b/examples/snippet.rs
@@ -48,9 +48,8 @@ fn main() -> tantivy::Result<()> {
// ...
index_writer.commit()?;
- index.load_searchers()?;
-
- let searcher = index.searcher();
+ let reader = index.reader()?;
+ let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![title, body]);
let query = query_parser.parse_query("sycamore spring")?;
diff --git a/examples/stop_words.rs b/examples/stop_words.rs
index cdfe054e8..a6b338060 100644
--- a/examples/stop_words.rs
+++ b/examples/stop_words.rs
@@ -96,9 +96,9 @@ fn main() -> tantivy::Result<()> {
index_writer.commit()?;
- index.load_searchers()?;
+ let reader = index.reader()?;
- let searcher = index.searcher();
+ let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![title, body]);
diff --git a/src/collector/count_collector.rs b/src/collector/count_collector.rs
index ea2a1d9cd..85ceaa3ab 100644
--- a/src/collector/count_collector.rs
+++ b/src/collector/count_collector.rs
@@ -40,8 +40,8 @@ use SegmentReader;
/// index_writer.commit().unwrap();
/// }
///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
///
/// {
/// let query_parser = QueryParser::for_index(&index, vec![title]);
diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs
index 083bd65ae..16ce9428b 100644
--- a/src/collector/facet_collector.rs
+++ b/src/collector/facet_collector.rs
@@ -122,17 +122,16 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// facet => Facet::from("/lang/en"),
/// facet => Facet::from("/category/biography")
/// ));
-/// index_writer.commit().unwrap();
+/// index_writer.commit()?;
/// }
-///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
///
/// {
/// let mut facet_collector = FacetCollector::for_field(facet);
/// facet_collector.add_facet("/lang");
/// facet_collector.add_facet("/category");
-/// let facet_counts = searcher.search(&AllQuery, &facet_collector).unwrap();
+/// let facet_counts = searcher.search(&AllQuery, &facet_collector)?;
///
/// // This lists all of the facet counts
/// let facets: Vec<(&Facet, u64)> = facet_counts
@@ -147,7 +146,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// {
/// let mut facet_collector = FacetCollector::for_field(facet);
/// facet_collector.add_facet("/category/fiction");
-/// let facet_counts = searcher.search(&AllQuery, &facet_collector).unwrap();
+/// let facet_counts = searcher.search(&AllQuery, &facet_collector)?;
///
/// // This lists all of the facet counts
/// let facets: Vec<(&Facet, u64)> = facet_counts
@@ -163,7 +162,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// {
/// let mut facet_collector = FacetCollector::for_field(facet);
/// facet_collector.add_facet("/category/fiction");
-/// let facet_counts = searcher.search(&AllQuery, &facet_collector).unwrap();
+/// let facet_counts = searcher.search(&AllQuery, &facet_collector)?;
///
/// // This lists all of the facet counts
/// let facets: Vec<(&Facet, u64)> = facet_counts.top_k("/category/fiction", 1);
@@ -483,8 +482,8 @@ mod tests {
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let mut facet_collector = FacetCollector::for_field(facet_field);
facet_collector.add_facet(Facet::from("/top1"));
let counts = searcher.search(&AllQuery, &facet_collector).unwrap();
@@ -532,8 +531,8 @@ mod tests {
facet_field => Facet::from_text(&"/subjects/B/b"),
));
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 1);
let mut facet_collector = FacetCollector::for_field(facet_field);
facet_collector.add_facet("/subjects");
@@ -579,9 +578,7 @@ mod tests {
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
-
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let mut facet_collector = FacetCollector::for_field(facet_field);
facet_collector.add_facet("/facet");
@@ -635,8 +632,7 @@ mod bench {
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
-
+ let reader = index.reader().unwrap();
b.iter(|| {
let searcher = reader.searcher();
let facet_collector = FacetCollector::for_field(facet_field);
diff --git a/src/collector/int_facet_collector.rs b/src/collector/int_facet_collector.rs
index 01f00cc37..4232343e6 100644
--- a/src/collector/int_facet_collector.rs
+++ b/src/collector/int_facet_collector.rs
@@ -101,8 +101,7 @@ mod tests {
assert_eq!(index_writer.commit().unwrap(), 10u64);
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let mut ffvf_i64: IntFacetCollector = IntFacetCollector::new(num_field_i64);
let mut ffvf_u64: IntFacetCollector = IntFacetCollector::new(num_field_u64);
diff --git a/src/collector/mod.rs b/src/collector/mod.rs
index 046b26ed0..515799fdf 100644
--- a/src/collector/mod.rs
+++ b/src/collector/mod.rs
@@ -53,9 +53,9 @@ use tantivy::collector::{Count, TopDocs};
# index_writer.add_document(doc!(
# title => "The Diary of Muadib",
# ));
-# index_writer.commit().unwrap();
-# index.load_searchers()?;
-# let searcher = index.searcher();
+# index_writer.commit()?;
+# let reader = index.reader()?;
+# let searcher = reader.searcher();
# let query_parser = QueryParser::for_index(&index, vec![title]);
# let query = query_parser.parse_query("diary")?;
let (doc_count, top_docs): (usize, Vec<(Score, DocAddress)>) =
diff --git a/src/collector/multi_collector.rs b/src/collector/multi_collector.rs
index f12c648a4..cc3bfc488 100644
--- a/src/collector/multi_collector.rs
+++ b/src/collector/multi_collector.rs
@@ -134,8 +134,8 @@ impl FruitHandle {
/// index_writer.commit().unwrap();
/// }
///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
///
/// let mut collectors = MultiCollector::new();
/// let top_docs_handle = collectors.add_collector(TopDocs::with_limit(2));
@@ -278,8 +278,7 @@ mod tests {
index_writer.add_document(doc!(text=>"abc"));
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let term = Term::from_field_text(text, "abc");
let query = TermQuery::new(term, IndexRecordOption::Basic);
diff --git a/src/collector/top_field_collector.rs b/src/collector/top_field_collector.rs
index 02551310c..b1a2d5ec8 100644
--- a/src/collector/top_field_collector.rs
+++ b/src/collector/top_field_collector.rs
@@ -23,15 +23,16 @@ use SegmentReader;
/// # use tantivy::schema::{Schema, Field, FAST, TEXT};
/// # use tantivy::{Index, Result, DocAddress};
/// # use tantivy::query::{Query, QueryParser};
+/// use tantivy::Searcher;
/// use tantivy::collector::TopDocs;
///
-/// # fn main() {
+/// # fn main() -> tantivy::Result<()> {
/// # let mut schema_builder = Schema::builder();
/// # let title = schema_builder.add_text_field("title", TEXT);
/// # let rating = schema_builder.add_u64_field("rating", FAST);
/// # let schema = schema_builder.build();
/// # let index = Index::create_in_ram(schema);
-/// # let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+/// # let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;
/// # index_writer.add_document(doc!(
/// # title => "The Name of the Wind",
/// # rating => 92u64,
@@ -39,13 +40,14 @@ use SegmentReader;
/// # index_writer.add_document(doc!(title => "The Diary of Muadib", rating => 97u64));
/// # index_writer.add_document(doc!(title => "A Dairy Cow", rating => 63u64));
/// # index_writer.add_document(doc!(title => "The Diary of a Young Girl", rating => 80u64));
-/// # index_writer.commit().unwrap();
-/// # index.load_searchers().unwrap();
-/// # let query = QueryParser::for_index(&index, vec![title]).parse_query("diary").unwrap();
-/// # let top_docs = docs_sorted_by_rating(&index, &query, rating).unwrap();
+/// # index_writer.commit()?;
+/// # let reader = index.reader()?;
+/// # let query = QueryParser::for_index(&index, vec![title]).parse_query("diary")?;
+/// # let top_docs = docs_sorted_by_rating(&reader.searcher(), &query, rating)?;
/// # assert_eq!(top_docs,
/// # vec![(97u64, DocAddress(0u32, 1)),
/// # (80u64, DocAddress(0u32, 3))]);
+/// # Ok(())
/// # }
/// #
/// /// Searches the document matching the given query, and
@@ -53,7 +55,9 @@ use SegmentReader;
/// /// given in argument.
/// ///
/// /// `field` is required to be a FAST field.
-/// fn docs_sorted_by_rating(index: &Index, query: &Query, sort_by_field: Field)
+/// fn docs_sorted_by_rating(searcher: &Searcher,
+/// query: &Query,
+/// sort_by_field: Field)
/// -> Result<Vec<(u64, DocAddress)>> {
///
/// // This is where we build our collector!
@@ -61,8 +65,7 @@ use SegmentReader;
///
/// // ... and here are our documents. Note that this is a simple vec.
/// // The `u64` in each pair is the value of our fast field for that document.
-/// index.searcher()
-/// .search(query, &top_docs_by_rating)
+/// searcher.search(query, &top_docs_by_rating)
/// }
/// ```
pub struct TopDocsByField<T> {
@@ -177,7 +180,7 @@ mod tests {
size => 16u64,
));
});
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let top_collector = TopDocs::with_limit(4).order_by_field(size);
let top_docs: Vec<(u64, DocAddress)> = searcher.search(&query, &top_collector).unwrap();
@@ -204,7 +207,7 @@ mod tests {
size => 12u64,
));
});
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let top_collector: TopDocsByField<u64> = TopDocs::with_limit(4).order_by_field(Field(2));
let segment_reader = searcher.segment_reader(0u32);
top_collector
@@ -224,7 +227,7 @@ mod tests {
size => 12u64,
));
});
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let segment = searcher.segment_reader(0);
let top_collector: TopDocsByField<u64> = TopDocs::with_limit(4).order_by_field(size);
assert_matches!(
@@ -247,8 +250,6 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
doc_adder(&mut index_writer);
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
-
let query_parser = QueryParser::for_index(&index, vec![query_field]);
let query = query_parser.parse_query(query).unwrap();
(index, query)
diff --git a/src/collector/top_score_collector.rs b/src/collector/top_score_collector.rs
index 869022686..43b7424d6 100644
--- a/src/collector/top_score_collector.rs
+++ b/src/collector/top_score_collector.rs
@@ -51,8 +51,8 @@ use SegmentReader;
/// index_writer.commit().unwrap();
/// }
///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary")?;
@@ -148,7 +148,6 @@ mod tests {
index_writer.add_document(doc!(text_field=>"I like Droopy"));
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
index
}
@@ -159,6 +158,8 @@ mod tests {
let query_parser = QueryParser::for_index(&index, vec![field]);
let text_query = query_parser.parse_query("droopy tax").unwrap();
let score_docs: Vec<(Score, DocAddress)> = index
+ .reader()
+ .unwrap()
.searcher()
.search(&text_query, &TopDocs::with_limit(4))
.unwrap();
@@ -179,6 +180,8 @@ mod tests {
let query_parser = QueryParser::for_index(&index, vec![field]);
let text_query = query_parser.parse_query("droopy tax").unwrap();
let score_docs: Vec<(Score, DocAddress)> = index
+ .reader()
+ .unwrap()
.searcher()
.search(&text_query, &TopDocs::with_limit(2))
.unwrap();
diff --git a/src/core/index.rs b/src/core/index.rs
index 8638d7fa2..62a31cc95 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -1,19 +1,14 @@
-use super::pool::LeasedItem;
-use super::pool::Pool;
use super::segment::create_segment;
use super::segment::Segment;
-use core::searcher::Searcher;
use core::Executor;
use core::IndexMeta;
use core::SegmentId;
use core::SegmentMeta;
-use core::SegmentReader;
use core::META_FILEPATH;
use directory::ManagedDirectory;
#[cfg(feature = "mmap")]
use directory::MmapDirectory;
use directory::INDEX_WRITER_LOCK;
-use directory::META_LOCK;
use directory::{Directory, RAMDirectory};
use error::DataCorruption;
use error::TantivyError;
@@ -21,6 +16,8 @@ use indexer::index_writer::open_index_writer;
use indexer::index_writer::HEAP_SIZE_MIN;
use indexer::segment_updater::save_new_metas;
use num_cpus;
+use reader::IndexReader;
+use reader::IndexReaderBuilder;
use schema::Field;
use schema::FieldType;
use schema::Schema;
@@ -28,7 +25,6 @@ use serde_json;
use std::borrow::BorrowMut;
use std::fmt;
use std::path::Path;
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokenizer::BoxedTokenizer;
use tokenizer::TokenizerManager;
@@ -53,8 +49,6 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
pub struct Index {
directory: ManagedDirectory,
schema: Schema,
- num_searchers: Arc<AtomicUsize>,
- searcher_pool: Arc<Pool<Searcher>>,
executor: Arc,
tokenizers: TokenizerManager,
}
@@ -159,16 +153,12 @@ impl Index {
/// Creates a new index given a directory and an `IndexMeta`.
fn create_from_metas(directory: ManagedDirectory, metas: &IndexMeta) -> Result {
let schema = metas.schema.clone();
- let n_cpus = num_cpus::get();
let index = Index {
directory,
schema,
- num_searchers: Arc::new(AtomicUsize::new(n_cpus)),
- searcher_pool: Arc::new(Pool::new()),
tokenizers: TokenizerManager::default(),
executor: Arc::new(Executor::single_thread()),
};
- index.load_searchers()?;
Ok(index)
}
@@ -198,6 +188,22 @@ impl Index {
}
}
+ /// Create a default `IndexReader` for the given index.
+ ///
+ /// See [`Index.reader_builder()`](#method.reader_builder).
+ pub fn reader(&self) -> Result<IndexReader> {
+ self.reader_builder().try_into()
+ }
+
+ /// Create an `IndexReaderBuilder` for the given index.
+ ///
+ /// Most projects should create at most one reader for a given index.
+ /// This method is typically called only once per `Index` instance,
+ /// over the lifetime of the program.
+ pub fn reader_builder(&self) -> IndexReaderBuilder {
+ IndexReaderBuilder::new(self.clone())
+ }
+
/// Opens a new directory from an index path.
#[cfg(feature = "mmap")]
pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> Result<Index> {
@@ -336,53 +342,6 @@ impl Index {
.map(|segment_meta| segment_meta.id())
.collect())
}
-
- /// Sets the number of searchers to use
- ///
- /// Only works after the next call to `load_searchers`
- pub fn set_num_searchers(&mut self, num_searchers: usize) {
- self.num_searchers.store(num_searchers, Ordering::Release);
- }
-
- /// Update searchers so that they reflect the state of the last
- /// `.commit()`.
- ///
- /// If indexing happens in the same process as searching,
- /// you most likely want to call `.load_searchers()` right after each
- /// successful call to `.commit()`.
- ///
- /// If indexing and searching happen in different processes, the way to
- /// get the freshest `index` at all time, is to watch `meta.json` and
- /// call `load_searchers` whenever a changes happen.
- pub fn load_searchers(&self) -> Result<()> {
- let _meta_lock = self.directory().acquire_lock(&META_LOCK)?;
- let searchable_segments = self.searchable_segments()?;
- let segment_readers: Vec<SegmentReader> = searchable_segments
- .iter()
- .map(SegmentReader::open)
- .collect::<Result<_>>()?;
- let schema = self.schema();
- let num_searchers: usize = self.num_searchers.load(Ordering::Acquire);
- let searchers = (0..num_searchers)
- .map(|_| Searcher::new(schema.clone(), self.clone(), segment_readers.clone()))
- .collect();
- self.searcher_pool.publish_new_generation(searchers);
- Ok(())
- }
-
- /// Returns a searcher
- ///
- /// This method should be called every single time a search
- /// query is performed.
- /// The searchers are taken from a pool of `num_searchers` searchers.
- /// If no searcher is available
- /// this may block.
- ///
- /// The same searcher must be used for a given query, as it ensures
- /// the use of a consistent segment set.
- pub fn searcher(&self) -> LeasedItem<Searcher> {
- self.searcher_pool.acquire()
- }
}
impl fmt::Debug for Index {
@@ -394,8 +353,16 @@ impl fmt::Debug for Index {
#[cfg(test)]
mod tests {
use directory::RAMDirectory;
+ use schema::Field;
use schema::{Schema, INDEXED, TEXT};
+ use std::path::PathBuf;
+ use std::thread;
+ use std::time::Duration;
+ use tempdir::TempDir;
use Index;
+ use IndexReader;
+ use IndexWriter;
+ use ReloadPolicy;
#[test]
fn test_indexer_for_field() {
@@ -461,4 +428,106 @@ mod tests {
let _ = schema_builder.add_u64_field("num_likes", INDEXED);
schema_builder.build()
}
+
+ #[test]
+ fn test_index_on_commit_reload_policy() {
+ let schema = throw_away_schema();
+ let field = schema.get_field("num_likes").unwrap();
+ let index = Index::create_in_ram(schema);
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::OnCommit)
+ .try_into()
+ .unwrap();
+ assert_eq!(reader.searcher().num_docs(), 0);
+ let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+ test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
+ }
+
+ #[test]
+ fn test_index_on_commit_reload_policy_mmap() {
+ let schema = throw_away_schema();
+ let field = schema.get_field("num_likes").unwrap();
+ let tempdir = TempDir::new("index").unwrap();
+ let tempdir_path = PathBuf::from(tempdir.path());
+ let index = Index::create_in_dir(&tempdir_path, schema).unwrap();
+ let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+ writer.commit().unwrap();
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::OnCommit)
+ .try_into()
+ .unwrap();
+ assert_eq!(reader.searcher().num_docs(), 0);
+ test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
+ }
+
+ #[test]
+ fn test_index_manual_policy_mmap() {
+ let schema = throw_away_schema();
+ let field = schema.get_field("num_likes").unwrap();
+ let index = Index::create_from_tempdir(schema).unwrap();
+ let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+ writer.commit().unwrap();
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap();
+ assert_eq!(reader.searcher().num_docs(), 0);
+ writer.add_document(doc!(field=>1u64));
+ writer.commit().unwrap();
+ thread::sleep(Duration::from_millis(500));
+ assert_eq!(reader.searcher().num_docs(), 0);
+ reader.reload().unwrap();
+ assert_eq!(reader.searcher().num_docs(), 1);
+ }
+
+ #[test]
+ fn test_index_on_commit_reload_policy_different_directories() {
+ let schema = throw_away_schema();
+ let field = schema.get_field("num_likes").unwrap();
+ let tempdir = TempDir::new("index").unwrap();
+ let tempdir_path = PathBuf::from(tempdir.path());
+ let write_index = Index::create_in_dir(&tempdir_path, schema).unwrap();
+ let read_index = Index::open_in_dir(&tempdir_path).unwrap();
+ let reader = read_index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::OnCommit)
+ .try_into()
+ .unwrap();
+ assert_eq!(reader.searcher().num_docs(), 0);
+ let mut writer = write_index.writer_with_num_threads(1, 3_000_000).unwrap();
+ test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
+ }
+
+ fn test_index_on_commit_reload_policy_aux(
+ field: Field,
+ writer: &mut IndexWriter,
+ reader: &IndexReader,
+ ) {
+ assert_eq!(reader.searcher().num_docs(), 0);
+ writer.add_document(doc!(field=>1u64));
+ writer.commit().unwrap();
+ let mut count = 0;
+ for _ in 0..100 {
+ count = reader.searcher().num_docs();
+ if count > 0 {
+ break;
+ }
+ thread::sleep(Duration::from_millis(100));
+ }
+ assert_eq!(count, 1);
+ writer.add_document(doc!(field=>2u64));
+ writer.commit().unwrap();
+ let mut count = 0;
+ for _ in 0..10 {
+ count = reader.searcher().num_docs();
+ if count > 1 {
+ break;
+ }
+ thread::sleep(Duration::from_millis(100));
+ }
+ assert_eq!(count, 2);
+ }
}
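A minimal sketch of the two ways to obtain a reader introduced in this file, assuming the API exactly as added above (per the changelog, the default `reader()` reloads automatically upon new commits):

```rust
use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, ReloadPolicy};

fn open_readers() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // Default reader: reloads automatically when a new commit is detected.
    let _auto_reader = index.reader()?;

    // Explicitly configured reader: new commits become visible
    // only after an explicit call to `reload()`.
    let manual_reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .try_into()?;
    manual_reader.reload()?;
    Ok(())
}
```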
diff --git a/src/core/mod.rs b/src/core/mod.rs
index 1aae75f8b..9e5717afa 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -2,7 +2,6 @@ mod executor;
pub mod index;
mod index_meta;
mod inverted_index_reader;
-mod pool;
pub mod searcher;
mod segment;
mod segment_component;
@@ -25,6 +24,7 @@ pub use self::segment_reader::SegmentReader;
use std::path::PathBuf;
lazy_static! {
+
/// The meta file contains all the information about the list of segments and the schema
/// of the index.
pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json");
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index 597ecae39..a8ac2b0fb 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -477,9 +477,7 @@ mod test {
// ok, now we should have a deleted doc
index_writer2.commit().unwrap();
}
-
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let docs: Vec<DocId> = searcher.segment_reader(0).doc_ids_alive().collect();
assert_eq!(vec![0u32, 2u32], docs);
}
diff --git a/src/directory/directory.rs b/src/directory/directory.rs
index c5f975c9d..943998e2d 100644
--- a/src/directory/directory.rs
+++ b/src/directory/directory.rs
@@ -1,6 +1,8 @@
use directory::directory_lock::Lock;
use directory::error::LockError;
use directory::error::{DeleteError, OpenReadError, OpenWriteError};
+use directory::WatchCallback;
+use directory::WatchHandle;
use directory::{ReadOnlySource, WritePtr};
use std::fmt;
use std::io;
@@ -187,6 +189,22 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
}
}
}
+
+ /// Registers a callback that will be called whenever a change to `meta.json`
+ /// made through the `atomic_write` API is detected.
+ ///
+ /// The behavior of `.watch()` on a file written via `.open_write(...)` is, on the other
+ /// hand, undefined.
+ ///
+ /// The file will be watched for the lifetime of the returned `WatchHandle`. The caller is
+ /// required to keep it alive.
+ /// Registering a callback does not override previous callbacks. When the file is modified,
+ /// all callbacks that are registered (and whose `WatchHandle` is still alive) are triggered.
+ ///
+ /// Internally, tantivy only uses this API to detect new commits, in order to implement the
+ /// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
+ /// `OnCommit` `ReloadPolicy` from working properly.
+ fn watch(&self, watch_callback: WatchCallback) -> WatchHandle;
}
/// DirectoryClone
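A caller-side sketch of the `watch` contract documented above, assuming `WatchCallback` and `WatchHandle` are re-exported from `tantivy::directory` as in `src/directory/mod.rs` below; `watch_commits` is a hypothetical helper, not part of this diff:

```rust
use tantivy::directory::{Directory, WatchCallback, WatchHandle};

// Registers a callback fired on each `meta.json` change. The watch stays
// active only as long as the returned handle is kept alive by the caller.
fn watch_commits(directory: &Directory) -> WatchHandle {
    let on_change: WatchCallback = Box::new(|| {
        println!("meta.json changed: a new commit may be visible");
    });
    // Dropping the handle (and all of its clones) unregisters the callback.
    directory.watch(on_change)
}
```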
diff --git a/src/directory/directory_lock.rs b/src/directory/directory_lock.rs
index f341ea6f9..67c2585dd 100644
--- a/src/directory/directory_lock.rs
+++ b/src/directory/directory_lock.rs
@@ -43,7 +43,7 @@ lazy_static! {
is_blocking: false
};
/// The meta lock file is here to protect the segment files being opened by
- /// `.load_searchers()` from being garbage collected.
+ /// `IndexReader::reload()` from being garbage collected.
/// It makes it possible for another process to safely consume
/// our index in-writing. Ideally, we may have prefered `RWLock` semantics
/// here, but it is difficult to achieve on Windows.
diff --git a/src/directory/error.rs b/src/directory/error.rs
index 1179ad28d..e56971029 100644
--- a/src/directory/error.rs
+++ b/src/directory/error.rs
@@ -73,6 +73,14 @@ pub enum OpenDirectoryError {
DoesNotExist(PathBuf),
/// The path exists but is not a directory.
NotADirectory(PathBuf),
+ /// IoError
+ IoError(io::Error),
+}
+
+impl From<io::Error> for OpenDirectoryError {
+ fn from(io_err: io::Error) -> Self {
+ OpenDirectoryError::IoError(io_err)
+ }
}
impl fmt::Display for OpenDirectoryError {
@@ -84,6 +92,11 @@ impl fmt::Display for OpenDirectoryError {
OpenDirectoryError::NotADirectory(ref path) => {
write!(f, "the path '{:?}' exists but is not a directory", path)
}
+ OpenDirectoryError::IoError(ref err) => write!(
+ f,
+ "IOError while trying to open/create the directory. {:?}",
+ err
+ ),
}
}
}
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs
index 5367494f0..ddd8dfbba 100644
--- a/src/directory/managed_directory.rs
+++ b/src/directory/managed_directory.rs
@@ -4,6 +4,7 @@ use directory::DirectoryLock;
use directory::Lock;
use directory::META_LOCK;
use directory::{ReadOnlySource, WritePtr};
+use directory::{WatchCallback, WatchHandle};
use error::DataCorruption;
use serde_json;
use std::collections::HashSet;
@@ -241,6 +242,10 @@ impl Directory for ManagedDirectory {
fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
self.directory.acquire_lock(lock)
}
+
+ fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
+ self.directory.watch(watch_callback)
+ }
}
impl Clone for ManagedDirectory {
diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs
index 8489f806f..70c277d56 100644
--- a/src/directory/mmap_directory.rs
+++ b/src/directory/mmap_directory.rs
@@ -1,8 +1,13 @@
extern crate fs2;
+extern crate notify;
use self::fs2::FileExt;
+use self::notify::RawEvent;
+use self::notify::RecursiveMode;
+use self::notify::Watcher;
use atomicwrites;
use common::make_io_err;
+use core::META_FILEPATH;
use directory::error::LockError;
use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
use directory::read_only_source::BoxedData;
@@ -10,6 +15,9 @@ use directory::Directory;
use directory::DirectoryLock;
use directory::Lock;
use directory::ReadOnlySource;
+use directory::WatchCallback;
+use directory::WatchCallbackList;
+use directory::WatchHandle;
use directory::WritePtr;
use memmap::Mmap;
use std::collections::HashMap;
@@ -21,14 +29,16 @@ use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::result;
+use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
+use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak;
+use std::thread;
use tempdir::TempDir;
/// Returns None iff the file exists, can be read, but is empty (and hence
-/// cannot be mmapped).
-///
+/// cannot be mmapped)
fn open_mmap(full_path: &Path) -> result::Result, OpenReadError> {
let file = File::open(full_path).map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
@@ -84,7 +94,7 @@ impl Default for MmapCache {
}
impl MmapCache {
- fn get_info(&mut self) -> CacheInfo {
+ fn get_info(&self) -> CacheInfo {
let paths: Vec<PathBuf> = self.cache.keys().cloned().collect();
CacheInfo {
counters: self.counters.clone(),
@@ -92,28 +102,105 @@ impl MmapCache {
}
}
+ fn remove_weak_ref(&mut self) {
+ let keys_to_remove: Vec<PathBuf> = self
+ .cache
+ .iter()
+ .filter(|(_, mmap_weakref)| mmap_weakref.upgrade().is_none())
+ .map(|(key, _)| key.clone())
+ .collect();
+ for key in keys_to_remove {
+ self.cache.remove(&key);
+ }
+ }
+
// Returns None if the file exists but has a len of 0 (and hence is not mmappable).
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<BoxedData>>, OpenReadError> {
- let path_in_cache = self.cache.contains_key(full_path);
- if path_in_cache {
- {
- let mmap_weak_opt = self.cache.get(full_path);
- if let Some(mmap_arc) = mmap_weak_opt.and_then(|mmap_weak| mmap_weak.upgrade()) {
- self.counters.hit += 1;
- return Ok(Some(mmap_arc));
- }
+ if let Some(mmap_weak) = self.cache.get(full_path) {
+ if let Some(mmap_arc) = mmap_weak.upgrade() {
+ self.counters.hit += 1;
+ return Ok(Some(mmap_arc));
}
- self.cache.remove(full_path);
}
+ self.cache.remove(full_path);
self.counters.miss += 1;
- if let Some(mmap) = open_mmap(full_path)? {
+ Ok(if let Some(mmap) = open_mmap(full_path)? {
let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap));
- self.cache
- .insert(full_path.to_owned(), Arc::downgrade(&mmap_arc));
- Ok(Some(mmap_arc))
+ let mmap_weak = Arc::downgrade(&mmap_arc);
+ self.cache.insert(full_path.to_owned(), mmap_weak);
+ Some(mmap_arc)
} else {
- Ok(None)
- }
+ None
+ })
+ }
+}
+
+struct InnerWatcherWrapper {
+ _watcher: Mutex<notify::RecommendedWatcher>,
+ watcher_router: WatchCallbackList,
+}
+
+impl InnerWatcherWrapper {
+ pub fn new(path: &Path) -> Result<(Self, Receiver<RawEvent>), notify::Error> {
+ let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
+ // We need to initialize the watcher and keep it alive; watching stops when it is dropped.
+ let mut watcher = notify::raw_watcher(tx)?;
+ watcher.watch(path, RecursiveMode::Recursive)?;
+ let inner = InnerWatcherWrapper {
+ _watcher: Mutex::new(watcher),
+ watcher_router: Default::default(),
+ };
+ Ok((inner, watcher_recv))
+ }
+}
+
+#[derive(Clone)]
+pub(crate) struct WatcherWrapper {
+ inner: Arc<InnerWatcherWrapper>,
+}
+
+impl WatcherWrapper {
+ pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
+ let (inner, watcher_recv) = InnerWatcherWrapper::new(path).map_err(|err| match err {
+ notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
+ _ => {
+ panic!("Unknown error while starting watching directory {:?}", path);
+ }
+ })?;
+ let watcher_wrapper = WatcherWrapper {
+ inner: Arc::new(inner),
+ };
+ let watcher_wrapper_clone = watcher_wrapper.clone();
+ thread::Builder::new()
+ .name("meta-file-watch-thread".to_string())
+ .spawn(move || {
+ loop {
+ match watcher_recv.recv().map(|evt| evt.path) {
+ Ok(Some(changed_path)) => {
+ // ... Actually subject to false positives.
+ // We might want to be more accurate than this at some point.
+ if let Some(filename) = changed_path.file_name() {
+ if filename == *META_FILEPATH {
+ watcher_wrapper_clone.inner.watcher_router.broadcast();
+ }
+ }
+ }
+ Ok(None) => {
+ // not an event we are interested in.
+ }
+ Err(_e) => {
+ // the watch send channel was dropped
+ break;
+ }
+ }
+ }
+ })
+ .expect("Failed to spawn thread to watch meta.json");
+ Ok(watcher_wrapper)
+ }
+
+ pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
+ self.inner.watcher_router.subscribe(watch_callback)
}
}
@@ -131,31 +218,62 @@ impl MmapCache {
/// On Windows the semantics are again different.
#[derive(Clone)]
pub struct MmapDirectory {
+ inner: Arc<MmapDirectoryInner>,
+}
+
+struct MmapDirectoryInner {
root_path: PathBuf,
- mmap_cache: Arc<RwLock<MmapCache>>,
- _temp_directory: Arc<Option<TempDir>>,
+ mmap_cache: RwLock<MmapCache>,
+ _temp_directory: Option<TempDir>,
+ watcher: RwLock<WatcherWrapper>,
+}
+
+impl MmapDirectoryInner {
+ fn new(
+ root_path: PathBuf,
+ temp_directory: Option<TempDir>,
+ ) -> Result<MmapDirectoryInner, OpenDirectoryError> {
+ let watch_wrapper = WatcherWrapper::new(&root_path)?;
+ let mmap_directory_inner = MmapDirectoryInner {
+ root_path,
+ mmap_cache: Default::default(),
+ _temp_directory: temp_directory,
+ watcher: RwLock::new(watch_wrapper),
+ };
+ Ok(mmap_directory_inner)
+ }
+
+ fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
+ let mut wlock = self.watcher.write().unwrap();
+ wlock.watch(watch_callback)
+ }
}
impl fmt::Debug for MmapDirectory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "MmapDirectory({:?})", self.root_path)
+ write!(f, "MmapDirectory({:?})", self.inner.root_path)
}
}
impl MmapDirectory {
+ fn new(
+ root_path: PathBuf,
+ temp_directory: Option<TempDir>,
+ ) -> Result<MmapDirectory, OpenDirectoryError> {
+ let inner = MmapDirectoryInner::new(root_path, temp_directory)?;
+ Ok(MmapDirectory {
+ inner: Arc::new(inner),
+ })
+ }
+
/// Creates a new MmapDirectory in a temporary directory.
///
/// This is mostly useful to test the MmapDirectory itself.
/// For your unit tests, prefer the RAMDirectory.
- pub fn create_from_tempdir() -> io::Result<MmapDirectory> {
- let tempdir = TempDir::new("index")?;
+ pub fn create_from_tempdir() -> Result<MmapDirectory, OpenDirectoryError> {
+ let tempdir = TempDir::new("index").map_err(OpenDirectoryError::IoError)?;
let tempdir_path = PathBuf::from(tempdir.path());
- let directory = MmapDirectory {
- root_path: tempdir_path,
- mmap_cache: Arc::new(RwLock::new(MmapCache::default())),
- _temp_directory: Arc::new(Some(tempdir)),
- };
- Ok(directory)
+ MmapDirectory::new(tempdir_path, Some(tempdir))
}
/// Opens a MmapDirectory in a directory.
@@ -173,18 +291,14 @@ impl MmapDirectory {
directory_path,
)))
} else {
- Ok(MmapDirectory {
- root_path: PathBuf::from(directory_path),
- mmap_cache: Arc::new(RwLock::new(MmapCache::default())),
- _temp_directory: Arc::new(None),
- })
+ Ok(MmapDirectory::new(PathBuf::from(directory_path), None)?)
}
}
/// Joins a relative_path to the directory `root_path`
/// to create a proper complete `filepath`.
fn resolve_path(&self, relative_path: &Path) -> PathBuf {
- self.root_path.join(relative_path)
+ self.inner.root_path.join(relative_path)
}
/// Sync the root directory.
@@ -209,7 +323,7 @@ impl MmapDirectory {
.custom_flags(winbase::FILE_FLAG_BACKUP_SEMANTICS);
}
- let fd = open_opts.open(&self.root_path)?;
+ let fd = open_opts.open(&self.inner.root_path)?;
fd.sync_all()?;
Ok(())
}
@@ -219,9 +333,15 @@ impl MmapDirectory {
///
/// The `MmapDirectory` embeds a `MmapCache`
/// to avoid multiplying the `mmap` system calls.
- pub fn get_cache_info(&mut self) -> CacheInfo {
- self.mmap_cache
+ pub fn get_cache_info(&self) -> CacheInfo {
+ self.inner
+ .mmap_cache
.write()
+ .expect("mmap cache lock is poisoned")
+ .remove_weak_ref();
+ self.inner
+ .mmap_cache
+ .read()
.expect("Mmap cache lock is poisoned.")
.get_info()
}
@@ -274,7 +394,7 @@ impl Directory for MmapDirectory {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
- let mut mmap_cache = self.mmap_cache.write().map_err(|_| {
+ let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while reading {:?}",
@@ -288,6 +408,30 @@ impl Directory for MmapDirectory {
.unwrap_or_else(ReadOnlySource::empty))
}
+ /// Any entry associated with the path in the mmap cache will be
+ /// removed before the file is deleted.
+ fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
+ debug!("Deleting file {:?}", path);
+ let full_path = self.resolve_path(path);
+ match fs::remove_file(&full_path) {
+ Ok(_) => self
+ .sync_directory()
+ .map_err(|e| IOError::with_path(path.to_owned(), e).into()),
+ Err(e) => {
+ if e.kind() == io::ErrorKind::NotFound {
+ Err(DeleteError::FileDoesNotExist(path.to_owned()))
+ } else {
+ Err(IOError::with_path(path.to_owned(), e).into())
+ }
+ }
+ }
+ }
+
+ fn exists(&self, path: &Path) -> bool {
+ let full_path = self.resolve_path(path);
+ full_path.exists()
+ }
+
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
debug!("Open Write {:?}", path);
let full_path = self.resolve_path(path);
@@ -318,30 +462,6 @@ impl Directory for MmapDirectory {
Ok(BufWriter::new(Box::new(writer)))
}
- /// Any entry associated to the path in the mmap will be
- /// removed before the file is deleted.
- fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
- debug!("Deleting file {:?}", path);
- let full_path = self.resolve_path(path);
- match fs::remove_file(&full_path) {
- Ok(_) => self
- .sync_directory()
- .map_err(|e| IOError::with_path(path.to_owned(), e).into()),
- Err(e) => {
- if e.kind() == io::ErrorKind::NotFound {
- Err(DeleteError::FileDoesNotExist(path.to_owned()))
- } else {
- Err(IOError::with_path(path.to_owned(), e).into())
- }
- }
- }
- }
-
- fn exists(&self, path: &Path) -> bool {
- let full_path = self.resolve_path(path);
- full_path.exists()
- }
-
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let full_path = self.resolve_path(path);
let mut buffer = Vec::new();
@@ -388,6 +508,10 @@ impl Directory for MmapDirectory {
_file: file,
})))
}
+
+ fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
+ self.inner.watch(watch_callback)
+ }
}
#[cfg(test)]
@@ -397,6 +521,13 @@ mod tests {
// The following tests are specific to the MmapDirectory
use super::*;
+ use schema::{Schema, SchemaBuilder, TEXT};
+ use std::fs;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::thread;
+ use std::time::Duration;
+ use Index;
+ use ReloadPolicy;
#[test]
fn test_open_non_existant_path() {
@@ -421,7 +552,7 @@ mod tests {
#[test]
fn test_cache() {
- let content = "abc".as_bytes();
+ let content = b"abc";
// here we test if the cache releases
// mmaps correctly.
@@ -456,26 +587,27 @@ mod tests {
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
- assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
+ assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
}
+
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
drop(keep);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
- assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
+ assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 1);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
- assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
+ assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
for path in &paths {
mmap_directory.delete(path).unwrap();
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
- assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
+ assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
for path in paths.iter() {
assert!(mmap_directory.open_read(path).is_err());
}
@@ -484,4 +616,56 @@ mod tests {
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}
+ #[test]
+ fn test_watch_wrapper() {
+ let counter: Arc<AtomicUsize> = Default::default();
+ let counter_clone = counter.clone();
+ let tmp_dir: TempDir = tempdir::TempDir::new("test_watch_wrapper").unwrap();
+ let tmp_dirpath = tmp_dir.path().to_owned();
+ let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
+ let tmp_file = tmp_dirpath.join("coucou");
+ let _handle = watch_wrapper.watch(Box::new(move || {
+ counter_clone.fetch_add(1, Ordering::SeqCst);
+ }));
+ assert_eq!(counter.load(Ordering::SeqCst), 0);
+ fs::write(&tmp_file, b"whateverwilldo").unwrap();
+ thread::sleep(Duration::new(0, 1_000u32));
+ }
+
+ #[test]
+ fn test_mmap_released() {
+ let mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
+ let mut schema_builder: SchemaBuilder = Schema::builder();
+ let text_field = schema_builder.add_text_field("text", TEXT);
+ let schema = schema_builder.build();
+ {
+ let index = Index::create(mmap_directory.clone(), schema).unwrap();
+ let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+ for _num_commits in 0..16 {
+ for _ in 0..10 {
+ index_writer.add_document(doc!(text_field=>"abc"));
+ }
+ index_writer.commit().unwrap();
+ }
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap();
+ for _ in 0..30 {
+ index_writer.add_document(doc!(text_field=>"abc"));
+ index_writer.commit().unwrap();
+ reader.reload().unwrap();
+ }
+ index_writer.wait_merging_threads().unwrap();
+ reader.reload().unwrap();
+ let num_segments = reader.searcher().segment_readers().len();
+ assert_eq!(num_segments, 4);
+ assert_eq!(
+ num_segments * 7,
+ mmap_directory.get_cache_info().mmapped.len()
+ );
+ }
+ assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
+ }
}
diff --git a/src/directory/mod.rs b/src/directory/mod.rs
index 6f7ef88b8..8d880b0f9 100644
--- a/src/directory/mod.rs
+++ b/src/directory/mod.rs
@@ -12,6 +12,7 @@ mod directory_lock;
mod managed_directory;
mod ram_directory;
mod read_only_source;
+mod watch_event_router;
/// Errors specific to the directory module.
pub mod error;
@@ -21,6 +22,8 @@ pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::ram_directory::RAMDirectory;
pub use self::read_only_source::ReadOnlySource;
+pub(crate) use self::watch_event_router::WatchCallbackList;
+pub use self::watch_event_router::{WatchCallback, WatchHandle};
use std::io::{BufWriter, Seek, Write};
#[cfg(feature = "mmap")]
diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs
index 9423affff..985117740 100644
--- a/src/directory/ram_directory.rs
+++ b/src/directory/ram_directory.rs
@@ -1,7 +1,8 @@
-use common::make_io_err;
-use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
+use core::META_FILEPATH;
+use directory::error::{DeleteError, OpenReadError, OpenWriteError};
+use directory::WatchCallbackList;
use directory::WritePtr;
-use directory::{Directory, ReadOnlySource};
+use directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
use std::collections::HashMap;
use std::fmt;
use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
@@ -21,13 +22,13 @@ use std::sync::{Arc, RwLock};
///
struct VecWriter {
path: PathBuf,
- shared_directory: InnerDirectory,
+ shared_directory: RAMDirectory,
data: Cursor<Vec<u8>>,
is_flushed: bool,
}
impl VecWriter {
- fn new(path_buf: PathBuf, shared_directory: InnerDirectory) -> VecWriter {
+ fn new(path_buf: PathBuf, shared_directory: RAMDirectory) -> VecWriter {
VecWriter {
path: path_buf,
data: Cursor::new(Vec::new()),
@@ -63,74 +64,44 @@ impl Write for VecWriter {
fn flush(&mut self) -> io::Result<()> {
self.is_flushed = true;
- self.shared_directory
- .write(self.path.clone(), self.data.get_ref())?;
+ let mut fs = self.shared_directory.fs.write().unwrap();
+ fs.write(self.path.clone(), self.data.get_ref());
Ok(())
}
}
-#[derive(Clone)]
-struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, ReadOnlySource>>>);
+#[derive(Default)]
+struct InnerDirectory {
+ fs: HashMap<PathBuf, ReadOnlySource>,
+ watch_router: WatchCallbackList,
+}
impl InnerDirectory {
- fn new() -> InnerDirectory {
- InnerDirectory(Arc::new(RwLock::new(HashMap::new())))
- }
-
- fn write(&self, path: PathBuf, data: &[u8]) -> io::Result<bool> {
- let mut map = self.0.write().map_err(|_| {
- make_io_err(format!(
- "Failed to lock the directory, when trying to write {:?}",
- path
- ))
- })?;
- let prev_value = map.insert(path, ReadOnlySource::new(Vec::from(data)));
- Ok(prev_value.is_some())
+ fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
+ let data = ReadOnlySource::new(Vec::from(data));
+ self.fs.insert(path, data).is_some()
}
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
- self.0
- .read()
- .map_err(|_| {
- let msg = format!(
- "Failed to acquire read lock for the \
- directory when trying to read {:?}",
- path
- );
- let io_err = make_io_err(msg);
- OpenReadError::IOError(IOError::with_path(path.to_owned(), io_err))
- })
- .and_then(|readable_map| {
- readable_map
- .get(path)
- .ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
- .map(|el| el.clone())
- })
+ self.fs
+ .get(path)
+ .ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
+ .map(|el| el.clone())
}
- fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
- self.0
- .write()
- .map_err(|_| {
- let msg = format!(
- "Failed to acquire write lock for the \
- directory when trying to delete {:?}",
- path
- );
- let io_err = make_io_err(msg);
- DeleteError::IOError(IOError::with_path(path.to_owned(), io_err))
- })
- .and_then(|mut writable_map| match writable_map.remove(path) {
- Some(_) => Ok(()),
- None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))),
- })
+ fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
+ match self.fs.remove(path) {
+ Some(_) => Ok(()),
+ None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))),
+ }
}
fn exists(&self, path: &Path) -> bool {
- self.0
- .read()
- .expect("Failed to get read lock directory.")
- .contains_key(path)
+ self.fs.contains_key(path)
+ }
+
+ fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
+ self.watch_router.subscribe(watch_handle)
}
}
@@ -145,33 +116,36 @@ impl fmt::Debug for RAMDirectory {
/// It is mainly meant for unit testing.
/// Writes are only made visible upon flushing.
///
-#[derive(Clone)]
+#[derive(Clone, Default)]
pub struct RAMDirectory {
- fs: InnerDirectory,
+ fs: Arc<RwLock<InnerDirectory>>,
}
impl RAMDirectory {
/// Constructor
pub fn create() -> RAMDirectory {
- RAMDirectory {
- fs: InnerDirectory::new(),
- }
+ Self::default()
}
}
impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
- self.fs.open_read(path)
+ self.fs.read().unwrap().open_read(path)
+ }
+
+ fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
+ self.fs.write().unwrap().delete(path)
+ }
+
+ fn exists(&self, path: &Path) -> bool {
+ self.fs.read().unwrap().exists(path)
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
+ let mut fs = self.fs.write().unwrap();
let path_buf = PathBuf::from(path);
- let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());
-
- let exists = self
- .fs
- .write(path_buf.clone(), &Vec::new())
- .map_err(|err| IOError::with_path(path.to_owned(), err))?;
+ let vec_writer = VecWriter::new(path_buf.clone(), self.clone());
+ let exists = fs.write(path_buf.clone(), &[]);
// force the creation of the file to mimic the MMap directory.
if exists {
Err(OpenWriteError::FileAlreadyExists(path_buf))
@@ -180,17 +154,8 @@ impl Directory for RAMDirectory {
}
}
- fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
- self.fs.delete(path)
- }
-
- fn exists(&self, path: &Path) -> bool {
- self.fs.exists(path)
- }
-
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
- let read = self.open_read(path)?;
- Ok(read.as_slice().to_owned())
+ Ok(self.open_read(path)?.as_slice().to_owned())
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
@@ -199,10 +164,20 @@ impl Directory for RAMDirectory {
msg.unwrap_or("Undefined".to_string())
)));
let path_buf = PathBuf::from(path);
- let mut vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());
- self.fs.write(path_buf, &Vec::new())?;
+
+ // Reserve the path to prevent calls to .write() to succeed.
+ self.fs.write().unwrap().write(path_buf.clone(), &[]);
+
+ let mut vec_writer = VecWriter::new(path_buf.clone(), self.clone());
vec_writer.write_all(data)?;
vec_writer.flush()?;
+ if path == Path::new(&*META_FILEPATH) {
+ self.fs.write().unwrap().watch_router.broadcast();
+ }
Ok(())
}
+
+ fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
+ self.fs.write().unwrap().watch(watch_callback)
+ }
}
diff --git a/src/directory/tests.rs b/src/directory/tests.rs
index 11bedfeb5..13cb34ede 100644
--- a/src/directory/tests.rs
+++ b/src/directory/tests.rs
@@ -1,7 +1,13 @@
use super::*;
use std::io::{Seek, SeekFrom, Write};
+use std::mem;
use std::path::{Path, PathBuf};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::thread;
use std::time;
+use std::time::Duration;
lazy_static! {
static ref TEST_PATH: &'static Path = Path::new("some_path_for_test");
@@ -30,19 +36,18 @@ fn ram_directory_panics_if_flush_forgotten() {
fn test_simple(directory: &mut Directory) {
{
- {
- let mut write_file = directory.open_write(*TEST_PATH).unwrap();
- assert!(directory.exists(*TEST_PATH));
- write_file.write_all(&[4]).unwrap();
- write_file.write_all(&[3]).unwrap();
- write_file.write_all(&[7, 3, 5]).unwrap();
- write_file.flush().unwrap();
- }
+ let mut write_file = directory.open_write(*TEST_PATH).unwrap();
+ assert!(directory.exists(*TEST_PATH));
+ write_file.write_all(&[4]).unwrap();
+ write_file.write_all(&[3]).unwrap();
+ write_file.write_all(&[7, 3, 5]).unwrap();
+ write_file.flush().unwrap();
+ }
+ {
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]);
}
-
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(!directory.exists(*TEST_PATH));
}
@@ -121,6 +126,41 @@ fn test_directory(directory: &mut Directory) {
test_directory_delete(directory);
test_lock_non_blocking(directory);
test_lock_blocking(directory);
+ test_watch(directory);
+}
+
+fn test_watch(directory: &mut Directory) {
+ let counter: Arc<AtomicUsize> = Default::default();
+ let counter_clone = counter.clone();
+ let watch_callback = Box::new(move || {
+ counter_clone.fetch_add(1, Ordering::SeqCst);
+ });
+ assert!(directory
+ .atomic_write(Path::new("meta.json"), b"random_test_data")
+ .is_ok());
+ thread::sleep(Duration::new(0, 10_000));
+ assert_eq!(0, counter.load(Ordering::SeqCst));
+
+ let watch_handle = directory.watch(watch_callback);
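+ // Each write to meta.json should now trigger exactly one callback.
+ // Poll for up to ~1s, since the callbacks run on a separate thread.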
+ for i in 0..10 {
+ assert_eq!(i, counter.load(Ordering::SeqCst));
+ assert!(directory
+ .atomic_write(Path::new("meta.json"), b"random_test_data_2")
+ .is_ok());
+ for _ in 0..100 {
+ if counter.load(Ordering::SeqCst) > i {
+ break;
+ }
+ thread::sleep(Duration::from_millis(10));
+ }
+ assert_eq!(i + 1, counter.load(Ordering::SeqCst));
+ }
+ mem::drop(watch_handle);
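+ // After the handle is dropped, further writes must not trigger the callback.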
+ assert!(directory
+ .atomic_write(Path::new("meta.json"), b"random_test_data")
+ .is_ok());
+ thread::sleep(Duration::from_millis(200));
+ assert_eq!(10, counter.load(Ordering::SeqCst));
}
fn test_lock_non_blocking(directory: &mut Directory) {
diff --git a/src/directory/watch_event_router.rs b/src/directory/watch_event_router.rs
new file mode 100644
index 000000000..820c73a11
--- /dev/null
+++ b/src/directory/watch_event_router.rs
@@ -0,0 +1,156 @@
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::sync::Weak;
+
+/// Type alias for callbacks registered when watching files of a `Directory`.
+pub type WatchCallback = Box<Fn() -> () + Sync + Send>;
+
+/// Helper struct to implement the watch method in `Directory` implementations.
+///
+/// It registers callbacks (see `.subscribe(...)`) and invokes them
+/// whenever `.broadcast(...)` is called.
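+///
+/// Illustrative usage sketch (hypothetical, for documentation only):
+/// ```ignore
+/// let list = WatchCallbackList::default();
+/// let handle = list.subscribe(Box::new(|| println!("meta.json changed")));
+/// list.broadcast(); // the callback runs on a helper thread
+/// drop(handle); // dropping the last handle unsubscribes the callback
+/// ```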
+#[derive(Default)]
+pub struct WatchCallbackList {
+ router: RwLock<Vec<Weak<WatchCallback>>>,
+}
+
+/// Controls how long a directory should watch for a file change.
+///
+/// After all the clones of a `WatchHandle` are dropped, the associated callback will no longer
+/// be called when a file change is detected.
+#[must_use = "This `WatchHandle` controls the lifetime of the watch and should therefore be used."]
+#[derive(Clone)]
+pub struct WatchHandle(Arc<WatchCallback>);
+
+impl WatchCallbackList {
+ /// Subscribes a new callback and returns a handle that controls the lifetime of the callback.
+ pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle {
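+ // The list itself only keeps a `Weak` reference to the callback.
+ // The returned `WatchHandle` holds the only strong reference, so once
+ // every handle clone is dropped, the `upgrade()` in `list_callback`
+ // fails and the entry is pruned lazily.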
+ let watch_callback_arc = Arc::new(watch_callback);
+ let watch_callback_weak = Arc::downgrade(&watch_callback_arc);
+ self.router.write().unwrap().push(watch_callback_weak);
+ WatchHandle(watch_callback_arc)
+ }
+
+ fn list_callback(&self) -> Vec<Arc<WatchCallback>> {
+ let mut callbacks = vec![];
+ let mut router_wlock = self.router.write().unwrap();
+ let mut i = 0;
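+ // Upgrade each `Weak`; entries whose handles were all dropped fail to
+ // upgrade and are removed in place. `swap_remove` is fine here because
+ // the order of the callbacks is not significant.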
+ while i < router_wlock.len() {
+ if let Some(watch) = router_wlock[i].upgrade() {
+ callbacks.push(watch);
+ i += 1;
+ } else {
+ router_wlock.swap_remove(i);
+ }
+ }
+ callbacks
+ }
+
+ /// Triggers all the registered callbacks.
+ ///
+ /// The callbacks run on a dedicated, short-lived thread, so this method
+ /// returns without waiting for them to complete.
+ pub fn broadcast(&self) {
+ let callbacks = self.list_callback();
+ let spawn_res = std::thread::Builder::new()
+ .name("watch-callbacks".to_string())
+ .spawn(move || {
+ for callback in callbacks {
+ callback();
+ }
+ });
+ if let Err(err) = spawn_res {
+ error!(
+ "Failed to spawn thread to call watch callbacks. Cause: {:?}",
+ err
+ );
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use directory::WatchCallbackList;
+ use std::mem;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::Arc;
+ use std::thread;
+ use std::time::Duration;
+
+ const WAIT_TIME: u64 = 20;
+
+ #[test]
+ fn test_watch_event_router_simple() {
+ let watch_event_router = WatchCallbackList::default();
+ let counter: Arc<AtomicUsize> = Default::default();
+ let counter_clone = counter.clone();
+ let inc_callback = Box::new(move || {
+ counter_clone.fetch_add(1, Ordering::SeqCst);
+ });
+ watch_event_router.broadcast();
+ assert_eq!(0, counter.load(Ordering::SeqCst));
+ let handle_a = watch_event_router.subscribe(inc_callback);
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(0, counter.load(Ordering::SeqCst));
+ watch_event_router.broadcast();
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(1, counter.load(Ordering::SeqCst));
+ watch_event_router.broadcast();
+ watch_event_router.broadcast();
+ watch_event_router.broadcast();
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(4, counter.load(Ordering::SeqCst));
+ mem::drop(handle_a);
+ watch_event_router.broadcast();
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(4, counter.load(Ordering::SeqCst));
+ }
+
+ #[test]
+ fn test_watch_event_router_multiple_callback_same_key() {
+ let watch_event_router = WatchCallbackList::default();
+ let counter: Arc<AtomicUsize> = Default::default();
+ let inc_callback = |inc: usize| {
+ let counter_clone = counter.clone();
+ Box::new(move || {
+ counter_clone.fetch_add(inc, Ordering::SeqCst);
+ })
+ };
+ let handle_a = watch_event_router.subscribe(inc_callback(1));
+ let handle_a2 = watch_event_router.subscribe(inc_callback(10));
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(0, counter.load(Ordering::SeqCst));
+ watch_event_router.broadcast();
+ watch_event_router.broadcast();
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(22, counter.load(Ordering::SeqCst));
+ mem::drop(handle_a);
+ watch_event_router.broadcast();
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(32, counter.load(Ordering::SeqCst));
+ mem::drop(handle_a2);
+ watch_event_router.broadcast();
+ watch_event_router.broadcast();
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(32, counter.load(Ordering::SeqCst));
+ }
+
+ #[test]
+ fn test_watch_event_router_multiple_callback_different_key() {
+ let watch_event_router = WatchCallbackList::default();
+ let counter: Arc<AtomicUsize> = Default::default();
+ let counter_clone = counter.clone();
+ let inc_callback = Box::new(move || {
+ counter_clone.fetch_add(1, Ordering::SeqCst);
+ });
+ let handle_a = watch_event_router.subscribe(inc_callback);
+ assert_eq!(0, counter.load(Ordering::SeqCst));
+ watch_event_router.broadcast();
+ watch_event_router.broadcast();
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(2, counter.load(Ordering::SeqCst));
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ mem::drop(handle_a);
+ watch_event_router.broadcast();
+ thread::sleep(Duration::from_millis(WAIT_TIME));
+ assert_eq!(2, counter.load(Ordering::SeqCst));
+ }
+
+}
diff --git a/src/error.rs b/src/error.rs
index 7c8bb25e7..5da069105 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -162,6 +162,7 @@ impl From<OpenDirectoryError> for TantivyError {
OpenDirectoryError::NotADirectory(directory_path) => {
TantivyError::InvalidArgument(format!("{:?} is not a directory", directory_path))
}
+ OpenDirectoryError::IoError(err) => TantivyError::IOError(IOError::from(err)),
}
}
}
diff --git a/src/fastfield/bytes/mod.rs b/src/fastfield/bytes/mod.rs
index 1a551ecc0..b3e73a590 100644
--- a/src/fastfield/bytes/mod.rs
+++ b/src/fastfield/bytes/mod.rs
@@ -22,9 +22,7 @@ mod tests {
index_writer.add_document(doc!(field=>vec![1u8, 3, 5, 7, 9]));
index_writer.add_document(doc!(field=>vec![0u8; 1000]));
assert!(index_writer.commit().is_ok());
-
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let reader = searcher.segment_reader(0);
let bytes_reader = reader.bytes_fast_field_reader(field).unwrap();
diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs
index b8e288f52..3e2a30eb2 100644
--- a/src/fastfield/multivalued/mod.rs
+++ b/src/fastfield/multivalued/mod.rs
@@ -9,14 +9,14 @@ mod tests {
extern crate time;
- use query::QueryParser;
+ use self::time::Duration;
use collector::TopDocs;
+ use query::QueryParser;
use schema::Cardinality;
use schema::Facet;
use schema::IntOptions;
use schema::Schema;
use Index;
- use self::time::Duration;
#[test]
fn test_multivalued_u64() {
@@ -34,11 +34,12 @@ mod tests {
index_writer.add_document(doc!(field=>5u64, field=>20u64,field=>1u64));
assert!(index_writer.commit().is_ok());
- index.load_searchers().unwrap();
- let searcher = index.searcher();
- let reader = searcher.segment_reader(0);
+ let searcher = index.reader().unwrap().searcher();
+ let segment_reader = searcher.segment_reader(0);
let mut vals = Vec::new();
- let multi_value_reader = reader.multi_fast_field_reader::<u64>(field).unwrap();
+ let multi_value_reader = segment_reader
+ .multi_fast_field_reader::<u64>(field)
+ .unwrap();
{
multi_value_reader.get_vals(2, &mut vals);
assert_eq!(&vals, &[4u64]);
@@ -63,92 +64,121 @@ mod tests {
.set_indexed()
.set_stored(),
);
- let time_i = schema_builder.add_i64_field(
- "time_stamp_i",
- IntOptions::default()
- .set_stored(),
- );
+ let time_i =
+ schema_builder.add_i64_field("time_stamp_i", IntOptions::default().set_stored());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let first_time_stamp = chrono::Utc::now();
- index_writer.add_document(doc!(date_field=>first_time_stamp, date_field=>first_time_stamp, time_i=>1i64));
+ index_writer.add_document(
+ doc!(date_field=>first_time_stamp, date_field=>first_time_stamp, time_i=>1i64),
+ );
index_writer.add_document(doc!(time_i=>0i64));
// add one second
- index_writer.add_document(doc!(date_field=>first_time_stamp + Duration::seconds(1), time_i=>2i64));
+ index_writer
+ .add_document(doc!(date_field=>first_time_stamp + Duration::seconds(1), time_i=>2i64));
// add another second
let two_secs_ahead = first_time_stamp + Duration::seconds(2);
index_writer.add_document(doc!(date_field=>two_secs_ahead, date_field=>two_secs_ahead,date_field=>two_secs_ahead, time_i=>3i64));
assert!(index_writer.commit().is_ok());
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let reader = searcher.segment_reader(0);
assert_eq!(reader.num_docs(), 4);
{
let parser = QueryParser::for_index(&index, vec![date_field]);
- let query = parser.parse_query(&format!("\"{}\"", first_time_stamp.to_rfc3339()).to_string())
+ let query = parser
+ .parse_query(&format!("\"{}\"", first_time_stamp.to_rfc3339()).to_string())
.expect("could not parse query");
- let results = searcher.search(&query, &TopDocs::with_limit(5))
+ let results = searcher
+ .search(&query, &TopDocs::with_limit(5))
.expect("could not query index");
assert_eq!(results.len(), 1);
for (_score, doc_address) in results {
let retrieved_doc = searcher.doc(doc_address).expect("cannot fetch doc");
- assert_eq!(retrieved_doc.get_first(date_field).expect("cannot find value").date_value().timestamp(), first_time_stamp.timestamp());
- assert_eq!(retrieved_doc.get_first(time_i).expect("cannot find value").i64_value(), 1i64);
+ assert_eq!(
+ retrieved_doc
+ .get_first(date_field)
+ .expect("cannot find value")
+ .date_value()
+ .timestamp(),
+ first_time_stamp.timestamp()
+ );
+ assert_eq!(
+ retrieved_doc
+ .get_first(time_i)
+ .expect("cannot find value")
+ .i64_value(),
+ 1i64
+ );
}
}
{
let parser = QueryParser::for_index(&index, vec![date_field]);
- let query = parser.parse_query(&format!("\"{}\"", two_secs_ahead.to_rfc3339()).to_string())
+ let query = parser
+ .parse_query(&format!("\"{}\"", two_secs_ahead.to_rfc3339()).to_string())
.expect("could not parse query");
- let results = searcher.search(&query, &TopDocs::with_limit(5))
+ let results = searcher
+ .search(&query, &TopDocs::with_limit(5))
.expect("could not query index");
assert_eq!(results.len(), 1);
for (_score, doc_address) in results {
let retrieved_doc = searcher.doc(doc_address).expect("cannot fetch doc");
- assert_eq!(retrieved_doc.get_first(date_field).expect("cannot find value").date_value().timestamp(), two_secs_ahead.timestamp());
- assert_eq!(retrieved_doc.get_first(time_i).expect("cannot find value").i64_value(), 3i64);
+ assert_eq!(
+ retrieved_doc
+ .get_first(date_field)
+ .expect("cannot find value")
+ .date_value()
+ .timestamp(),
+ two_secs_ahead.timestamp()
+ );
+ assert_eq!(
+ retrieved_doc
+ .get_first(time_i)
+ .expect("cannot find value")
+ .i64_value(),
+ 3i64
+ );
}
}
-
// TODO: support Date range queries
-// {
-// let parser = QueryParser::for_index(&index, vec![date_field]);
-// let range_q = format!("\"{}\"..\"{}\"",
-// (first_time_stamp + Duration::seconds(1)).to_rfc3339(),
-// (first_time_stamp + Duration::seconds(3)).to_rfc3339()
-// );
-// let query = parser.parse_query(&range_q)
-// .expect("could not parse query");
-// let results = searcher.search(&query, &TopDocs::with_limit(5))
-// .expect("could not query index");
-//
-//
-// assert_eq!(results.len(), 2);
-// for (i, doc_pair) in results.iter().enumerate() {
-// let retrieved_doc = searcher.doc(doc_pair.1).expect("cannot fetch doc");
-// let offset_sec = match i {
-// 0 => 1,
-// 1 => 3,
-// _ => panic!("should not have more than 2 docs")
-// };
-// let time_i_val = match i {
-// 0 => 2,
-// 1 => 3,
-// _ => panic!("should not have more than 2 docs")
-// };
-// assert_eq!(retrieved_doc.get_first(date_field).expect("cannot find value").date_value().timestamp(),
-// (first_time_stamp + Duration::seconds(offset_sec)).timestamp());
-// assert_eq!(retrieved_doc.get_first(time_i).expect("cannot find value").i64_value(), time_i_val);
-// }
-// }
+ // {
+ // let parser = QueryParser::for_index(&index, vec![date_field]);
+ // let range_q = format!("\"{}\"..\"{}\"",
+ // (first_time_stamp + Duration::seconds(1)).to_rfc3339(),
+ // (first_time_stamp + Duration::seconds(3)).to_rfc3339()
+ // );
+ // let query = parser.parse_query(&range_q)
+ // .expect("could not parse query");
+ // let results = searcher.search(&query, &TopDocs::with_limit(5))
+ // .expect("could not query index");
+ //
+ //
+ // assert_eq!(results.len(), 2);
+ // for (i, doc_pair) in results.iter().enumerate() {
+ // let retrieved_doc = searcher.doc(doc_pair.1).expect("cannot fetch doc");
+ // let offset_sec = match i {
+ // 0 => 1,
+ // 1 => 3,
+ // _ => panic!("should not have more than 2 docs")
+ // };
+ // let time_i_val = match i {
+ // 0 => 2,
+ // 1 => 3,
+ // _ => panic!("should not have more than 2 docs")
+ // };
+ // assert_eq!(retrieved_doc.get_first(date_field).expect("cannot find value").date_value().timestamp(),
+ // (first_time_stamp + Duration::seconds(offset_sec)).timestamp());
+ // assert_eq!(retrieved_doc.get_first(time_i).expect("cannot find value").i64_value(), time_i_val);
+ // }
+ // }
}
#[test]
@@ -167,8 +197,7 @@ mod tests {
index_writer.add_document(doc!(field=> -5i64, field => -20i64, field=>1i64));
assert!(index_writer.commit().is_ok());
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let reader = searcher.segment_reader(0);
let mut vals = Vec::new();
let multi_value_reader = reader.multi_fast_field_reader::<i64>(field).unwrap();
diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs
index d7df15a37..3456de525 100644
--- a/src/fastfield/multivalued/reader.rs
+++ b/src/fastfield/multivalued/reader.rs
@@ -75,8 +75,7 @@ mod tests {
index_writer.add_document(doc);
}
index_writer.commit().expect("Commit failed");
- index.load_searchers().expect("Reloading searchers");
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let mut facet_reader = segment_reader.facet_reader(facet_field).unwrap();
diff --git a/src/functional_test.rs b/src/functional_test.rs
index 493f73c31..ff36369ea 100644
--- a/src/functional_test.rs
+++ b/src/functional_test.rs
@@ -22,6 +22,7 @@ fn test_indexing() {
let schema = schema_builder.build();
let index = Index::create_from_tempdir(schema).unwrap();
+ let reader = index.reader().unwrap();
let mut rng = thread_rng();
@@ -36,8 +37,8 @@ fn test_indexing() {
index_writer.commit().expect("Commit failed");
committed_docs.extend(&uncommitted_docs);
uncommitted_docs.clear();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
// check that everything is correct.
check_index_content(&searcher, &committed_docs);
} else {
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 44c9010b6..94961f5fc 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -44,8 +44,8 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
// reaches `PIPELINE_MAX_SIZE_IN_DOCS`
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
-type DocumentSender = channel::Sender<Vec<AddOperation>>;
-type DocumentReceiver = channel::Receiver<Vec<AddOperation>>;
+type OperationSender = channel::Sender<Vec<AddOperation>>;
+type OperationReceiver = channel::Receiver<Vec<AddOperation>>;
/// Split the thread memory budget into
/// - the heap size
@@ -85,8 +85,8 @@ pub struct IndexWriter {
workers_join_handle: Vec<JoinHandle<Result<()>>>,
- document_receiver: DocumentReceiver,
- document_sender: DocumentSender,
+ operation_receiver: OperationReceiver,
+ operation_sender: OperationSender,
segment_updater: SegmentUpdater,
@@ -133,7 +133,7 @@ pub fn open_index_writer(
let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
return Err(TantivyError::InvalidArgument(err_msg));
}
- let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
+ let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::new();
@@ -151,8 +151,8 @@ pub fn open_index_writer(
heap_size_in_bytes_per_thread,
index: index.clone(),
- document_receiver,
- document_sender,
+ operation_receiver: document_receiver,
+ operation_sender: document_sender,
segment_updater,
@@ -335,7 +335,7 @@ impl IndexWriter {
pub fn wait_merging_threads(mut self) -> Result<()> {
// this will stop the indexing thread,
// dropping the last reference to the segment_updater.
- drop(self.document_sender);
+ drop(self.operation_sender);
let former_workers_handles = mem::replace(&mut self.workers_join_handle, vec![]);
for join_handle in former_workers_handles {
@@ -384,7 +384,7 @@ impl IndexWriter {
/// The thread consumes documents from the pipeline.
///
fn add_indexing_worker(&mut self) -> Result<()> {
- let document_receiver_clone = self.document_receiver.clone();
+ let document_receiver_clone = self.operation_receiver.clone();
let mut segment_updater = self.segment_updater.clone();
let generation = self.generation;
@@ -479,11 +479,11 @@ impl IndexWriter {
/// when no documents are remaining.
///
/// Returns the former segment_ready channel.
- fn recreate_document_channel(&mut self) -> DocumentReceiver {
- let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
+ fn recreate_document_channel(&mut self) -> OperationReceiver {
+ let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
- mem::replace(&mut self.document_sender, document_sender);
- mem::replace(&mut self.document_receiver, document_receiver)
+ mem::replace(&mut self.operation_sender, document_sender);
+ mem::replace(&mut self.operation_receiver, document_receiver)
}
/// Rollback to the last commit
@@ -501,7 +501,7 @@ impl IndexWriter {
// segment updates will be ignored.
self.segment_updater.kill();
- let document_receiver = self.document_receiver.clone();
+ let document_receiver = self.operation_receiver.clone();
// take the directory lock to create a new index_writer.
let directory_lock = self
@@ -648,7 +648,7 @@ impl IndexWriter {
pub fn add_document(&mut self, document: Document) -> u64 {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };
- let send_result = self.document_sender.send(vec![add_operation]);
+ let send_result = self.operation_sender.send(vec![add_operation]);
if let Err(e) = send_result {
panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
}
@@ -709,7 +709,7 @@ impl IndexWriter {
}
}
}
- let send_result = self.document_sender.send(adds);
+ let send_result = self.operation_sender.send(adds);
if let Err(e) = send_result {
panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
};
@@ -728,8 +728,9 @@ mod tests {
use error::*;
use indexer::NoMergePolicy;
use query::TermQuery;
- use schema::{self, Document, IndexRecordOption};
+ use schema::{self, IndexRecordOption};
use Index;
+ use ReloadPolicy;
use Term;
#[test]
@@ -757,6 +758,11 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let a_term = Term::from_field_text(text_field, "a");
let b_term = Term::from_field_text(text_field, "b");
@@ -769,7 +775,7 @@ mod tests {
index_writer.run(operations);
index_writer.commit().expect("failed to commit");
- index.load_searchers().expect("failed to load searchers");
+ reader.reload().expect("failed to load searchers");
let a_term = Term::from_field_text(text_field, "a");
let b_term = Term::from_field_text(text_field, "b");
@@ -777,7 +783,7 @@ mod tests {
let a_query = TermQuery::new(a_term, IndexRecordOption::Basic);
let b_query = TermQuery::new(b_term, IndexRecordOption::Basic);
- let searcher = index.searcher();
+ let searcher = reader.searcher();
let a_docs = searcher
.search(&a_query, &TopDocs::with_limit(1))
@@ -864,9 +870,13 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
-
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap();
let num_docs_containing = |s: &str| {
- let searcher = index.searcher();
+ let searcher = reader.searcher();
let term = Term::from_field_text(text_field, s);
searcher.doc_freq(&term)
};
@@ -876,7 +886,6 @@ mod tests {
let mut index_writer = index.writer(3_000_000).unwrap();
index_writer.add_document(doc!(text_field=>"a"));
index_writer.rollback().unwrap();
-
assert_eq!(index_writer.commit_opstamp(), 0u64);
assert_eq!(num_docs_containing("a"), 0);
{
@@ -884,13 +893,13 @@ mod tests {
index_writer.add_document(doc!(text_field=>"c"));
}
assert!(index_writer.commit().is_ok());
- index.load_searchers().unwrap();
+ reader.reload().unwrap();
assert_eq!(num_docs_containing("a"), 0);
assert_eq!(num_docs_containing("b"), 1);
assert_eq!(num_docs_containing("c"), 1);
}
- index.load_searchers().unwrap();
- index.searcher();
+ reader.reload().unwrap();
+ reader.searcher();
}
#[test]
@@ -898,32 +907,33 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap();
let num_docs_containing = |s: &str| {
- let searcher = index.searcher();
let term_a = Term::from_field_text(text_field, s);
- searcher.doc_freq(&term_a)
+ reader.searcher().doc_freq(&term_a)
};
{
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
- let mut doc = Document::default();
- doc.add_text(text_field, "a");
- index_writer.add_document(doc);
+ index_writer.add_document(doc!(text_field=>"a"));
}
index_writer.commit().expect("commit failed");
for _doc in 0..100 {
- let mut doc = Document::default();
- doc.add_text(text_field, "a");
- index_writer.add_document(doc);
+ index_writer.add_document(doc!(text_field=>"a"));
}
- // this should create 8 segments and trigger a merge.
+ // this should create 8 segments and trigger a merge.
index_writer.commit().expect("commit failed");
index_writer
.wait_merging_threads()
.expect("waiting merging thread failed");
- index.load_searchers().unwrap();
+
+ reader.reload().unwrap();
assert_eq!(num_docs_containing("a"), 200);
assert!(index.searchable_segments().unwrap().len() < 8);
@@ -990,11 +1000,15 @@ mod tests {
}
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
let num_docs_containing = |s: &str| {
- let searcher = index.searcher();
let term_a = Term::from_field_text(text_field, s);
- searcher.doc_freq(&term_a)
+ index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap()
+ .searcher()
+ .doc_freq(&term_a)
};
assert_eq!(num_docs_containing("a"), 0);
assert_eq!(num_docs_containing("b"), 100);
@@ -1026,11 +1040,9 @@ mod tests {
index_writer.add_document(doc!(text_field => "b"));
}
assert!(index_writer.commit().is_err());
- index.load_searchers().unwrap();
let num_docs_containing = |s: &str| {
- let searcher = index.searcher();
let term_a = Term::from_field_text(text_field, s);
- searcher.doc_freq(&term_a)
+ index.reader().unwrap().searcher().doc_freq(&term_a)
};
assert_eq!(num_docs_containing("a"), 100);
assert_eq!(num_docs_containing("b"), 0);
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 37c07b7a6..97048546b 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -194,17 +194,17 @@ impl IndexMerger {
fast_field_serializer,
)?;
}
- FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::Date(ref options) => {
- match options.get_fastfield_cardinality() {
- Some(Cardinality::SingleValue) => {
- self.write_single_fast_field(field, fast_field_serializer)?;
- }
- Some(Cardinality::MultiValues) => {
- self.write_multi_fast_field(field, fast_field_serializer)?;
- }
- None => {}
+ FieldType::U64(ref options)
+ | FieldType::I64(ref options)
+ | FieldType::Date(ref options) => match options.get_fastfield_cardinality() {
+ Some(Cardinality::SingleValue) => {
+ self.write_single_fast_field(field, fast_field_serializer)?;
}
- }
+ Some(Cardinality::MultiValues) => {
+ self.write_multi_fast_field(field, fast_field_serializer)?;
+ }
+ None => {}
+ },
FieldType::Str(_) => {
// We don't handle str fast field for the moment
// They can be implemented using what is done
@@ -676,8 +676,8 @@ mod tests {
let score_field = schema_builder.add_u64_field("score", score_fieldtype);
let bytes_score_field = schema_builder.add_bytes_field("score_bytes");
let index = Index::create_in_ram(schema_builder.build());
+ let reader = index.reader().unwrap();
let curr_time = chrono::Utc::now();
-
let add_score_bytes = |doc: &mut Document, score: u32| {
let mut bytes = Vec::new();
bytes
@@ -748,8 +748,8 @@ mod tests {
index_writer.wait_merging_threads().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
let get_doc_ids = |terms: Vec<Term>| {
let query = BooleanQuery::new_multiterms_query(terms);
let top_docs = searcher.search(&query, &TestCollector).unwrap();
@@ -780,10 +780,7 @@ mod tests {
);
assert_eq!(
get_doc_ids(vec![Term::from_field_date(date_field, &curr_time)]),
- vec![
- DocAddress(0, 0),
- DocAddress(0, 3)
- ]
+ vec![DocAddress(0, 0), DocAddress(0, 3)]
);
}
{
@@ -848,7 +845,7 @@ mod tests {
let bytes_score_field = schema_builder.add_bytes_field("score_bytes");
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
-
+ let reader = index.reader().unwrap();
let search_term = |searcher: &Searcher, term: Term| {
let collector = FastFieldTestCollector::for_field(score_field);
let bytes_collector = BytesFastFieldTestCollector::for_field(bytes_score_field);
@@ -885,8 +882,8 @@ mod tests {
bytes_score_field => vec![0u8, 0, 0, 3],
));
index_writer.commit().expect("committed");
- index.load_searchers().unwrap();
- let ref searcher = *index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].max_doc(), 3);
@@ -932,8 +929,8 @@ mod tests {
bytes_score_field => vec![0u8, 0, 27, 88],
));
index_writer.commit().expect("committed");
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 2);
assert_eq!(searcher.num_docs(), 3);
@@ -994,8 +991,8 @@ mod tests {
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 3);
assert_eq!(searcher.segment_readers()[0].num_docs(), 3);
@@ -1040,8 +1037,8 @@ mod tests {
index_writer.delete_term(Term::from_field_text(text_field, "c"));
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
@@ -1091,9 +1088,9 @@ mod tests {
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
- index.load_searchers().unwrap();
+ reader.reload().unwrap();
- let ref searcher = *index.searcher();
+ let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
@@ -1141,9 +1138,9 @@ mod tests {
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
- index.load_searchers().unwrap();
+ reader.reload().unwrap();
- let ref searcher = *index.searcher();
+ let searcher = reader.searcher();
assert!(segment_ids.is_empty());
assert!(searcher.segment_readers().is_empty());
assert_eq!(searcher.num_docs(), 0);
@@ -1155,6 +1152,7 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let facet_field = schema_builder.add_facet_field("facet");
let index = Index::create_in_ram(schema_builder.build());
+ let reader = index.reader().unwrap();
{
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
@@ -1184,9 +1182,9 @@ mod tests {
index_writer.commit().expect("committed");
}
- index.load_searchers().unwrap();
+ reader.reload().unwrap();
let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
- let searcher = index.searcher();
+ let searcher = reader.searcher();
let mut facet_collector = FacetCollector::for_field(facet_field);
facet_collector.add_facet(Facet::from("/top"));
let (count, facet_counts) = searcher
@@ -1228,7 +1226,7 @@ mod tests {
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
- index.load_searchers().unwrap();
+ reader.reload().unwrap();
test_searcher(
11,
&[
@@ -1249,7 +1247,7 @@ mod tests {
let facet_term = Term::from_facet(facet_field, &facet);
index_writer.delete_term(facet_term);
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
+ reader.reload().unwrap();
test_searcher(
9,
&[
@@ -1274,8 +1272,8 @@ mod tests {
index_writer.commit().expect("commit failed");
index_writer.add_document(doc!(int_field => 1u64));
index_writer.commit().expect("commit failed");
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 2);
index_writer.delete_term(Term::from_field_u64(int_field, 1));
let segment_ids = index
@@ -1286,10 +1284,10 @@ mod tests {
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
- index.load_searchers().unwrap();
+ reader.reload().unwrap();
// commit has not been called yet. The document should still be
// there.
- assert_eq!(index.searcher().num_docs(), 2);
+ assert_eq!(reader.searcher().num_docs(), 2);
}
#[test]
@@ -1300,7 +1298,7 @@ mod tests {
.set_indexed();
let int_field = schema_builder.add_u64_field("intvals", int_options);
let index = Index::create_in_ram(schema_builder.build());
-
+ let reader = index.reader().unwrap();
{
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let mut doc = Document::default();
@@ -1321,8 +1319,8 @@ mod tests {
.expect("Merging failed");
// assert delete has not been committed
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().expect("failed to load searcher 1");
+ let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 2);
index_writer.commit().unwrap();
@@ -1330,13 +1328,13 @@ mod tests {
index_writer.wait_merging_threads().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 0);
}
#[test]
- fn test_merge_multivalued_int_fields() {
+ fn test_merge_multivalued_int_fields_simple() {
let mut schema_builder = schema::Schema::builder();
let int_options = IntOptions::default()
.set_fast(Cardinality::MultiValues)
@@ -1353,7 +1351,6 @@ mod tests {
}
index_writer.add_document(doc);
};
-
index_doc(&mut index_writer, &[1, 2]);
index_doc(&mut index_writer, &[1, 2, 3]);
index_doc(&mut index_writer, &[4, 5]);
@@ -1362,19 +1359,14 @@ mod tests {
index_doc(&mut index_writer, &[3]);
index_doc(&mut index_writer, &[17]);
index_writer.commit().expect("committed");
-
index_doc(&mut index_writer, &[20]);
index_writer.commit().expect("committed");
-
index_doc(&mut index_writer, &[28, 27]);
index_doc(&mut index_writer, &[1_000]);
-
index_writer.commit().expect("committed");
}
- index.load_searchers().unwrap();
-
- let searcher = index.searcher();
-
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let mut vals: Vec<u64> = Vec::new();
{
@@ -1440,13 +1432,14 @@ mod tests {
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
- index_writer.wait_merging_threads().unwrap();
+ index_writer
+ .wait_merging_threads()
+ .expect("Wait for merging threads");
}
-
- index.load_searchers().unwrap();
+ reader.reload().expect("Load searcher");
{
- let searcher = index.searcher();
+ let searcher = reader.searcher();
println!(
"{:?}",
searcher
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 2580bedda..3dc373795 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -565,9 +565,8 @@ mod tests {
index_writer.delete_term(term);
assert!(index_writer.commit().is_ok());
}
-
- index.load_searchers().unwrap();
- assert_eq!(index.searcher().num_docs(), 302);
+ let reader = index.reader().unwrap();
+ assert_eq!(reader.searcher().num_docs(), 302);
{
index_writer
@@ -575,9 +574,9 @@ mod tests {
.expect("waiting for merging threads");
}
- index.load_searchers().unwrap();
- assert_eq!(index.searcher().segment_readers().len(), 1);
- assert_eq!(index.searcher().num_docs(), 302);
+ reader.reload().unwrap();
+ assert_eq!(reader.searcher().segment_readers().len(), 1);
+ assert_eq!(reader.searcher().num_docs(), 302);
}
#[test]
@@ -636,18 +635,18 @@ mod tests {
.expect("waiting for merging threads");
}
- index.load_searchers().unwrap();
- assert_eq!(index.searcher().num_docs(), 0);
+ let reader = index.reader().unwrap();
+ assert_eq!(reader.searcher().num_docs(), 0);
let seg_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
assert!(seg_ids.is_empty());
- index.load_searchers().unwrap();
- assert_eq!(index.searcher().num_docs(), 0);
+ reader.reload().unwrap();
+ assert_eq!(reader.searcher().num_docs(), 0);
// empty segments should be erased
assert!(index.searchable_segment_metas().unwrap().is_empty());
- assert!(index.searcher().segment_readers().is_empty());
+ assert!(reader.searcher().segment_readers().is_empty());
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 6beb6f32d..70c9e78fd 100755
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -75,9 +75,9 @@
//!
//! // # Searching
//!
-//! index.load_searchers()?;
+//! let reader = index.reader()?;
//!
-//! let searcher = index.searcher();
+//! let searcher = reader.searcher();
//!
//! let query_parser = QueryParser::for_index(&index, vec![title, body]);
//!
@@ -186,8 +186,8 @@ pub use error::TantivyError;
pub use error::TantivyError as Error;
extern crate census;
-extern crate owned_read;
pub extern crate chrono;
+extern crate owned_read;
/// Tantivy result.
pub type Result<T> = std::result::Result<T, TantivyError>;
@@ -215,6 +215,9 @@ pub mod space_usage;
pub mod store;
pub mod termdict;
+mod reader;
+
+pub use self::reader::{IndexReader, IndexReaderBuilder, ReloadPolicy};
mod snippet;
pub use self::snippet::{Snippet, SnippetGenerator};
@@ -303,6 +306,7 @@ mod tests {
use Index;
use IndexWriter;
use Postings;
+ use ReloadPolicy;
pub fn assert_nearly_equals(expected: f32, val: f32) {
assert!(
@@ -391,8 +395,8 @@ mod tests {
index_writer.commit().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let term_a = Term::from_field_text(text_field, "a");
assert_eq!(searcher.doc_freq(&term_a), 3);
let term_b = Term::from_field_text(text_field, "b");
@@ -419,8 +423,8 @@ mod tests {
index_writer.commit().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let index_reader = index.reader().unwrap();
+ let searcher = index_reader.searcher();
let reader = searcher.segment_reader(0);
{
let fieldnorm_reader = reader.get_fieldnorms_reader(text_field);
@@ -455,8 +459,8 @@ mod tests {
index_writer.commit().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let segment_reader: &SegmentReader = searcher.segment_reader(0);
let fieldnorms_reader = segment_reader.get_fieldnorms_reader(text_field);
assert_eq!(fieldnorms_reader.fieldnorm(0), 3);
@@ -484,6 +488,11 @@ mod tests {
let term_c = Term::from_field_text(text_field, "c");
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap();
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
@@ -505,10 +514,10 @@ mod tests {
index_writer.commit().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
- let reader = searcher.segment_reader(0);
- let inverted_index = reader.inverted_index(text_field);
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
+ let segment_reader = searcher.segment_reader(0);
+ let inverted_index = segment_reader.inverted_index(text_field);
assert!(inverted_index
.read_postings(&term_abcd, IndexRecordOption::WithFreqsAndPositions)
.is_none());
@@ -516,19 +525,19 @@ mod tests {
let mut postings = inverted_index
.read_postings(&term_a, IndexRecordOption::WithFreqsAndPositions)
.unwrap();
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, segment_reader));
assert_eq!(postings.doc(), 5);
- assert!(!advance_undeleted(&mut postings, reader));
+ assert!(!advance_undeleted(&mut postings, segment_reader));
}
{
let mut postings = inverted_index
.read_postings(&term_b, IndexRecordOption::WithFreqsAndPositions)
.unwrap();
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, segment_reader));
assert_eq!(postings.doc(), 3);
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, segment_reader));
assert_eq!(postings.doc(), 4);
- assert!(!advance_undeleted(&mut postings, reader));
+ assert!(!advance_undeleted(&mut postings, segment_reader));
}
}
{
@@ -541,10 +550,10 @@ mod tests {
index_writer.rollback().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
- let reader = searcher.segment_reader(0);
- let inverted_index = reader.inverted_index(term_abcd.field());
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
+ let seg_reader = searcher.segment_reader(0);
+ let inverted_index = seg_reader.inverted_index(term_abcd.field());
assert!(inverted_index
.read_postings(&term_abcd, IndexRecordOption::WithFreqsAndPositions)
@@ -553,19 +562,19 @@ mod tests {
let mut postings = inverted_index
.read_postings(&term_a, IndexRecordOption::WithFreqsAndPositions)
.unwrap();
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, seg_reader));
assert_eq!(postings.doc(), 5);
- assert!(!advance_undeleted(&mut postings, reader));
+ assert!(!advance_undeleted(&mut postings, seg_reader));
}
{
let mut postings = inverted_index
.read_postings(&term_b, IndexRecordOption::WithFreqsAndPositions)
.unwrap();
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, seg_reader));
assert_eq!(postings.doc(), 3);
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, seg_reader));
assert_eq!(postings.doc(), 4);
- assert!(!advance_undeleted(&mut postings, reader));
+ assert!(!advance_undeleted(&mut postings, seg_reader));
}
}
{
@@ -578,10 +587,10 @@ mod tests {
index_writer.commit().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
- let reader = searcher.segment_reader(0);
- let inverted_index = reader.inverted_index(term_abcd.field());
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
+ let segment_reader = searcher.segment_reader(0);
+ let inverted_index = segment_reader.inverted_index(term_abcd.field());
assert!(inverted_index
.read_postings(&term_abcd, IndexRecordOption::WithFreqsAndPositions)
.is_none());
@@ -589,25 +598,25 @@ mod tests {
let mut postings = inverted_index
.read_postings(&term_a, IndexRecordOption::WithFreqsAndPositions)
.unwrap();
- assert!(!advance_undeleted(&mut postings, reader));
+ assert!(!advance_undeleted(&mut postings, segment_reader));
}
{
let mut postings = inverted_index
.read_postings(&term_b, IndexRecordOption::WithFreqsAndPositions)
.unwrap();
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, segment_reader));
assert_eq!(postings.doc(), 3);
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, segment_reader));
assert_eq!(postings.doc(), 4);
- assert!(!advance_undeleted(&mut postings, reader));
+ assert!(!advance_undeleted(&mut postings, segment_reader));
}
{
let mut postings = inverted_index
.read_postings(&term_c, IndexRecordOption::WithFreqsAndPositions)
.unwrap();
- assert!(advance_undeleted(&mut postings, reader));
+ assert!(advance_undeleted(&mut postings, segment_reader));
assert_eq!(postings.doc(), 4);
- assert!(!advance_undeleted(&mut postings, reader));
+ assert!(!advance_undeleted(&mut postings, segment_reader));
}
}
}
@@ -622,8 +631,8 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(field=>1u64));
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let term = Term::from_field_u64(field, 1u64);
let mut postings = searcher
.segment_reader(0)
@@ -646,8 +655,8 @@ mod tests {
let negative_val = -1i64;
index_writer.add_document(doc!(value_field => negative_val));
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let term = Term::from_field_i64(value_field, negative_val);
let mut postings = searcher
.segment_reader(0)
@@ -669,8 +678,8 @@ mod tests {
let mut index_writer = index.writer_with_num_threads(2, 6_000_000).unwrap();
index_writer.add_document(doc!(text_field=>"a"));
assert!(index_writer.commit().is_ok());
- assert!(index.load_searchers().is_ok());
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
segment_reader.inverted_index(absent_field); //< should not panic
}
@@ -681,6 +690,11 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap();
// writing the segment
let mut index_writer = index.writer_with_num_threads(2, 6_000_000).unwrap();
@@ -706,8 +720,8 @@ mod tests {
remove_document(&mut index_writer, "38");
remove_document(&mut index_writer, "34");
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 6);
}
@@ -727,8 +741,8 @@ mod tests {
index_writer.commit().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let index_reader = index.reader().unwrap();
+ let searcher = index_reader.searcher();
let reader = searcher.segment_reader(0);
let inverted_index = reader.inverted_index(text_field);
let term_abcd = Term::from_field_text(text_field, "abcd");
@@ -752,7 +766,7 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
-
+ let reader = index.reader().unwrap();
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
@@ -762,8 +776,8 @@ mod tests {
index_writer.commit().unwrap();
}
{
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
let get_doc_ids = |terms: Vec<Term>| {
let query = BooleanQuery::new_multiterms_query(terms);
let topdocs = searcher.search(&query, &TestCollector).unwrap();
@@ -805,25 +819,22 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
-
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .unwrap();
+ assert_eq!(reader.searcher().num_docs(), 0u64);
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
- {
- let doc = doc!(text_field=>"af b");
- index_writer.add_document(doc);
- }
- {
- let doc = doc!(text_field=>"a b c");
- index_writer.add_document(doc);
- }
- {
- let doc = doc!(text_field=>"a b c d");
- index_writer.add_document(doc);
- }
+ index_writer.add_document(doc!(text_field=>"af b"));
+ index_writer.add_document(doc!(text_field=>"a b c"));
+ index_writer.add_document(doc!(text_field=>"a b c d"));
index_writer.commit().unwrap();
}
- index.searcher();
+ reader.reload().unwrap();
+ assert_eq!(reader.searcher().num_docs(), 3u64);
}
#[test]
@@ -860,9 +871,8 @@ mod tests {
index_writer.add_document(document);
index_writer.commit().unwrap();
}
-
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let segment_reader: &SegmentReader = searcher.segment_reader(0);
{
let fast_field_reader_res = segment_reader.fast_field_reader::<u64>(text_field);
diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index e134e7a9e..e571e26df 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -100,9 +100,8 @@ pub mod tests {
}
index_writer.add_document(doc!(title => r#"abc be be be be abc"#));
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let inverted_index = searcher.segment_reader(0u32).inverted_index(title);
let term = Term::from_field_text(title, "abc");
@@ -292,9 +291,8 @@ pub mod tests {
}
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
let term_a = Term::from_field_text(text_field, "a");
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let mut postings = segment_reader
.inverted_index(text_field)
@@ -331,10 +329,9 @@ pub mod tests {
}
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
index
};
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
// check that the basic usage works
@@ -402,8 +399,7 @@ pub mod tests {
index_writer.delete_term(term_0);
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
// make sure seeking still works
@@ -450,12 +446,9 @@ pub mod tests {
{
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.delete_term(term_1);
-
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
-
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
// finally, check that it's empty
{
@@ -511,7 +504,6 @@ pub mod tests {
}
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
index
};
}
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index 97de4cd10..61bcc3537 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -33,9 +33,10 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<PostingsWriter> {
}
})
.unwrap_or_else(|| SpecializedPostingsWriter::<NothingRecorder>::new_boxed()),
- FieldType::U64(_) | FieldType::I64(_) | FieldType::Date(_) | FieldType::HierarchicalFacet => {
- SpecializedPostingsWriter::<NothingRecorder>::new_boxed()
- }
+ FieldType::U64(_)
+ | FieldType::I64(_)
+ | FieldType::Date(_)
+ | FieldType::HierarchicalFacet => SpecializedPostingsWriter::<NothingRecorder>::new_boxed(),
FieldType::Bytes => {
// FieldType::Bytes cannot actually be indexed.
// TODO fix during the indexer refactoring described in #276
diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs
index af26258f4..807265348 100644
--- a/src/postings/segment_postings.rs
+++ b/src/postings/segment_postings.rs
@@ -773,8 +773,7 @@ mod tests {
last_doc = doc + 1;
}
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field);
let term = Term::from_field_u64(int_field, 0u64);
@@ -842,8 +841,7 @@ mod tests {
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let mut block_segments;
diff --git a/src/query/all_query.rs b/src/query/all_query.rs
index e6468e2d7..8b2b64233 100644
--- a/src/query/all_query.rs
+++ b/src/query/all_query.rs
@@ -101,8 +101,9 @@ mod tests {
index_writer.commit().unwrap();
index_writer.add_document(doc!(field=>"ccc"));
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ reader.reload().unwrap();
+ let searcher = reader.searcher();
let weight = AllQuery.weight(&searcher, false).unwrap();
{
let reader = searcher.segment_reader(0);
diff --git a/src/query/boolean_query/mod.rs b/src/query/boolean_query/mod.rs
index b450f7f0a..16f37de26 100644
--- a/src/query/boolean_query/mod.rs
+++ b/src/query/boolean_query/mod.rs
@@ -51,7 +51,6 @@ mod tests {
}
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
(index, text_field)
}
@@ -60,7 +59,8 @@ mod tests {
let (index, text_field) = aux_test_helper();
let query_parser = QueryParser::for_index(&index, vec![text_field]);
let query = query_parser.parse_query("(+a +b) d").unwrap();
- assert_eq!(query.count(&*index.searcher()).unwrap(), 3);
+ let searcher = index.reader().unwrap().searcher();
+ assert_eq!(query.count(&searcher).unwrap(), 3);
}
#[test]
@@ -68,7 +68,7 @@ mod tests {
let (index, text_field) = aux_test_helper();
let query_parser = QueryParser::for_index(&index, vec![text_field]);
let query = query_parser.parse_query("+a").unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let weight = query.weight(&searcher, true).unwrap();
let scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
assert!(scorer.is::<TermScorer>());
@@ -78,7 +78,7 @@ mod tests {
pub fn test_boolean_termonly_intersection() {
let (index, text_field) = aux_test_helper();
let query_parser = QueryParser::for_index(&index, vec![text_field]);
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
{
let query = query_parser.parse_query("+a +b +c").unwrap();
let weight = query.weight(&searcher, true).unwrap();
@@ -97,7 +97,7 @@ mod tests {
pub fn test_boolean_reqopt() {
let (index, text_field) = aux_test_helper();
let query_parser = QueryParser::for_index(&index, vec![text_field]);
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
{
let query = query_parser.parse_query("+a b").unwrap();
let weight = query.weight(&searcher, true).unwrap();
@@ -126,10 +126,13 @@ mod tests {
query
};
+ let reader = index.reader().unwrap();
+
let matching_docs = |boolean_query: &Query| {
- let searcher = index.searcher();
- let test_docs = searcher.search(boolean_query, &TestCollector).unwrap();
- test_docs
+ reader
+ .searcher()
+ .search(boolean_query, &TestCollector)
+ .unwrap()
.docs()
.iter()
.cloned()
@@ -185,10 +188,12 @@ mod tests {
let query: Box<Query> = Box::new(term_query);
query
};
-
+ let reader = index.reader().unwrap();
let score_docs = |boolean_query: &Query| {
- let searcher = index.searcher();
- let fruit = searcher.search(boolean_query, &TestCollector).unwrap();
+ let fruit = reader
+ .searcher()
+ .search(boolean_query, &TestCollector)
+ .unwrap();
fruit.scores().to_vec()
};
diff --git a/src/query/fuzzy_query.rs b/src/query/fuzzy_query.rs
index 6539929bf..ea54b789b 100644
--- a/src/query/fuzzy_query.rs
+++ b/src/query/fuzzy_query.rs
@@ -52,9 +52,8 @@ lazy_static! {
/// ));
/// index_writer.commit().unwrap();
/// }
-///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
///
/// {
///
@@ -141,8 +140,8 @@ mod test {
));
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
{
let term = Term::from_field_text(country_field, "japon");
diff --git a/src/query/phrase_query/mod.rs b/src/query/phrase_query/mod.rs
index 90ae26451..a08e505a4 100644
--- a/src/query/phrase_query/mod.rs
+++ b/src/query/phrase_query/mod.rs
@@ -31,7 +31,6 @@ mod tests {
}
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
index
}
@@ -46,8 +45,7 @@ mod tests {
]);
let schema = index.schema();
let text_field = schema.get_field("text").unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let test_query = |texts: Vec<&str>| {
let terms: Vec<Term> = texts
.iter()
@@ -90,8 +88,7 @@ mod tests {
index_writer.add_document(doc!(text_field=>"a b c"));
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let phrase_query = PhraseQuery::new(vec![
Term::from_field_text(text_field, "a"),
Term::from_field_text(text_field, "b"),
@@ -115,8 +112,7 @@ mod tests {
let index = create_index(&["a b c", "a b c a b"]);
let schema = index.schema();
let text_field = schema.get_field("text").unwrap();
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let test_query = |texts: Vec<&str>| {
let terms: Vec<Term> = texts
.iter()
@@ -148,8 +144,7 @@ mod tests {
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let test_query = |texts: Vec<&str>| {
let terms: Vec<Term> = texts
.iter()
@@ -177,8 +172,7 @@ mod tests {
index_writer.add_document(doc!(text_field=>"a b c d e f g h"));
assert!(index_writer.commit().is_ok());
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let test_query = |texts: Vec<(usize, &str)>| {
let terms: Vec<(usize, Term)> = texts
.iter()
diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs
index 25b08ae1f..b6ac926fc 100644
--- a/src/query/query_parser/query_parser.rs
+++ b/src/query/query_parser/query_parser.rs
@@ -239,12 +239,13 @@ impl QueryParser {
let term = Term::from_field_i64(field, val);
Ok(vec![(0, term)])
}
- FieldType::Date(_) => {
- match chrono::DateTime::parse_from_rfc3339(phrase) {
- Ok(x) => Ok(vec![(0, Term::from_field_date(field, &x.with_timezone(&chrono::Utc)))]),
- Err(e) => Err(QueryParserError::DateFormatError(e))
- }
- }
+ FieldType::Date(_) => match chrono::DateTime::parse_from_rfc3339(phrase) {
+ Ok(x) => Ok(vec![(
+ 0,
+ Term::from_field_date(field, &x.with_timezone(&chrono::Utc)),
+ )]),
+ Err(e) => Err(QueryParserError::DateFormatError(e)),
+ },
FieldType::U64(_) => {
let val: u64 = u64::from_str(phrase)?;
let term = Term::from_field_u64(field, val);
@@ -791,7 +792,9 @@ mod test {
query_parser.parse_query("date:18a"),
Err(QueryParserError::DateFormatError(_))
);
- assert!(query_parser.parse_query("date:\"1985-04-12T23:20:50.52Z\"").is_ok());
+ assert!(query_parser
+ .parse_query("date:\"1985-04-12T23:20:50.52Z\"")
+ .is_ok());
}
#[test]
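
The rustfmt-reshuffled match arm above is the whole of the date-parsing story: the parser accepts an RFC 3339 string, normalizes it to UTC, and turns it into a `Term`. A standalone sketch of that conversion; the `occurred_at` field name is hypothetical, and `INDEXED` is assumed to convert into the date field's `IntOptions` per the 0.9 schema flags:

```rust
extern crate chrono;
extern crate tantivy;

use tantivy::schema::{Schema, INDEXED};
use tantivy::Term;

fn main() {
    let mut schema_builder = Schema::builder();
    // Hypothetical date field, for illustration only.
    let date_field = schema_builder.add_date_field("occurred_at", INDEXED);
    let _schema = schema_builder.build();

    // Same steps as the parser: RFC 3339 text -> DateTime<Utc> -> Term.
    let date = chrono::DateTime::parse_from_rfc3339("1985-04-12T23:20:50.52Z")
        .expect("valid RFC 3339 timestamp");
    let _term = Term::from_field_date(date_field, &date.with_timezone(&chrono::Utc));
}
```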
diff --git a/src/query/range_query.rs b/src/query/range_query.rs
index f111e90e1..3aa996520 100644
--- a/src/query/range_query.rs
+++ b/src/query/range_query.rs
@@ -61,8 +61,8 @@ fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
/// # }
/// # index_writer.commit().unwrap();
/// # }
-/// # index.load_searchers()?;
-/// let searcher = index.searcher();
+/// # let reader = index.reader()?;
+/// let searcher = reader.searcher();
///
/// let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960..1970);
///
@@ -316,8 +316,8 @@ mod tests {
}
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960u64..1970u64);
@@ -355,8 +355,8 @@ mod tests {
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let count_multiples =
|range_query: RangeQuery| searcher.search(&range_query, &Count).unwrap();
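
For reference, the updated doc-test corresponds roughly to the following free-standing program (the schema and document setup mirrors the hidden `#` lines of the doc-test; the heap size and document counts are illustrative):

```rust
extern crate tantivy;

use tantivy::collector::Count;
use tantivy::query::RangeQuery;
use tantivy::schema::{Document, Schema, INDEXED};
use tantivy::Index;

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let year_field = schema_builder.add_u64_field("year", INDEXED);
    let index = Index::create_in_ram(schema_builder.build());
    {
        let mut index_writer = index.writer(50_000_000)?;
        // One document per year.
        for year in 1950u64..2000u64 {
            let mut doc = Document::default();
            doc.add_u64(year_field, year);
            index_writer.add_document(doc);
        }
        index_writer.commit()?;
    }
    let reader = index.reader()?;
    let searcher = reader.searcher();
    let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960..1970);
    // Ten years fall in [1960, 1970).
    assert_eq!(searcher.search(&docs_in_the_sixties, &Count)?, 10);
    Ok(())
}
```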
diff --git a/src/query/regex_query.rs b/src/query/regex_query.rs
index 3d3254b2f..ec7dcceb7 100644
--- a/src/query/regex_query.rs
+++ b/src/query/regex_query.rs
@@ -44,8 +44,8 @@ use Searcher;
/// index_writer.commit().unwrap();
/// }
///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
///
/// let term = Term::from_field_text(title, "Diary");
/// let query = RegexQuery::new("d[ai]{2}ry".to_string(), title);
@@ -108,8 +108,8 @@ mod test {
));
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
{
let regex_query = RegexQuery::new("jap[ao]n".to_string(), country_field);
let scored_docs = searcher
diff --git a/src/query/term_query/mod.rs b/src/query/term_query/mod.rs
index edc4af411..2a85e9383 100644
--- a/src/query/term_query/mod.rs
+++ b/src/query/term_query/mod.rs
@@ -32,9 +32,7 @@ mod tests {
}
assert!(index_writer.commit().is_ok());
}
-
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let term_query = TermQuery::new(
Term::from_field_text(text_field, "a"),
IndexRecordOption::Basic,
@@ -65,8 +63,7 @@ mod tests {
index_writer.add_document(doc!(left_field => "left4 left1"));
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
{
let term = Term::from_field_text(left_field, "left2");
let term_query = TermQuery::new(term, IndexRecordOption::WithFreqs);
diff --git a/src/query/term_query/term_query.rs b/src/query/term_query/term_query.rs
index 8ddf42762..6dc52acb2 100644
--- a/src/query/term_query/term_query.rs
+++ b/src/query/term_query/term_query.rs
@@ -48,9 +48,8 @@ use Term;
/// ));
/// index_writer.commit()?;
/// }
-///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
///
/// let query = TermQuery::new(
/// Term::from_field_text(title, "diary"),
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
new file mode 100644
index 000000000..67d0ba52c
--- /dev/null
+++ b/src/reader/mod.rs
@@ -0,0 +1,187 @@
+mod pool;
+
+use self::pool::{LeasedItem, Pool};
+use core::Segment;
+use directory::Directory;
+use directory::WatchHandle;
+use directory::META_LOCK;
+use std::sync::Arc;
+use Index;
+use Result;
+use Searcher;
+use SegmentReader;
+
+/// Defines when a new version of the index should be reloaded.
+///
+/// Regardless of whether you search and index in the same process, tantivy does not necessarily
+/// reflect the changes that are committed to your index. `ReloadPolicy` helps you define
+/// precisely when you want your index to be reloaded.
+#[derive(Clone, Copy)]
+pub enum ReloadPolicy {
+ /// The index is entirely reloaded manually.
+ /// All updates to the index have to be picked up manually.
+ ///
+ /// No change is reflected automatically. You are required to call `.reload()` manually.
+ Manual,
+ /// The index is reloaded within milliseconds after a new commit is available.
+ /// This is made possible by watching changes in the `meta.json` file.
+ OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
+}
+
+/// `IndexReader` builder
+///
+/// It makes it possible to set the following values.
+///
+/// - `num_searchers` (by default, the number of detected CPU threads):
+///
+/// If more than `num_searchers` searchers are requested at the same time, the extra
+/// requests will block until one of the searchers in use gets released.
+/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
+///
+/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
+#[derive(Clone)]
+pub struct IndexReaderBuilder {
+ num_searchers: usize,
+ reload_policy: ReloadPolicy,
+ index: Index,
+}
+
+impl IndexReaderBuilder {
+ pub(crate) fn new(index: Index) -> IndexReaderBuilder {
+ IndexReaderBuilder {
+ num_searchers: num_cpus::get(),
+ reload_policy: ReloadPolicy::OnCommit,
+ index,
+ }
+ }
+
+ /// Builds the reader.
+ ///
+ /// Building the reader is a non-trivial operation that requires
+ /// opening the different segment readers. It may take hundreds of milliseconds
+ /// and it may return an error.
+ /// TODO(pmasurel) Use the `TryInto` trait once it is available in stable.
+ pub fn try_into(self) -> Result<IndexReader> {
+ let inner_reader = InnerIndexReader {
+ index: self.index,
+ num_searchers: self.num_searchers,
+ searcher_pool: Pool::new(),
+ };
+ inner_reader.reload()?;
+ let inner_reader_arc = Arc::new(inner_reader);
+ let watch_handle_opt: Option<WatchHandle>;
+ match self.reload_policy {
+ ReloadPolicy::Manual => {
+ // No need to set anything...
+ watch_handle_opt = None;
+ }
+ ReloadPolicy::OnCommit => {
+ let inner_reader_arc_clone = inner_reader_arc.clone();
+ let callback = move || {
+ if let Err(err) = inner_reader_arc_clone.reload() {
+ error!(
+ "Error while loading searcher after commit was detected. {:?}",
+ err
+ );
+ }
+ };
+ let watch_handle = inner_reader_arc.index.directory().watch(Box::new(callback));
+ watch_handle_opt = Some(watch_handle);
+ }
+ }
+ Ok(IndexReader {
+ inner: inner_reader_arc,
+ watch_handle_opt,
+ })
+ }
+
+ /// Sets the reload_policy.
+ ///
+ /// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
+ pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
+ self.reload_policy = reload_policy;
+ self
+ }
+
+ /// Sets the number of `Searcher` in the searcher pool.
+ pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
+ self.num_searchers = num_searchers;
+ self
+ }
+}
+
+struct InnerIndexReader {
+ num_searchers: usize,
+ searcher_pool: Pool<Searcher>,
+ index: Index,
+}
+
+impl InnerIndexReader {
+ fn reload(&self) -> Result<()> {
+ let segment_readers: Vec<SegmentReader> = {
+ let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
+ let searchable_segments = self.searchable_segments()?;
+ searchable_segments
+ .iter()
+ .map(SegmentReader::open)
+ .collect::<Result<_>>()?
+ };
+ let schema = self.index.schema();
+ let searchers = (0..self.num_searchers)
+ .map(|_| Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()))
+ .collect();
+ self.searcher_pool.publish_new_generation(searchers);
+ Ok(())
+ }
+
+ /// Returns the list of segments that are searchable
+ fn searchable_segments(&self) -> Result<Vec<Segment>> {
+ self.index.searchable_segments()
+ }
+
+ fn searcher(&self) -> LeasedItem<Searcher> {
+ self.searcher_pool.acquire()
+ }
+}
+
+/// `IndexReader` is your entry point to read and search the index.
+///
+/// It controls when a new version of the index should be loaded and lends
+/// you instances of `Searcher` for the last loaded version.
+///
+/// Cloning an `IndexReader` is cheap: it does not clone the pool of searchers.
+/// `IndexReader` just wraps an `Arc`.
+#[derive(Clone)]
+pub struct IndexReader {
+ inner: Arc<InnerIndexReader>,
+ watch_handle_opt: Option,
+}
+
+impl IndexReader {
+ /// Update searchers so that they reflect the state of the last
+ /// `.commit()`.
+ ///
+ /// If you set up the `OnCommit` `ReloadPolicy` (which is the default),
+ /// every commit should be rapidly reflected on your `IndexReader` and you should
+ /// not need to call `reload()` at all.
+ ///
+ /// This automatic reload can however take tens of milliseconds to kick in, and in unit tests
+ /// it can be nice to deterministically force the reload of searchers.
+ pub fn reload(&self) -> Result<()> {
+ self.inner.reload()
+ }
+
+ /// Returns a searcher.
+ ///
+ /// This method should be called every single time a search
+ /// query is performed.
+ /// The searchers are taken from a pool of `num_searchers` searchers.
+ /// If no searcher is available, this call may block.
+ ///
+ /// The same searcher must be used for a given query, as it ensures
+ /// the use of a consistent segment set.
+ pub fn searcher(&self) -> LeasedItem<Searcher> {
+ self.inner.searcher()
+ }
+}
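
To complement the `ReloadPolicy::OnCommit` default, here is a sketch of the `Manual` policy driving `reload()` by hand, as one might in a deterministic unit test. The `body` field name, pool size, and writer heap size are arbitrary, and `Searcher::num_docs` is used to observe the effect:

```rust
#[macro_use]
extern crate tantivy;

use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, ReloadPolicy};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let body = schema_builder.add_text_field("body", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // Opt out of the OnCommit default: reloads become explicit.
    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .num_searchers(4)
        .try_into()?;

    let mut index_writer = index.writer(50_000_000)?;
    index_writer.add_document(doc!(body => "hello"));
    index_writer.commit()?;

    // The pooled searchers still see the empty index...
    assert_eq!(reader.searcher().num_docs(), 0);
    // ...until the reload is forced deterministically.
    reader.reload()?;
    assert_eq!(reader.searcher().num_docs(), 1);
    Ok(())
}
```

With `OnCommit`, the `reload()` call would be unnecessary but the new commit might only become visible after a short delay.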
diff --git a/src/core/pool.rs b/src/reader/pool.rs
similarity index 100%
rename from src/core/pool.rs
rename to src/reader/pool.rs
diff --git a/src/schema/document.rs b/src/schema/document.rs
index 2c535446a..bec54177a 100644
--- a/src/schema/document.rs
+++ b/src/schema/document.rs
@@ -90,7 +90,7 @@ impl Document {
/// Add a date field
pub fn add_date(&mut self, field: Field, value: &DateTime) {
- self.add(FieldValue::new(field, Value::Date(DateTime::from(*value))));
+ self.add(FieldValue::new(field, Value::Date(*value)));
}
/// Add a bytes field
diff --git a/src/schema/field_entry.rs b/src/schema/field_entry.rs
index fc5972b94..89a8b251d 100644
--- a/src/schema/field_entry.rs
+++ b/src/schema/field_entry.rs
@@ -87,7 +87,9 @@ impl FieldEntry {
pub fn is_indexed(&self) -> bool {
match self.field_type {
FieldType::Str(ref options) => options.get_indexing_options().is_some(),
- FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::Date(ref options) => options.is_indexed(),
+ FieldType::U64(ref options)
+ | FieldType::I64(ref options)
+ | FieldType::Date(ref options) => options.is_indexed(),
FieldType::HierarchicalFacet => true,
FieldType::Bytes => false,
}
@@ -104,7 +106,9 @@ impl FieldEntry {
/// Returns true iff the field is stored
pub fn is_stored(&self) -> bool {
match self.field_type {
- FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::Date(ref options) => options.is_stored(),
+ FieldType::U64(ref options)
+ | FieldType::I64(ref options)
+ | FieldType::Date(ref options) => options.is_stored(),
FieldType::Str(ref options) => options.is_stored(),
// TODO make stored hierarchical facet optional
FieldType::HierarchicalFacet => true,
diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs
index 79c2656a6..a67451d7a 100644
--- a/src/schema/field_type.rs
+++ b/src/schema/field_type.rs
@@ -95,7 +95,9 @@ impl FieldType {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()
.map(|indexing_options| indexing_options.index_option()),
- FieldType::U64(ref int_options) | FieldType::I64(ref int_options) | FieldType::Date(ref int_options) => {
+ FieldType::U64(ref int_options)
+ | FieldType::I64(ref int_options)
+ | FieldType::Date(ref int_options) => {
if int_options.is_indexed() {
Some(IndexRecordOption::Basic)
} else {
@@ -116,9 +118,9 @@ impl FieldType {
match *json {
JsonValue::String(ref field_text) => match *self {
FieldType::Str(_) => Ok(Value::Str(field_text.clone())),
- FieldType::U64(_) | FieldType::I64(_) | FieldType::Date(_) => Err(ValueParsingError::TypeError(
- format!("Expected an integer, got {:?}", json),
- )),
+ FieldType::U64(_) | FieldType::I64(_) | FieldType::Date(_) => Err(
+ ValueParsingError::TypeError(format!("Expected an integer, got {:?}", json)),
+ ),
FieldType::HierarchicalFacet => Ok(Value::Facet(Facet::from(field_text))),
FieldType::Bytes => decode(field_text).map(Value::Bytes).map_err(|_| {
ValueParsingError::InvalidBase64(format!(
diff --git a/src/schema/flags.rs b/src/schema/flags.rs
index e766b4c27..104df2b33 100644
--- a/src/schema/flags.rs
+++ b/src/schema/flags.rs
@@ -43,7 +43,11 @@ pub const FAST: SchemaFlagList<FastFlag, ()> = SchemaFlagList {
};
impl<Head, OldHead, OldTail> BitOr<SchemaFlagList<OldHead, OldTail>> for SchemaFlagList<Head, ()>
- where Head: Clone, OldHead: Clone, OldTail: Clone {
+where
+ Head: Clone,
+ OldHead: Clone,
+ OldTail: Clone,
+{
type Output = SchemaFlagList<Head, SchemaFlagList<OldHead, OldTail>>;
fn bitor(self, head: SchemaFlagList<OldHead, OldTail>) -> Self::Output {
@@ -54,7 +58,7 @@ impl<Head, OldHead, OldTail> BitOr<SchemaFlagList<OldHead, OldTail>> for SchemaFlagList<
}
}
-impl<T: Clone+Into<IntOptions>> BitOr<IntOptions> for SchemaFlagList<T, ()> {
+impl<T: Clone + Into<IntOptions>> BitOr<IntOptions> for SchemaFlagList<T, ()> {
type Output = IntOptions;
fn bitor(self, rhs: IntOptions) -> Self::Output {
@@ -62,7 +66,7 @@ impl<T: Clone+Into<IntOptions>> BitOr<IntOptions> for SchemaFlagList<T, ()> {
}
}
-impl<T: Clone+Into<TextOptions>> BitOr<TextOptions> for SchemaFlagList<T, ()> {
+impl<T: Clone + Into<TextOptions>> BitOr<TextOptions> for SchemaFlagList<T, ()> {
type Output = TextOptions;
fn bitor(self, rhs: TextOptions) -> Self::Output {
diff --git a/src/schema/int_options.rs b/src/schema/int_options.rs
index b1599bd75..a95f236c3 100644
--- a/src/schema/int_options.rs
+++ b/src/schema/int_options.rs
@@ -1,4 +1,4 @@
-use schema::flags::{SchemaFlagList, FastFlag, IndexedFlag, StoredFlag};
+use schema::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag};
use std::ops::BitOr;
/// Express whether a field is single-value or multi-valued.
diff --git a/src/schema/schema.rs b/src/schema/schema.rs
index 994b71c9a..7d2be9ac9 100644
--- a/src/schema/schema.rs
+++ b/src/schema/schema.rs
@@ -97,7 +97,7 @@ impl SchemaBuilder {
pub fn add_date_field<T: Into<IntOptions>>(
&mut self,
field_name_str: &str,
- field_options: T
+ field_options: T,
) -> Field {
let field_name = String::from(field_name_str);
let field_entry = FieldEntry::new_date(field_name, field_options.into());
diff --git a/src/schema/value.rs b/src/schema/value.rs
index 0090b4435..cb27c64b3 100644
--- a/src/schema/value.rs
+++ b/src/schema/value.rs
@@ -107,10 +107,10 @@ impl Value {
}
}
- /// Returns the Date-value, provided the value is of the `Date` type.
- ///
- /// # Panics
- /// If the value is not of type `Date`
+ /// Returns the Date-value, provided the value is of the `Date` type.
+ ///
+ /// # Panics
+ /// If the value is not of type `Date`
pub fn date_value(&self) -> &DateTime {
match *self {
Value::Date(ref value) => value,
@@ -138,7 +138,9 @@ impl From for Value {
}
impl From<DateTime> for Value {
- fn from(date_time: DateTime) -> Value { Value::Date(date_time) }
+ fn from(date_time: DateTime) -> Value {
+ Value::Date(date_time)
+ }
}
impl<'a> From<&'a str> for Value {
@@ -161,10 +163,10 @@ impl From<Vec<u8>> for Value {
mod binary_serialize {
use super::Value;
+ use chrono::{TimeZone, Utc};
use common::BinarySerializable;
use schema::Facet;
use std::io::{self, Read, Write};
- use chrono::{Utc, TimeZone};
const TEXT_CODE: u8 = 0;
const U64_CODE: u8 = 1;
@@ -217,7 +219,7 @@ mod binary_serialize {
let value = i64::deserialize(reader)?;
Ok(Value::I64(value))
}
- DATE_CODE=> {
+ DATE_CODE => {
let timestamp = i64::deserialize(reader)?;
Ok(Value::Date(Utc.timestamp(timestamp, 0)))
}
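
As the `DATE_CODE` arm above shows, a `Value::Date` is deserialized from an `i64` Unix timestamp via `Utc.timestamp(timestamp, 0)`, so the round-trip is seconds-granular. A chrono-only sketch of that behavior:

```rust
extern crate chrono;

use chrono::{TimeZone, Utc};

fn main() {
    let original = Utc.ymd(1985, 4, 12).and_hms(23, 20, 50);
    // Serialization keeps an i64 timestamp in seconds...
    let timestamp = original.timestamp();
    // ...and deserialization rebuilds the date exactly as above.
    let restored = Utc.timestamp(timestamp, 0);
    assert_eq!(original, restored);
}
```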
diff --git a/src/snippet/mod.rs b/src/snippet/mod.rs
index 8a3895acb..3fa605b1c 100644
--- a/src/snippet/mod.rs
+++ b/src/snippet/mod.rs
@@ -241,8 +241,8 @@ fn select_best_fragment_combination(fragments: &[FragmentCandidate], text: &str)
/// # let query_parser = QueryParser::for_index(&index, vec![text_field]);
/// // ...
/// let query = query_parser.parse_query("haleurs flamands").unwrap();
-/// # index.load_searchers()?;
-/// # let searcher = index.searcher();
+/// # let reader = index.reader()?;
+/// # let searcher = reader.searcher();
/// let mut snippet_generator = SnippetGenerator::create(&searcher, &*query, text_field)?;
/// snippet_generator.set_max_num_chars(100);
/// let snippet = snippet_generator.snippet_from_doc(&doc);
@@ -528,9 +528,8 @@ Survey in 2016, 2017, and 2018."#;
index_writer.add_document(doc!(text_field => "a"));
index_writer.add_document(doc!(text_field => "a b"));
index_writer.commit().unwrap();
- index.load_searchers().unwrap();
}
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let query_parser = QueryParser::for_index(&index, vec![text_field]);
{
let query = query_parser.parse_query("e").unwrap();
@@ -587,8 +586,7 @@ Survey in 2016, 2017, and 2018."#;
}
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let query_parser = QueryParser::for_index(&index, vec![text_field]);
let query = query_parser.parse_query("rust design").unwrap();
let mut snippet_generator =
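
End to end, snippet generation under the new reader API looks roughly like the sketch below. `SnippetGenerator::create`, `set_max_num_chars`, and `snippet_from_doc` are taken from the hunks above; the document text, field name, and the `to_html()` rendering call are illustrative assumptions:

```rust
extern crate tantivy;

use tantivy::query::QueryParser;
use tantivy::schema::{Document, Schema, TEXT};
use tantivy::{Index, SnippetGenerator};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // Keep a copy of the document around so we can snippet it directly.
    let mut doc = Document::default();
    doc.add_text(
        text_field,
        "Rust is a language empowering everyone to build reliable software.",
    );
    {
        let mut index_writer = index.writer(50_000_000)?;
        index_writer.add_document(doc.clone());
        index_writer.commit()?;
    }

    let searcher = index.reader()?.searcher();
    let query = QueryParser::for_index(&index, vec![text_field])
        .parse_query("reliable software")
        .unwrap();
    let mut snippet_generator = SnippetGenerator::create(&searcher, &*query, text_field)?;
    snippet_generator.set_max_num_chars(100);
    let snippet = snippet_generator.snippet_from_doc(&doc);
    println!("{}", snippet.to_html());
    Ok(())
}
```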
diff --git a/src/space_usage/mod.rs b/src/space_usage/mod.rs
index f361d6860..8ffb841f0 100644
--- a/src/space_usage/mod.rs
+++ b/src/space_usage/mod.rs
@@ -304,9 +304,8 @@ mod test {
fn test_empty() {
let schema = Schema::builder().build();
let index = Index::create_in_ram(schema.clone());
-
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let searcher_space_usage = searcher.space_usage();
assert_eq!(0, searcher_space_usage.total());
}
@@ -344,8 +343,8 @@ mod test {
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let searcher_space_usage = searcher.space_usage();
assert!(searcher_space_usage.total() > 0);
assert_eq!(1, searcher_space_usage.segments().len());
@@ -384,8 +383,8 @@ mod test {
index_writer.commit().unwrap();
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let searcher_space_usage = searcher.space_usage();
assert!(searcher_space_usage.total() > 0);
assert_eq!(1, searcher_space_usage.segments().len());
@@ -423,9 +422,8 @@ mod test {
index_writer.add_document(doc!(name => "hello hi goodbye"));
index_writer.commit().unwrap();
}
-
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let searcher_space_usage = searcher.space_usage();
assert!(searcher_space_usage.total() > 0);
assert_eq!(1, searcher_space_usage.segments().len());
@@ -471,9 +469,8 @@ mod test {
index_writer2.commit().unwrap();
}
- index.load_searchers().unwrap();
-
- let searcher = index.searcher();
+ let reader = index.reader().unwrap();
+ let searcher = reader.searcher();
let searcher_space_usage = searcher.space_usage();
assert!(searcher_space_usage.total() > 0);
assert_eq!(1, searcher_space_usage.segments().len());
diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs
index f3157118c..89a98f072 100644
--- a/src/termdict/mod.rs
+++ b/src/termdict/mod.rs
@@ -159,8 +159,7 @@ mod tests {
index_writer.commit().unwrap();
}
}
- index.load_searchers().unwrap();
- let searcher = index.searcher();
+ let searcher = index.reader().unwrap().searcher();
let field_searcher = searcher.field(text_field);
let mut term_it = field_searcher.terms();