From 663dd89c051c30546903fcc5e7e6d47b2e91c17a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 20 Mar 2019 08:39:22 +0900 Subject: [PATCH] Feature/reader (#517) Adding IndexReader to the API. Making it possible to watch for changes. * Closes #500 --- CHANGELOG.md | 3 + Cargo.toml | 3 +- examples/basic_search.rs | 33 ++- examples/custom_collector.rs | 4 +- examples/custom_tokenizer.rs | 4 +- examples/deleting_updating_documents.rs | 16 +- examples/faceted_search.rs | 4 +- examples/integer_range_search.rs | 5 +- examples/iterating_docs_and_positions.rs | 4 +- examples/snippet.rs | 5 +- examples/stop_words.rs | 4 +- src/collector/count_collector.rs | 4 +- src/collector/facet_collector.rs | 28 +- src/collector/int_facet_collector.rs | 3 +- src/collector/mod.rs | 6 +- src/collector/multi_collector.rs | 7 +- src/collector/top_field_collector.rs | 29 +- src/collector/top_score_collector.rs | 9 +- src/core/index.rs | 187 ++++++++----- src/core/mod.rs | 2 +- src/core/segment_reader.rs | 4 +- src/directory/directory.rs | 18 ++ src/directory/directory_lock.rs | 2 +- src/directory/error.rs | 13 + src/directory/managed_directory.rs | 5 + src/directory/mmap_directory.rs | 320 ++++++++++++++++++----- src/directory/mod.rs | 3 + src/directory/ram_directory.rs | 141 ++++------ src/directory/tests.rs | 58 +++- src/directory/watch_event_router.rs | 156 +++++++++++ src/error.rs | 1 + src/fastfield/bytes/mod.rs | 4 +- src/fastfield/multivalued/mod.rs | 141 ++++++---- src/fastfield/multivalued/reader.rs | 3 +- src/functional_test.rs | 5 +- src/indexer/index_writer.rs | 94 ++++--- src/indexer/merger.rs | 105 ++++---- src/indexer/segment_updater.rs | 21 +- src/lib.rs | 144 +++++----- src/postings/mod.rs | 18 +- src/postings/postings_writer.rs | 7 +- src/postings/segment_postings.rs | 6 +- src/query/all_query.rs | 5 +- src/query/boolean_query/mod.rs | 27 +- src/query/fuzzy_query.rs | 9 +- src/query/phrase_query/mod.rs | 16 +- src/query/query_parser/query_parser.rs | 17 +- src/query/range_query.rs | 12 +- src/query/regex_query.rs | 8 +- src/query/term_query/mod.rs | 7 +- src/query/term_query/term_query.rs | 5 +- src/reader/mod.rs | 187 +++++++++++++ src/{core => reader}/pool.rs | 0 src/schema/document.rs | 2 +- src/schema/field_entry.rs | 8 +- src/schema/field_type.rs | 10 +- src/schema/flags.rs | 10 +- src/schema/int_options.rs | 2 +- src/schema/schema.rs | 2 +- src/schema/value.rs | 16 +- src/snippet/mod.rs | 10 +- src/space_usage/mod.rs | 23 +- src/termdict/mod.rs | 3 +- 63 files changed, 1354 insertions(+), 654 deletions(-) create mode 100644 src/directory/watch_event_router.rs create mode 100644 src/reader/mod.rs rename src/{core => reader}/pool.rs (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bec8fbff..d692de690 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ Tantivy 0.9.0 ===================== *0.9.0 index format is not compatible with the previous index format.* +- MAJOR BUGFIX : + Some `Mmap` objects were being leaked, and would never get released. (@fulmicoton) - Removed most unsafe (@fulmicoton) - Indexer memory footprint improved. (VInt comp, inlining the first block. (@fulmicoton) - Stemming in other language possible (@pentlander) @@ -12,6 +14,7 @@ previous index format.* - Removed `INT_STORED` and `INT_INDEXED`. It is now possible to use `STORED` and `INDEXED` for int fields. (@fulmicoton) - Added DateTime field (@barrotsteindev) +- Added IndexReader. 
By default, index is reloaded automatically upon new commits (@fulmicoton)

Tantivy 0.8.2
diff --git a/Cargo.toml b/Cargo.toml
index 450f03e35..1e04cf4fa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,6 +32,7 @@ num_cpus = "1.2"
fs2={version="0.4", optional=true}
itertools = "0.8"
levenshtein_automata = {version="0.1", features=["fst_automaton"]}
+notify = {version="4", optional=true}
bit-set = "0.5"
uuid = { version = "0.7.2", features = ["v4", "serde"] }
crossbeam = "0.5"
@@ -73,7 +74,7 @@ overflow-checks = true
[features]
# by default no-fail is disabled. We manually enable it when running test.
default = ["mmap", "no_fail"]
-mmap = ["atomicwrites", "fs2", "memmap"]
+mmap = ["atomicwrites", "fs2", "memmap", "notify"]
lz4-compression = ["lz4"]
no_fail = ["fail/no_fail"]
unstable = [] # useful for benches.
diff --git a/examples/basic_search.rs b/examples/basic_search.rs
index 3a0a71e2f..78c2c2d3b 100644
--- a/examples/basic_search.rs
+++ b/examples/basic_search.rs
@@ -20,6 +20,7 @@ use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::schema::*;
use tantivy::Index;
+use tantivy::ReloadPolicy;
use tempdir::TempDir;

fn main() -> tantivy::Result<()> {
@@ -170,24 +171,33 @@ fn main() -> tantivy::Result<()> {
    //
    // ### Searcher
    //
-    // Let's search our index. Start by reloading
-    // searchers in the index. This should be done
-    // after every `commit()`.
-    index.load_searchers()?;
+    // A reader is required to search the index.
+    // It acts as a `Searcher` pool that reloads itself,
+    // depending on a `ReloadPolicy`.
+    //
+    // For a search server you will typically create one reader for the entire lifetime of your
+    // program, and acquire a new searcher for every single request.
+    //
+    // In the code below, we rely on the 'ON_COMMIT' policy: the reader
+    // will reload the index automatically after each commit.
+    let reader = index
+        .reader_builder()
+        .reload_policy(ReloadPolicy::OnCommit)
+        .try_into()?;

    // We now need to acquire a searcher.
-    // Some search experience might require more than
-    // one query.
    //
-    // The searcher ensure that we get to work
-    // with a consistent version of the index.
+    // A searcher points to a snapshotted, immutable version of the index.
+    //
+    // Some search experiences might require more than
+    // one query. Using the same searcher ensures that all of these queries will run on the
+    // same version of the index.
    //
    // Acquiring a `searcher` is very cheap.
    //
-    // You should acquire a searcher every time you
-    // start processing a request and
+    // You should acquire a searcher every time you start processing a request
+    // and release it right after your query is finished.
-    let searcher = index.searcher();
+    let searcher = reader.searcher();

    // ### Query

@@ -224,7 +234,6 @@ fn main() -> tantivy::Result<()> {
    // Since the body field was not configured as stored,
    // the document returned will only contain
    // a title.
- for (_score, doc_address) in top_docs { let retrieved_doc = searcher.doc(doc_address)?; println!("{}", schema.to_json(&retrieved_doc)); diff --git a/examples/custom_collector.rs b/examples/custom_collector.rs index b508d6e7c..40a1d1a0d 100644 --- a/examples/custom_collector.rs +++ b/examples/custom_collector.rs @@ -170,9 +170,9 @@ fn main() -> tantivy::Result<()> { price => 5_200u64 )); index_writer.commit()?; - index.load_searchers()?; - let searcher = index.searcher(); + let reader = index.reader()?; + let searcher = reader.searcher(); let query_parser = QueryParser::for_index(&index, vec![product_name, product_description]); // here we want to get a hit on the 'ken' in Frankenstein diff --git a/examples/custom_tokenizer.rs b/examples/custom_tokenizer.rs index 72b69184d..5730adb10 100644 --- a/examples/custom_tokenizer.rs +++ b/examples/custom_tokenizer.rs @@ -91,9 +91,9 @@ fn main() -> tantivy::Result<()> { increasing confidence in the success of my undertaking."# )); index_writer.commit()?; - index.load_searchers()?; - let searcher = index.searcher(); + let reader = index.reader()?; + let searcher = reader.searcher(); // The query parser can interpret human queries. // Here, if the user does not specify which diff --git a/examples/deleting_updating_documents.rs b/examples/deleting_updating_documents.rs index ed59fa8c8..82fdd90b2 100644 --- a/examples/deleting_updating_documents.rs +++ b/examples/deleting_updating_documents.rs @@ -14,12 +14,16 @@ use tantivy::collector::TopDocs; use tantivy::query::TermQuery; use tantivy::schema::*; use tantivy::Index; +use tantivy::IndexReader; // A simple helper function to fetch a single document // given its id from our index. // It will be helpful to check our work. -fn extract_doc_given_isbn(index: &Index, isbn_term: &Term) -> tantivy::Result> { - let searcher = index.searcher(); +fn extract_doc_given_isbn( + reader: &IndexReader, + isbn_term: &Term, +) -> tantivy::Result> { + let searcher = reader.searcher(); // This is the simplest query you can think of. // It matches all of the documents containing a specific term. @@ -85,12 +89,12 @@ fn main() -> tantivy::Result<()> { isbn => "978-9176370711", )); index_writer.commit()?; - index.load_searchers()?; + let reader = index.reader()?; let frankenstein_isbn = Term::from_field_text(isbn, "978-9176370711"); // Oops our frankenstein doc seems mispelled - let frankenstein_doc_misspelled = extract_doc_given_isbn(&index, &frankenstein_isbn)?.unwrap(); + let frankenstein_doc_misspelled = extract_doc_given_isbn(&reader, &frankenstein_isbn)?.unwrap(); assert_eq!( schema.to_json(&frankenstein_doc_misspelled), r#"{"isbn":["978-9176370711"],"title":["Frankentein"]}"#, @@ -129,10 +133,10 @@ fn main() -> tantivy::Result<()> { // Everything happened as if the document was updated. index_writer.commit()?; // We reload our searcher to make our change available to clients. - index.load_searchers()?; + reader.reload()?; // No more typo! 
- let frankenstein_new_doc = extract_doc_given_isbn(&index, &frankenstein_isbn)?.unwrap(); + let frankenstein_new_doc = extract_doc_given_isbn(&reader, &frankenstein_isbn)?.unwrap(); assert_eq!( schema.to_json(&frankenstein_new_doc), r#"{"isbn":["978-9176370711"],"title":["Frankenstein"]}"#, diff --git a/examples/faceted_search.rs b/examples/faceted_search.rs index 9d68f2a4e..0a99c7131 100644 --- a/examples/faceted_search.rs +++ b/examples/faceted_search.rs @@ -55,9 +55,9 @@ fn main() -> tantivy::Result<()> { index_writer.commit()?; - index.load_searchers()?; + let reader = index.reader()?; - let searcher = index.searcher(); + let searcher = reader.searcher(); let mut facet_collector = FacetCollector::for_field(tags); facet_collector.add_facet("/pools"); diff --git a/examples/integer_range_search.rs b/examples/integer_range_search.rs index 2aac51873..dea3145b6 100644 --- a/examples/integer_range_search.rs +++ b/examples/integer_range_search.rs @@ -19,6 +19,7 @@ fn run() -> Result<()> { let year_field = schema_builder.add_u64_field("year", INDEXED); let schema = schema_builder.build(); let index = Index::create_in_ram(schema); + let reader = index.reader()?; { let mut index_writer = index.writer_with_num_threads(1, 6_000_000)?; for year in 1950u64..2019u64 { @@ -27,8 +28,8 @@ fn run() -> Result<()> { index_writer.commit()?; // The index will be a range of years } - index.load_searchers()?; - let searcher = index.searcher(); + reader.reload()?; + let searcher = reader.searcher(); // The end is excluded i.e. here we are searching up to 1969 let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960..1970); // Uses a Count collector to sum the total number of docs in the range diff --git a/examples/iterating_docs_and_positions.rs b/examples/iterating_docs_and_positions.rs index 62513ea7a..4668de3c0 100644 --- a/examples/iterating_docs_and_positions.rs +++ b/examples/iterating_docs_and_positions.rs @@ -33,9 +33,9 @@ fn main() -> tantivy::Result<()> { index_writer.add_document(doc!(title => "The modern Promotheus")); index_writer.commit()?; - index.load_searchers()?; + let reader = index.reader()?; - let searcher = index.searcher(); + let searcher = reader.searcher(); // A tantivy index is actually a collection of segments. // Similarly, a searcher just wraps a list `segment_reader`. diff --git a/examples/snippet.rs b/examples/snippet.rs index 35ba07557..b79ede83b 100644 --- a/examples/snippet.rs +++ b/examples/snippet.rs @@ -48,9 +48,8 @@ fn main() -> tantivy::Result<()> { // ... 
index_writer.commit()?; - index.load_searchers()?; - - let searcher = index.searcher(); + let reader = index.reader()?; + let searcher = reader.searcher(); let query_parser = QueryParser::for_index(&index, vec![title, body]); let query = query_parser.parse_query("sycamore spring")?; diff --git a/examples/stop_words.rs b/examples/stop_words.rs index cdfe054e8..a6b338060 100644 --- a/examples/stop_words.rs +++ b/examples/stop_words.rs @@ -96,9 +96,9 @@ fn main() -> tantivy::Result<()> { index_writer.commit()?; - index.load_searchers()?; + let reader = index.reader()?; - let searcher = index.searcher(); + let searcher = reader.searcher(); let query_parser = QueryParser::for_index(&index, vec![title, body]); diff --git a/src/collector/count_collector.rs b/src/collector/count_collector.rs index ea2a1d9cd..85ceaa3ab 100644 --- a/src/collector/count_collector.rs +++ b/src/collector/count_collector.rs @@ -40,8 +40,8 @@ use SegmentReader; /// index_writer.commit().unwrap(); /// } /// -/// index.load_searchers()?; -/// let searcher = index.searcher(); +/// let reader = index.reader()?; +/// let searcher = reader.searcher(); /// /// { /// let query_parser = QueryParser::for_index(&index, vec![title]); diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index 083bd65ae..16ce9428b 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -122,17 +122,16 @@ fn facet_depth(facet_bytes: &[u8]) -> usize { /// facet => Facet::from("/lang/en"), /// facet => Facet::from("/category/biography") /// )); -/// index_writer.commit().unwrap(); +/// index_writer.commit()?; /// } -/// -/// index.load_searchers()?; -/// let searcher = index.searcher(); +/// let reader = index.reader()?; +/// let searcher = reader.searcher(); /// /// { /// let mut facet_collector = FacetCollector::for_field(facet); /// facet_collector.add_facet("/lang"); /// facet_collector.add_facet("/category"); -/// let facet_counts = searcher.search(&AllQuery, &facet_collector).unwrap(); +/// let facet_counts = searcher.search(&AllQuery, &facet_collector)?; /// /// // This lists all of the facet counts /// let facets: Vec<(&Facet, u64)> = facet_counts @@ -147,7 +146,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize { /// { /// let mut facet_collector = FacetCollector::for_field(facet); /// facet_collector.add_facet("/category/fiction"); -/// let facet_counts = searcher.search(&AllQuery, &facet_collector).unwrap(); +/// let facet_counts = searcher.search(&AllQuery, &facet_collector)?; /// /// // This lists all of the facet counts /// let facets: Vec<(&Facet, u64)> = facet_counts @@ -163,7 +162,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize { /// { /// let mut facet_collector = FacetCollector::for_field(facet); /// facet_collector.add_facet("/category/fiction"); -/// let facet_counts = searcher.search(&AllQuery, &facet_collector).unwrap(); +/// let facet_counts = searcher.search(&AllQuery, &facet_collector)?; /// /// // This lists all of the facet counts /// let facets: Vec<(&Facet, u64)> = facet_counts.top_k("/category/fiction", 1); @@ -483,8 +482,8 @@ mod tests { index_writer.add_document(doc); } index_writer.commit().unwrap(); - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let mut facet_collector = FacetCollector::for_field(facet_field); facet_collector.add_facet(Facet::from("/top1")); let counts = searcher.search(&AllQuery, &facet_collector).unwrap(); @@ -532,8 +531,8 @@ mod tests { 
facet_field => Facet::from_text(&"/subjects/B/b"), )); index_writer.commit().unwrap(); - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); assert_eq!(searcher.num_docs(), 1); let mut facet_collector = FacetCollector::for_field(facet_field); facet_collector.add_facet("/subjects"); @@ -579,9 +578,7 @@ mod tests { index_writer.add_document(doc); } index_writer.commit().unwrap(); - index.load_searchers().unwrap(); - - let searcher = index.searcher(); + let searcher = index.reader().unwrap().searcher(); let mut facet_collector = FacetCollector::for_field(facet_field); facet_collector.add_facet("/facet"); @@ -635,8 +632,7 @@ mod bench { index_writer.add_document(doc); } index_writer.commit().unwrap(); - index.load_searchers().unwrap(); - + let reader = index.reader().unwrap(); b.iter(|| { let searcher = index.searcher(); let facet_collector = FacetCollector::for_field(facet_field); diff --git a/src/collector/int_facet_collector.rs b/src/collector/int_facet_collector.rs index 01f00cc37..4232343e6 100644 --- a/src/collector/int_facet_collector.rs +++ b/src/collector/int_facet_collector.rs @@ -101,8 +101,7 @@ mod tests { assert_eq!(index_writer.commit().unwrap(), 10u64); } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let searcher = index.reader().searcher(); let mut ffvf_i64: IntFacetCollector = IntFacetCollector::new(num_field_i64); let mut ffvf_u64: IntFacetCollector = IntFacetCollector::new(num_field_u64); diff --git a/src/collector/mod.rs b/src/collector/mod.rs index 046b26ed0..515799fdf 100644 --- a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -53,9 +53,9 @@ use tantivy::collector::{Count, TopDocs}; # index_writer.add_document(doc!( # title => "The Diary of Muadib", # )); -# index_writer.commit().unwrap(); -# index.load_searchers()?; -# let searcher = index.searcher(); +# index_writer.commit()?; +# let reader = index.reader()?; +# let searcher = reader.searcher(); # let query_parser = QueryParser::for_index(&index, vec![title]); # let query = query_parser.parse_query("diary")?; let (doc_count, top_docs): (usize, Vec<(Score, DocAddress)>) = diff --git a/src/collector/multi_collector.rs b/src/collector/multi_collector.rs index f12c648a4..cc3bfc488 100644 --- a/src/collector/multi_collector.rs +++ b/src/collector/multi_collector.rs @@ -134,8 +134,8 @@ impl FruitHandle { /// index_writer.commit().unwrap(); /// } /// -/// index.load_searchers()?; -/// let searcher = index.searcher(); +/// let reader = index.reader()?; +/// let searcher = reader.searcher(); /// /// let mut collectors = MultiCollector::new(); /// let top_docs_handle = collectors.add_collector(TopDocs::with_limit(2)); @@ -278,8 +278,7 @@ mod tests { index_writer.add_document(doc!(text=>"abc")); index_writer.commit().unwrap(); } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let searcher = index.reader().unwrap().searcher(); let term = Term::from_field_text(text, "abc"); let query = TermQuery::new(term, IndexRecordOption::Basic); diff --git a/src/collector/top_field_collector.rs b/src/collector/top_field_collector.rs index 02551310c..b1a2d5ec8 100644 --- a/src/collector/top_field_collector.rs +++ b/src/collector/top_field_collector.rs @@ -23,15 +23,16 @@ use SegmentReader; /// # use tantivy::schema::{Schema, Field, FAST, TEXT}; /// # use tantivy::{Index, Result, DocAddress}; /// # use tantivy::query::{Query, QueryParser}; +/// use tantivy::Searcher; /// use tantivy::collector::TopDocs; 
///
-/// # fn main() {
+/// # fn main() -> tantivy::Result<()> {
/// # let mut schema_builder = Schema::builder();
/// # let title = schema_builder.add_text_field("title", TEXT);
/// # let rating = schema_builder.add_u64_field("rating", FAST);
/// # let schema = schema_builder.build();
/// # let index = Index::create_in_ram(schema);
-/// # let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+/// # let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;
/// # index_writer.add_document(doc!(
/// #     title => "The Name of the Wind",
/// #     rating => 92u64,
/// # ));
/// # index_writer.add_document(doc!(title => "The Diary of Muadib", rating => 97u64));
/// # index_writer.add_document(doc!(title => "A Dairy Cow", rating => 63u64));
/// # index_writer.add_document(doc!(title => "The Diary of a Young Girl", rating => 80u64));
-/// # index_writer.commit().unwrap();
-/// # index.load_searchers().unwrap();
-/// # let query = QueryParser::for_index(&index, vec![title]).parse_query("diary").unwrap();
-/// # let top_docs = docs_sorted_by_rating(&index, &query, rating).unwrap();
+/// # index_writer.commit()?;
+/// # let reader = index.reader()?;
+/// # let query = QueryParser::for_index(&index, vec![title]).parse_query("diary")?;
+/// # let top_docs = docs_sorted_by_rating(&reader.searcher(), &query, rating)?;
/// # assert_eq!(top_docs,
/// #     vec![(97u64, DocAddress(0u32, 1)),
/// #          (80u64, DocAddress(0u32, 3))]);
+/// # Ok(())
/// # }
/// #
/// /// Searches the documents matching the given query, and
/// /// sorts them by the u64 fast field
/// /// given in argument.
/// ///
/// /// `field` is required to be a FAST field.
-/// fn docs_sorted_by_rating(index: &Index, query: &Query, sort_by_field: Field)
+/// fn docs_sorted_by_rating(searcher: &Searcher,
+///                          query: &Query,
+///                          sort_by_field: Field)
///     -> Result<Vec<(u64, DocAddress)>> {
///
///     // This is where we build our collector!
///
///     // ... and here are our documents. Note that this is a simple vec.
///     // The `u64` in the pair is the value of our fast field for each document.
-/// index.searcher() -/// .search(query, &top_docs_by_rating) +/// searcher.search(query, &top_docs_by_rating) /// } /// ``` pub struct TopDocsByField { @@ -177,7 +180,7 @@ mod tests { size => 16u64, )); }); - let searcher = index.searcher(); + let searcher = index.reader().unwrap().searcher(); let top_collector = TopDocs::with_limit(4).order_by_field(size); let top_docs: Vec<(u64, DocAddress)> = searcher.search(&query, &top_collector).unwrap(); @@ -204,7 +207,7 @@ mod tests { size => 12u64, )); }); - let searcher = index.searcher(); + let searcher = index.reader().unwrap().searcher(); let top_collector: TopDocsByField = TopDocs::with_limit(4).order_by_field(Field(2)); let segment_reader = searcher.segment_reader(0u32); top_collector @@ -224,7 +227,7 @@ mod tests { size => 12u64, )); }); - let searcher = index.searcher(); + let searcher = index.reader().unwrap().searcher(); let segment = searcher.segment_reader(0); let top_collector: TopDocsByField = TopDocs::with_limit(4).order_by_field(size); assert_matches!( @@ -247,8 +250,6 @@ mod tests { let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); doc_adder(&mut index_writer); index_writer.commit().unwrap(); - index.load_searchers().unwrap(); - let query_parser = QueryParser::for_index(&index, vec![query_field]); let query = query_parser.parse_query(query).unwrap(); (index, query) diff --git a/src/collector/top_score_collector.rs b/src/collector/top_score_collector.rs index 869022686..43b7424d6 100644 --- a/src/collector/top_score_collector.rs +++ b/src/collector/top_score_collector.rs @@ -51,8 +51,8 @@ use SegmentReader; /// index_writer.commit().unwrap(); /// } /// -/// index.load_searchers()?; -/// let searcher = index.searcher(); +/// let reader = index.reader()?; +/// let searcher = reader.searcher(); /// /// let query_parser = QueryParser::for_index(&index, vec![title]); /// let query = query_parser.parse_query("diary")?; @@ -148,7 +148,6 @@ mod tests { index_writer.add_document(doc!(text_field=>"I like Droopy")); assert!(index_writer.commit().is_ok()); } - index.load_searchers().unwrap(); index } @@ -159,6 +158,8 @@ mod tests { let query_parser = QueryParser::for_index(&index, vec![field]); let text_query = query_parser.parse_query("droopy tax").unwrap(); let score_docs: Vec<(Score, DocAddress)> = index + .reader() + .unwrap() .searcher() .search(&text_query, &TopDocs::with_limit(4)) .unwrap(); @@ -179,6 +180,8 @@ mod tests { let query_parser = QueryParser::for_index(&index, vec![field]); let text_query = query_parser.parse_query("droopy tax").unwrap(); let score_docs: Vec<(Score, DocAddress)> = index + .reader() + .unwrap() .searcher() .search(&text_query, &TopDocs::with_limit(2)) .unwrap(); diff --git a/src/core/index.rs b/src/core/index.rs index 8638d7fa2..62a31cc95 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -1,19 +1,14 @@ -use super::pool::LeasedItem; -use super::pool::Pool; use super::segment::create_segment; use super::segment::Segment; -use core::searcher::Searcher; use core::Executor; use core::IndexMeta; use core::SegmentId; use core::SegmentMeta; -use core::SegmentReader; use core::META_FILEPATH; use directory::ManagedDirectory; #[cfg(feature = "mmap")] use directory::MmapDirectory; use directory::INDEX_WRITER_LOCK; -use directory::META_LOCK; use directory::{Directory, RAMDirectory}; use error::DataCorruption; use error::TantivyError; @@ -21,6 +16,8 @@ use indexer::index_writer::open_index_writer; use indexer::index_writer::HEAP_SIZE_MIN; use indexer::segment_updater::save_new_metas; 
use num_cpus;
+use reader::IndexReader;
+use reader::IndexReaderBuilder;
use schema::Field;
use schema::FieldType;
use schema::Schema;
@@ -28,7 +25,6 @@ use serde_json;
use std::borrow::BorrowMut;
use std::fmt;
use std::path::Path;
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokenizer::BoxedTokenizer;
use tokenizer::TokenizerManager;
@@ -53,8 +49,6 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
pub struct Index {
    directory: ManagedDirectory,
    schema: Schema,
-    num_searchers: Arc<AtomicUsize>,
-    searcher_pool: Arc<Pool<Searcher>>,
    executor: Arc<Executor>,
    tokenizers: TokenizerManager,
}
@@ -159,16 +153,12 @@ impl Index {
    /// Creates a new index given a directory and an `IndexMeta`.
    fn create_from_metas(directory: ManagedDirectory, metas: &IndexMeta) -> Result<Index> {
        let schema = metas.schema.clone();
-        let n_cpus = num_cpus::get();
        let index = Index {
            directory,
            schema,
-            num_searchers: Arc::new(AtomicUsize::new(n_cpus)),
-            searcher_pool: Arc::new(Pool::new()),
            tokenizers: TokenizerManager::default(),
            executor: Arc::new(Executor::single_thread()),
        };
-        index.load_searchers()?;
        Ok(index)
    }
@@ -198,6 +188,22 @@ impl Index {
        }
    }

+    /// Creates a default `IndexReader` for the given index.
+    ///
+    /// See [`Index.reader_builder()`](#method.reader_builder).
+    pub fn reader(&self) -> Result<IndexReader> {
+        self.reader_builder().try_into()
+    }
+
+    /// Creates an `IndexReader` builder for the given index.
+    ///
+    /// Most projects should create at most one reader for a given index.
+    /// This method is typically called only once per `Index` instance,
+    /// and the resulting reader is kept for the lifetime of the program.
+    pub fn reader_builder(&self) -> IndexReaderBuilder {
+        IndexReaderBuilder::new(self.clone())
+    }
+
    /// Opens a new directory from an index path.
    #[cfg(feature = "mmap")]
    pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> Result<Index> {
@@ -336,53 +342,6 @@ impl Index {
            .map(|segment_meta| segment_meta.id())
            .collect())
    }
-
-    /// Sets the number of searchers to use
-    ///
-    /// Only works after the next call to `load_searchers`
-    pub fn set_num_searchers(&mut self, num_searchers: usize) {
-        self.num_searchers.store(num_searchers, Ordering::Release);
-    }
-
-    /// Update searchers so that they reflect the state of the last
-    /// `.commit()`.
-    ///
-    /// If indexing happens in the same process as searching,
-    /// you most likely want to call `.load_searchers()` right after each
-    /// successful call to `.commit()`.
-    ///
-    /// If indexing and searching happen in different processes, the way to
-    /// get the freshest `index` at all time, is to watch `meta.json` and
-    /// call `load_searchers` whenever a changes happen.
-    pub fn load_searchers(&self) -> Result<()> {
-        let _meta_lock = self.directory().acquire_lock(&META_LOCK)?;
-        let searchable_segments = self.searchable_segments()?;
-        let segment_readers: Vec<SegmentReader> = searchable_segments
-            .iter()
-            .map(SegmentReader::open)
-            .collect::<Result<_>>()?;
-        let schema = self.schema();
-        let num_searchers: usize = self.num_searchers.load(Ordering::Acquire);
-        let searchers = (0..num_searchers)
-            .map(|_| Searcher::new(schema.clone(), self.clone(), segment_readers.clone()))
-            .collect();
-        self.searcher_pool.publish_new_generation(searchers);
-        Ok(())
-    }
-
-    /// Returns a searcher
-    ///
-    /// This method should be called every single time a search
-    /// query is performed.
-    /// The searchers are taken from a pool of `num_searchers` searchers.
-    /// If no searcher is available
-    /// this may block.
-    ///
-    /// The same searcher must be used for a given query, as it ensures
-    /// the use of a consistent segment set.
- pub fn searcher(&self) -> LeasedItem { - self.searcher_pool.acquire() - } } impl fmt::Debug for Index { @@ -394,8 +353,16 @@ impl fmt::Debug for Index { #[cfg(test)] mod tests { use directory::RAMDirectory; + use schema::Field; use schema::{Schema, INDEXED, TEXT}; + use std::path::PathBuf; + use std::thread; + use std::time::Duration; + use tempdir::TempDir; use Index; + use IndexReader; + use IndexWriter; + use ReloadPolicy; #[test] fn test_indexer_for_field() { @@ -461,4 +428,106 @@ mod tests { let _ = schema_builder.add_u64_field("num_likes", INDEXED); schema_builder.build() } + + #[test] + fn test_index_on_commit_reload_policy() { + let schema = throw_away_schema(); + let field = schema.get_field("num_likes").unwrap(); + let index = Index::create_in_ram(schema); + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::OnCommit) + .try_into() + .unwrap(); + assert_eq!(reader.searcher().num_docs(), 0); + let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + test_index_on_commit_reload_policy_aux(field, &mut writer, &reader); + } + + #[test] + fn test_index_on_commit_reload_policy_mmap() { + let schema = throw_away_schema(); + let field = schema.get_field("num_likes").unwrap(); + let tempdir = TempDir::new("index").unwrap(); + let tempdir_path = PathBuf::from(tempdir.path()); + let index = Index::create_in_dir(&tempdir_path, schema).unwrap(); + let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + writer.commit().unwrap(); + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::OnCommit) + .try_into() + .unwrap(); + assert_eq!(reader.searcher().num_docs(), 0); + test_index_on_commit_reload_policy_aux(field, &mut writer, &reader); + } + + #[test] + fn test_index_manual_policy_mmap() { + let schema = throw_away_schema(); + let field = schema.get_field("num_likes").unwrap(); + let index = Index::create_from_tempdir(schema).unwrap(); + let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + writer.commit().unwrap(); + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into() + .unwrap(); + assert_eq!(reader.searcher().num_docs(), 0); + writer.add_document(doc!(field=>1u64)); + writer.commit().unwrap(); + thread::sleep(Duration::from_millis(500)); + assert_eq!(reader.searcher().num_docs(), 0); + reader.reload().unwrap(); + assert_eq!(reader.searcher().num_docs(), 1); + } + + #[test] + fn test_index_on_commit_reload_policy_different_directories() { + let schema = throw_away_schema(); + let field = schema.get_field("num_likes").unwrap(); + let tempdir = TempDir::new("index").unwrap(); + let tempdir_path = PathBuf::from(tempdir.path()); + let write_index = Index::create_in_dir(&tempdir_path, schema).unwrap(); + let read_index = Index::open_in_dir(&tempdir_path).unwrap(); + let reader = read_index + .reader_builder() + .reload_policy(ReloadPolicy::OnCommit) + .try_into() + .unwrap(); + assert_eq!(reader.searcher().num_docs(), 0); + let mut writer = write_index.writer_with_num_threads(1, 3_000_000).unwrap(); + test_index_on_commit_reload_policy_aux(field, &mut writer, &reader); + } + + fn test_index_on_commit_reload_policy_aux( + field: Field, + writer: &mut IndexWriter, + reader: &IndexReader, + ) { + assert_eq!(reader.searcher().num_docs(), 0); + writer.add_document(doc!(field=>1u64)); + writer.commit().unwrap(); + let mut count = 0; + for _ in 0..100 { + count = reader.searcher().num_docs(); + if count > 0 { + break; + } + thread::sleep(Duration::from_millis(100)); + } + 
        assert_eq!(count, 1);
+        writer.add_document(doc!(field=>2u64));
+        writer.commit().unwrap();
+        let mut count = 0;
+        for _ in 0..10 {
+            count = reader.searcher().num_docs();
+            if count > 1 {
+                break;
+            }
+            thread::sleep(Duration::from_millis(100));
+        }
+        assert_eq!(count, 2);
+    }
}
diff --git a/src/core/mod.rs b/src/core/mod.rs
index 1aae75f8b..9e5717afa 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -2,7 +2,6 @@ mod executor;
pub mod index;
mod index_meta;
mod inverted_index_reader;
-mod pool;
pub mod searcher;
mod segment;
mod segment_component;
@@ -25,6 +24,7 @@ pub use self::segment_reader::SegmentReader;
use std::path::PathBuf;

lazy_static! {
+
    /// The meta file contains all the information about the list of segments and the schema
    /// of the index.
    pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json");
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index 597ecae39..a8ac2b0fb 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -477,9 +477,7 @@ mod test {
            // ok, now we should have a deleted doc
            index_writer2.commit().unwrap();
        }
-
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
        let docs: Vec<DocId> = searcher.segment_reader(0).doc_ids_alive().collect();
        assert_eq!(vec![0u32, 2u32], docs);
    }
diff --git a/src/directory/directory.rs b/src/directory/directory.rs
index c5f975c9d..943998e2d 100644
--- a/src/directory/directory.rs
+++ b/src/directory/directory.rs
@@ -1,6 +1,8 @@ use directory::directory_lock::Lock;
use directory::error::LockError;
use directory::error::{DeleteError, OpenReadError, OpenWriteError};
+use directory::WatchCallback;
+use directory::WatchHandle;
use directory::{ReadOnlySource, WritePtr};
use std::fmt;
use std::io;
@@ -187,6 +189,22 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
            }
        }
    }
+
+    /// Registers a callback that will be called whenever a change to `meta.json`
+    /// made using the `atomic_write` API is detected.
+    ///
+    /// The behavior when using `.watch()` on a file written via `.open_write(...)` is, on the
+    /// other hand, undefined.
+    ///
+    /// The file will be watched for the lifetime of the returned `WatchHandle`. The caller is
+    /// required to keep it alive.
+    /// It does not override previous callbacks. When the file is modified, all callbacks that
+    /// are registered (and whose `WatchHandle` is still alive) are triggered.
+    ///
+    /// Internally, tantivy only uses this API to detect new commits, in order to implement the
+    /// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
+    /// `OnCommit` `ReloadPolicy` from working properly.
+    fn watch(&self, watch_callback: WatchCallback) -> WatchHandle;
}

/// DirectoryClone
diff --git a/src/directory/directory_lock.rs b/src/directory/directory_lock.rs
index f341ea6f9..67c2585dd 100644
--- a/src/directory/directory_lock.rs
+++ b/src/directory/directory_lock.rs
@@ -43,7 +43,7 @@ lazy_static! {
        is_blocking: false
    };
    /// The meta lock file is here to protect the segment files being opened by
-    /// `.load_searchers()` from being garbage collected.
+    /// `IndexReader::reload()` from being garbage collected.
    /// It makes it possible for another process to safely consume
    /// our index in-writing. Ideally, we may have preferred `RWLock` semantics
    /// here, but it is difficult to achieve on Windows.
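To make the `watch` contract above concrete, here is a minimal, self-contained sketch (not part of the diff itself) of how a caller registers a callback on a `Directory`. It is modeled on the `test_watch` test added later in this patch, and uses `RAMDirectory`, which broadcasts whenever `meta.json` is written through `atomic_write`; the short sleep is only there because callbacks are dispatched from a separate thread, so timings are illustrative, not guaranteed.

```rust
extern crate tantivy;

use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use tantivy::directory::{Directory, RAMDirectory};

fn main() -> std::io::Result<()> {
    let mut directory = RAMDirectory::create();
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_clone = counter.clone();

    // The callback fires on every change to `meta.json`,
    // for as long as `watch_handle` stays alive.
    let watch_handle = directory.watch(Box::new(move || {
        counter_clone.fetch_add(1, Ordering::SeqCst);
    }));

    // An atomic write to `meta.json` triggers a broadcast to all live callbacks.
    directory.atomic_write(Path::new("meta.json"), b"{}")?;

    // Callbacks run on a dedicated thread, so give them a moment.
    thread::sleep(Duration::from_millis(100));
    assert_eq!(counter.load(Ordering::SeqCst), 1);

    // Dropping the handle unsubscribes the callback: further writes are ignored.
    drop(watch_handle);
    directory.atomic_write(Path::new("meta.json"), b"{}")?;
    Ok(())
}
```

Tying the subscription to the `WatchHandle`'s lifetime, rather than to an explicit unsubscribe call, is what lets `WatchCallbackList` store only weak references and drop dead callbacks lazily.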
diff --git a/src/directory/error.rs b/src/directory/error.rs index 1179ad28d..e56971029 100644 --- a/src/directory/error.rs +++ b/src/directory/error.rs @@ -73,6 +73,14 @@ pub enum OpenDirectoryError { DoesNotExist(PathBuf), /// The path exists but is not a directory. NotADirectory(PathBuf), + /// IoError + IoError(io::Error), +} + +impl From for OpenDirectoryError { + fn from(io_err: io::Error) -> Self { + OpenDirectoryError::IoError(io_err) + } } impl fmt::Display for OpenDirectoryError { @@ -84,6 +92,11 @@ impl fmt::Display for OpenDirectoryError { OpenDirectoryError::NotADirectory(ref path) => { write!(f, "the path '{:?}' exists but is not a directory", path) } + OpenDirectoryError::IoError(ref err) => write!( + f, + "IOError while trying to open/create the directory. {:?}", + err + ), } } } diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 5367494f0..ddd8dfbba 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -4,6 +4,7 @@ use directory::DirectoryLock; use directory::Lock; use directory::META_LOCK; use directory::{ReadOnlySource, WritePtr}; +use directory::{WatchCallback, WatchHandle}; use error::DataCorruption; use serde_json; use std::collections::HashSet; @@ -241,6 +242,10 @@ impl Directory for ManagedDirectory { fn acquire_lock(&self, lock: &Lock) -> result::Result { self.directory.acquire_lock(lock) } + + fn watch(&self, watch_callback: WatchCallback) -> WatchHandle { + self.directory.watch(watch_callback) + } } impl Clone for ManagedDirectory { diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 8489f806f..70c277d56 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -1,8 +1,13 @@ extern crate fs2; +extern crate notify; use self::fs2::FileExt; +use self::notify::RawEvent; +use self::notify::RecursiveMode; +use self::notify::Watcher; use atomicwrites; use common::make_io_err; +use core::META_FILEPATH; use directory::error::LockError; use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError}; use directory::read_only_source::BoxedData; @@ -10,6 +15,9 @@ use directory::Directory; use directory::DirectoryLock; use directory::Lock; use directory::ReadOnlySource; +use directory::WatchCallback; +use directory::WatchCallbackList; +use directory::WatchHandle; use directory::WritePtr; use memmap::Mmap; use std::collections::HashMap; @@ -21,14 +29,16 @@ use std::io::{self, Seek, SeekFrom}; use std::io::{BufWriter, Read, Write}; use std::path::{Path, PathBuf}; use std::result; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; +use std::sync::Mutex; use std::sync::RwLock; use std::sync::Weak; +use std::thread; use tempdir::TempDir; /// Returns None iff the file exists, can be read, but is empty (and hence -/// cannot be mmapped). 
-/// +/// cannot be mmapped) fn open_mmap(full_path: &Path) -> result::Result, OpenReadError> { let file = File::open(full_path).map_err(|e| { if e.kind() == io::ErrorKind::NotFound { @@ -84,7 +94,7 @@ impl Default for MmapCache { } impl MmapCache { - fn get_info(&mut self) -> CacheInfo { + fn get_info(&self) -> CacheInfo { let paths: Vec = self.cache.keys().cloned().collect(); CacheInfo { counters: self.counters.clone(), @@ -92,28 +102,105 @@ impl MmapCache { } } + fn remove_weak_ref(&mut self) { + let keys_to_remove: Vec = self + .cache + .iter() + .filter(|(_, mmap_weakref)| mmap_weakref.upgrade().is_none()) + .map(|(key, _)| key.clone()) + .collect(); + for key in keys_to_remove { + self.cache.remove(&key); + } + } + // Returns None if the file exists but as a len of 0 (and hence is not mmappable). fn get_mmap(&mut self, full_path: &Path) -> Result>, OpenReadError> { - let path_in_cache = self.cache.contains_key(full_path); - if path_in_cache { - { - let mmap_weak_opt = self.cache.get(full_path); - if let Some(mmap_arc) = mmap_weak_opt.and_then(|mmap_weak| mmap_weak.upgrade()) { - self.counters.hit += 1; - return Ok(Some(mmap_arc)); - } + if let Some(mmap_weak) = self.cache.get(full_path) { + if let Some(mmap_arc) = mmap_weak.upgrade() { + self.counters.hit += 1; + return Ok(Some(mmap_arc)); } - self.cache.remove(full_path); } + self.cache.remove(full_path); self.counters.miss += 1; - if let Some(mmap) = open_mmap(full_path)? { + Ok(if let Some(mmap) = open_mmap(full_path)? { let mmap_arc: Arc = Arc::new(Box::new(mmap)); - self.cache - .insert(full_path.to_owned(), Arc::downgrade(&mmap_arc)); - Ok(Some(mmap_arc)) + let mmap_weak = Arc::downgrade(&mmap_arc); + self.cache.insert(full_path.to_owned(), mmap_weak); + Some(mmap_arc) } else { - Ok(None) - } + None + }) + } +} + +struct InnerWatcherWrapper { + _watcher: Mutex, + watcher_router: WatchCallbackList, +} + +impl InnerWatcherWrapper { + pub fn new(path: &Path) -> Result<(Self, Receiver), notify::Error> { + let (tx, watcher_recv): (Sender, Receiver) = channel(); + // We need to initialize the + let mut watcher = notify::raw_watcher(tx)?; + watcher.watch(path, RecursiveMode::Recursive)?; + let inner = InnerWatcherWrapper { + _watcher: Mutex::new(watcher), + watcher_router: Default::default(), + }; + Ok((inner, watcher_recv)) + } +} + +#[derive(Clone)] +pub(crate) struct WatcherWrapper { + inner: Arc, +} + +impl WatcherWrapper { + pub fn new(path: &Path) -> Result { + let (inner, watcher_recv) = InnerWatcherWrapper::new(path).map_err(|err| match err { + notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()), + _ => { + panic!("Unknown error while starting watching directory {:?}", path); + } + })?; + let watcher_wrapper = WatcherWrapper { + inner: Arc::new(inner), + }; + let watcher_wrapper_clone = watcher_wrapper.clone(); + thread::Builder::new() + .name("meta-file-watch-thread".to_string()) + .spawn(move || { + loop { + match watcher_recv.recv().map(|evt| evt.path) { + Ok(Some(changed_path)) => { + // ... Actually subject to false positive. + // We might want to be more accurate than this at one point. + if let Some(filename) = changed_path.file_name() { + if filename == *META_FILEPATH { + watcher_wrapper_clone.inner.watcher_router.broadcast(); + } + } + } + Ok(None) => { + // not an event we are interested in. 
+ } + Err(_e) => { + // the watch send channel was dropped + break; + } + } + } + }) + .expect("Failed to spawn thread to watch meta.json"); + Ok(watcher_wrapper) + } + + pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle { + self.inner.watcher_router.subscribe(watch_callback) } } @@ -131,31 +218,62 @@ impl MmapCache { /// On Windows the semantics are again different. #[derive(Clone)] pub struct MmapDirectory { + inner: Arc, +} + +struct MmapDirectoryInner { root_path: PathBuf, - mmap_cache: Arc>, - _temp_directory: Arc>, + mmap_cache: RwLock, + _temp_directory: Option, + watcher: RwLock, +} + +impl MmapDirectoryInner { + fn new( + root_path: PathBuf, + temp_directory: Option, + ) -> Result { + let watch_wrapper = WatcherWrapper::new(&root_path)?; + let mmap_directory_inner = MmapDirectoryInner { + root_path, + mmap_cache: Default::default(), + _temp_directory: temp_directory, + watcher: RwLock::new(watch_wrapper), + }; + Ok(mmap_directory_inner) + } + + fn watch(&self, watch_callback: WatchCallback) -> WatchHandle { + let mut wlock = self.watcher.write().unwrap(); + wlock.watch(watch_callback) + } } impl fmt::Debug for MmapDirectory { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "MmapDirectory({:?})", self.root_path) + write!(f, "MmapDirectory({:?})", self.inner.root_path) } } impl MmapDirectory { + fn new( + root_path: PathBuf, + temp_directory: Option, + ) -> Result { + let inner = MmapDirectoryInner::new(root_path, temp_directory)?; + Ok(MmapDirectory { + inner: Arc::new(inner), + }) + } + /// Creates a new MmapDirectory in a temporary directory. /// /// This is mostly useful to test the MmapDirectory itself. /// For your unit tests, prefer the RAMDirectory. - pub fn create_from_tempdir() -> io::Result { - let tempdir = TempDir::new("index")?; + pub fn create_from_tempdir() -> Result { + let tempdir = TempDir::new("index").map_err(OpenDirectoryError::IoError)?; let tempdir_path = PathBuf::from(tempdir.path()); - let directory = MmapDirectory { - root_path: tempdir_path, - mmap_cache: Arc::new(RwLock::new(MmapCache::default())), - _temp_directory: Arc::new(Some(tempdir)), - }; - Ok(directory) + MmapDirectory::new(tempdir_path, Some(tempdir)) } /// Opens a MmapDirectory in a directory. @@ -173,18 +291,14 @@ impl MmapDirectory { directory_path, ))) } else { - Ok(MmapDirectory { - root_path: PathBuf::from(directory_path), - mmap_cache: Arc::new(RwLock::new(MmapCache::default())), - _temp_directory: Arc::new(None), - }) + Ok(MmapDirectory::new(PathBuf::from(directory_path), None)?) } } /// Joins a relative_path to the directory `root_path` /// to create a proper complete `filepath`. fn resolve_path(&self, relative_path: &Path) -> PathBuf { - self.root_path.join(relative_path) + self.inner.root_path.join(relative_path) } /// Sync the root directory. @@ -209,7 +323,7 @@ impl MmapDirectory { .custom_flags(winbase::FILE_FLAG_BACKUP_SEMANTICS); } - let fd = open_opts.open(&self.root_path)?; + let fd = open_opts.open(&self.inner.root_path)?; fd.sync_all()?; Ok(()) } @@ -219,9 +333,15 @@ impl MmapDirectory { /// /// The `MmapDirectory` embeds a `MmapDirectory` /// to avoid multiplying the `mmap` system calls. 
- pub fn get_cache_info(&mut self) -> CacheInfo { - self.mmap_cache + pub fn get_cache_info(&self) -> CacheInfo { + self.inner + .mmap_cache .write() + .expect("mmap cache lock is poisoned") + .remove_weak_ref(); + self.inner + .mmap_cache + .read() .expect("Mmap cache lock is poisoned.") .get_info() } @@ -274,7 +394,7 @@ impl Directory for MmapDirectory { debug!("Open Read {:?}", path); let full_path = self.resolve_path(path); - let mut mmap_cache = self.mmap_cache.write().map_err(|_| { + let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| { let msg = format!( "Failed to acquired write lock \ on mmap cache while reading {:?}", @@ -288,6 +408,30 @@ impl Directory for MmapDirectory { .unwrap_or_else(ReadOnlySource::empty)) } + /// Any entry associated to the path in the mmap will be + /// removed before the file is deleted. + fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { + debug!("Deleting file {:?}", path); + let full_path = self.resolve_path(path); + match fs::remove_file(&full_path) { + Ok(_) => self + .sync_directory() + .map_err(|e| IOError::with_path(path.to_owned(), e).into()), + Err(e) => { + if e.kind() == io::ErrorKind::NotFound { + Err(DeleteError::FileDoesNotExist(path.to_owned())) + } else { + Err(IOError::with_path(path.to_owned(), e).into()) + } + } + } + } + + fn exists(&self, path: &Path) -> bool { + let full_path = self.resolve_path(path); + full_path.exists() + } + fn open_write(&mut self, path: &Path) -> Result { debug!("Open Write {:?}", path); let full_path = self.resolve_path(path); @@ -318,30 +462,6 @@ impl Directory for MmapDirectory { Ok(BufWriter::new(Box::new(writer))) } - /// Any entry associated to the path in the mmap will be - /// removed before the file is deleted. - fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { - debug!("Deleting file {:?}", path); - let full_path = self.resolve_path(path); - match fs::remove_file(&full_path) { - Ok(_) => self - .sync_directory() - .map_err(|e| IOError::with_path(path.to_owned(), e).into()), - Err(e) => { - if e.kind() == io::ErrorKind::NotFound { - Err(DeleteError::FileDoesNotExist(path.to_owned())) - } else { - Err(IOError::with_path(path.to_owned(), e).into()) - } - } - } - } - - fn exists(&self, path: &Path) -> bool { - let full_path = self.resolve_path(path); - full_path.exists() - } - fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { let full_path = self.resolve_path(path); let mut buffer = Vec::new(); @@ -388,6 +508,10 @@ impl Directory for MmapDirectory { _file: file, }))) } + + fn watch(&self, watch_callback: WatchCallback) -> WatchHandle { + self.inner.watch(watch_callback) + } } #[cfg(test)] @@ -397,6 +521,13 @@ mod tests { // The following tests are specific to the MmapDirectory use super::*; + use schema::{Schema, SchemaBuilder, TEXT}; + use std::fs; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::thread; + use std::time::Duration; + use Index; + use ReloadPolicy; #[test] fn test_open_non_existant_path() { @@ -421,7 +552,7 @@ mod tests { #[test] fn test_cache() { - let content = "abc".as_bytes(); + let content = b"abc"; // here we test if the cache releases // mmaps correctly. 
@@ -456,26 +587,27 @@ mod tests { for path in paths.iter() { let _r = mmap_directory.open_read(path).unwrap(); - assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths); + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10); } + assert_eq!(mmap_directory.get_cache_info().counters.hit, 20); assert_eq!(mmap_directory.get_cache_info().counters.miss, 10); assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10); drop(keep); for path in paths.iter() { let _r = mmap_directory.open_read(path).unwrap(); - assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths); + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 1); } assert_eq!(mmap_directory.get_cache_info().counters.hit, 20); assert_eq!(mmap_directory.get_cache_info().counters.miss, 20); - assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10); + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0); for path in &paths { mmap_directory.delete(path).unwrap(); } assert_eq!(mmap_directory.get_cache_info().counters.hit, 20); assert_eq!(mmap_directory.get_cache_info().counters.miss, 20); - assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10); + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0); for path in paths.iter() { assert!(mmap_directory.open_read(path).is_err()); } @@ -484,4 +616,56 @@ mod tests { assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0); } + #[test] + fn test_watch_wrapper() { + let counter: Arc = Default::default(); + let counter_clone = counter.clone(); + let tmp_dir: TempDir = tempdir::TempDir::new("test_watch_wrapper").unwrap(); + let tmp_dirpath = tmp_dir.path().to_owned(); + let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap(); + let tmp_file = tmp_dirpath.join("coucou"); + let _handle = watch_wrapper.watch(Box::new(move || { + counter_clone.fetch_add(1, Ordering::SeqCst); + })); + assert_eq!(counter.load(Ordering::SeqCst), 0); + fs::write(&tmp_file, b"whateverwilldo").unwrap(); + thread::sleep(Duration::new(0, 1_000u32)); + } + + #[test] + fn test_mmap_released() { + let mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); + let mut schema_builder: SchemaBuilder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let schema = schema_builder.build(); + { + let index = Index::create(mmap_directory.clone(), schema).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + for _num_commits in 0..16 { + for _ in 0..10 { + index_writer.add_document(doc!(text_field=>"abc")); + } + index_writer.commit().unwrap(); + } + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into() + .unwrap(); + for _ in 0..30 { + index_writer.add_document(doc!(text_field=>"abc")); + index_writer.commit().unwrap(); + reader.reload().unwrap(); + } + index_writer.wait_merging_threads().unwrap(); + reader.reload().unwrap(); + let num_segments = reader.searcher().segment_readers().len(); + assert_eq!(num_segments, 4); + assert_eq!( + num_segments * 7, + mmap_directory.get_cache_info().mmapped.len() + ); + } + assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0); + } } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 6f7ef88b8..8d880b0f9 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -12,6 +12,7 @@ mod directory_lock; mod managed_directory; mod ram_directory; mod read_only_source; +mod watch_event_router; /// Errors specific to the directory module. 
pub mod error; @@ -21,6 +22,8 @@ pub use self::directory::{Directory, DirectoryClone}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; pub use self::ram_directory::RAMDirectory; pub use self::read_only_source::ReadOnlySource; +pub(crate) use self::watch_event_router::WatchCallbackList; +pub use self::watch_event_router::{WatchCallback, WatchHandle}; use std::io::{BufWriter, Seek, Write}; #[cfg(feature = "mmap")] diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 9423affff..985117740 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -1,7 +1,8 @@ -use common::make_io_err; -use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError}; +use core::META_FILEPATH; +use directory::error::{DeleteError, OpenReadError, OpenWriteError}; +use directory::WatchCallbackList; use directory::WritePtr; -use directory::{Directory, ReadOnlySource}; +use directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle}; use std::collections::HashMap; use std::fmt; use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write}; @@ -21,13 +22,13 @@ use std::sync::{Arc, RwLock}; /// struct VecWriter { path: PathBuf, - shared_directory: InnerDirectory, + shared_directory: RAMDirectory, data: Cursor>, is_flushed: bool, } impl VecWriter { - fn new(path_buf: PathBuf, shared_directory: InnerDirectory) -> VecWriter { + fn new(path_buf: PathBuf, shared_directory: RAMDirectory) -> VecWriter { VecWriter { path: path_buf, data: Cursor::new(Vec::new()), @@ -63,74 +64,44 @@ impl Write for VecWriter { fn flush(&mut self) -> io::Result<()> { self.is_flushed = true; - self.shared_directory - .write(self.path.clone(), self.data.get_ref())?; + let mut fs = self.shared_directory.fs.write().unwrap(); + fs.write(self.path.clone(), self.data.get_ref()); Ok(()) } } -#[derive(Clone)] -struct InnerDirectory(Arc>>); +#[derive(Default)] +struct InnerDirectory { + fs: HashMap, + watch_router: WatchCallbackList, +} impl InnerDirectory { - fn new() -> InnerDirectory { - InnerDirectory(Arc::new(RwLock::new(HashMap::new()))) - } - - fn write(&self, path: PathBuf, data: &[u8]) -> io::Result { - let mut map = self.0.write().map_err(|_| { - make_io_err(format!( - "Failed to lock the directory, when trying to write {:?}", - path - )) - })?; - let prev_value = map.insert(path, ReadOnlySource::new(Vec::from(data))); - Ok(prev_value.is_some()) + fn write(&mut self, path: PathBuf, data: &[u8]) -> bool { + let data = ReadOnlySource::new(Vec::from(data)); + self.fs.insert(path, data).is_some() } fn open_read(&self, path: &Path) -> Result { - self.0 - .read() - .map_err(|_| { - let msg = format!( - "Failed to acquire read lock for the \ - directory when trying to read {:?}", - path - ); - let io_err = make_io_err(msg); - OpenReadError::IOError(IOError::with_path(path.to_owned(), io_err)) - }) - .and_then(|readable_map| { - readable_map - .get(path) - .ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path))) - .map(|el| el.clone()) - }) + self.fs + .get(path) + .ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path))) + .map(|el| el.clone()) } - fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { - self.0 - .write() - .map_err(|_| { - let msg = format!( - "Failed to acquire write lock for the \ - directory when trying to delete {:?}", - path - ); - let io_err = make_io_err(msg); - DeleteError::IOError(IOError::with_path(path.to_owned(), io_err)) - }) - .and_then(|mut writable_map| match writable_map.remove(path) { - 
Some(_) => Ok(()), - None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))), - }) + fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> { + match self.fs.remove(path) { + Some(_) => Ok(()), + None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))), + } } fn exists(&self, path: &Path) -> bool { - self.0 - .read() - .expect("Failed to get read lock directory.") - .contains_key(path) + self.fs.contains_key(path) + } + + fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle { + self.watch_router.subscribe(watch_handle) } } @@ -145,33 +116,36 @@ impl fmt::Debug for RAMDirectory { /// It is mainly meant for unit testing. /// Writes are only made visible upon flushing. /// -#[derive(Clone)] +#[derive(Clone, Default)] pub struct RAMDirectory { - fs: InnerDirectory, + fs: Arc>, } impl RAMDirectory { /// Constructor pub fn create() -> RAMDirectory { - RAMDirectory { - fs: InnerDirectory::new(), - } + Self::default() } } impl Directory for RAMDirectory { fn open_read(&self, path: &Path) -> result::Result { - self.fs.open_read(path) + self.fs.read().unwrap().open_read(path) + } + + fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { + self.fs.write().unwrap().delete(path) + } + + fn exists(&self, path: &Path) -> bool { + self.fs.read().unwrap().exists(path) } fn open_write(&mut self, path: &Path) -> Result { + let mut fs = self.fs.write().unwrap(); let path_buf = PathBuf::from(path); - let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone()); - - let exists = self - .fs - .write(path_buf.clone(), &Vec::new()) - .map_err(|err| IOError::with_path(path.to_owned(), err))?; + let vec_writer = VecWriter::new(path_buf.clone(), self.clone()); + let exists = fs.write(path_buf.clone(), &[]); // force the creation of the file to mimic the MMap directory. if exists { Err(OpenWriteError::FileAlreadyExists(path_buf)) @@ -180,17 +154,8 @@ impl Directory for RAMDirectory { } } - fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { - self.fs.delete(path) - } - - fn exists(&self, path: &Path) -> bool { - self.fs.exists(path) - } - fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { - let read = self.open_read(path)?; - Ok(read.as_slice().to_owned()) + Ok(self.open_read(path)?.as_slice().to_owned()) } fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { @@ -199,10 +164,20 @@ impl Directory for RAMDirectory { msg.unwrap_or("Undefined".to_string()) ))); let path_buf = PathBuf::from(path); - let mut vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone()); - self.fs.write(path_buf, &Vec::new())?; + + // Reserve the path to prevent calls to .write() to succeed. + self.fs.write().unwrap().write(path_buf.clone(), &[]); + + let mut vec_writer = VecWriter::new(path_buf.clone(), self.clone()); vec_writer.write_all(data)?; vec_writer.flush()?; + if path == Path::new(&*META_FILEPATH) { + self.fs.write().unwrap().watch_router.broadcast(); + } Ok(()) } + + fn watch(&self, watch_callback: WatchCallback) -> WatchHandle { + self.fs.write().unwrap().watch(watch_callback) + } } diff --git a/src/directory/tests.rs b/src/directory/tests.rs index 11bedfeb5..13cb34ede 100644 --- a/src/directory/tests.rs +++ b/src/directory/tests.rs @@ -1,7 +1,13 @@ use super::*; use std::io::{Seek, SeekFrom, Write}; +use std::mem; use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; use std::time; +use std::time::Duration; lazy_static! 
diff --git a/src/directory/watch_event_router.rs b/src/directory/watch_event_router.rs
new file mode 100644
index 000000000..820c73a11
--- /dev/null
+++ b/src/directory/watch_event_router.rs
@@ -0,0 +1,156 @@
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::sync::Weak;
+
+/// Type alias for callbacks registered when watching files of a `Directory`.
+pub type WatchCallback = Box<Fn() -> () + Sync + Send>;
+
+/// Helper struct to implement the watch method in `Directory` implementations.
+///
+/// It registers callbacks (See `.subscribe(...)`) and
+/// calls them upon calls to `.broadcast(...)`.
+#[derive(Default)]
+pub struct WatchCallbackList {
+    router: RwLock<Vec<Weak<WatchCallback>>>,
+}
+
+/// Controls how long a directory should watch for a file change.
+///
+/// After all the clones of `WatchHandle` are dropped, the associated callback will not be
+/// called when a file change is detected.
+#[must_use = "This `WatchHandle` controls the lifetime of the watch and should therefore be used."]
+#[derive(Clone)]
+pub struct WatchHandle(Arc<WatchCallback>);
+
+impl WatchCallbackList {
+    /// Subscribes a new callback and returns a handle that controls the lifetime of the callback.
+    pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle {
+        let watch_callback_arc = Arc::new(watch_callback);
+        let watch_callback_weak = Arc::downgrade(&watch_callback_arc);
+        self.router.write().unwrap().push(watch_callback_weak);
+        WatchHandle(watch_callback_arc)
+    }
+
+    fn list_callback(&self) -> Vec<Arc<WatchCallback>> {
+        let mut callbacks = vec![];
+        let mut router_wlock = self.router.write().unwrap();
+        let mut i = 0;
+        while i < router_wlock.len() {
+            if let Some(watch) = router_wlock[i].upgrade() {
+                callbacks.push(watch);
+                i += 1;
+            } else {
+                router_wlock.swap_remove(i);
+            }
+        }
+        callbacks
+    }
+
+    /// Triggers all callbacks
+    pub fn broadcast(&self) {
+        let callbacks = self.list_callback();
+        let spawn_res = std::thread::Builder::new()
+            .name("watch-callbacks".to_string())
+            .spawn(move || {
+                for callback in callbacks {
+                    callback();
+                }
+            });
+        if let Err(err) = spawn_res {
+            error!(
+                "Failed to spawn thread to call watch callbacks. Cause: {:?}",
+                err
+            );
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use directory::WatchCallbackList;
+    use std::mem;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Arc;
+    use std::thread;
+    use std::time::Duration;
+
+    const WAIT_TIME: u64 = 20;
+
+    #[test]
+    fn test_watch_event_router_simple() {
+        let watch_event_router = WatchCallbackList::default();
+        let counter: Arc<AtomicUsize> = Default::default();
+        let counter_clone = counter.clone();
+        let inc_callback = Box::new(move || {
+            counter_clone.fetch_add(1, Ordering::SeqCst);
+        });
+        watch_event_router.broadcast();
+        assert_eq!(0, counter.load(Ordering::SeqCst));
+        let handle_a = watch_event_router.subscribe(inc_callback);
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(0, counter.load(Ordering::SeqCst));
+        watch_event_router.broadcast();
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(1, counter.load(Ordering::SeqCst));
+        watch_event_router.broadcast();
+        watch_event_router.broadcast();
+        watch_event_router.broadcast();
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(4, counter.load(Ordering::SeqCst));
+        mem::drop(handle_a);
+        watch_event_router.broadcast();
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(4, counter.load(Ordering::SeqCst));
+    }
+
+    #[test]
+    fn test_watch_event_router_multiple_callback_same_key() {
+        let watch_event_router = WatchCallbackList::default();
+        let counter: Arc<AtomicUsize> = Default::default();
+        let inc_callback = |inc: usize| {
+            let counter_clone = counter.clone();
+            Box::new(move || {
+                counter_clone.fetch_add(inc, Ordering::SeqCst);
+            })
+        };
+        let handle_a = watch_event_router.subscribe(inc_callback(1));
+        let handle_a2 = watch_event_router.subscribe(inc_callback(10));
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(0, counter.load(Ordering::SeqCst));
+        watch_event_router.broadcast();
+        watch_event_router.broadcast();
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(22, counter.load(Ordering::SeqCst));
+        mem::drop(handle_a);
+        watch_event_router.broadcast();
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(32, counter.load(Ordering::SeqCst));
+        mem::drop(handle_a2);
+        watch_event_router.broadcast();
+        watch_event_router.broadcast();
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(32, counter.load(Ordering::SeqCst));
+    }
+
+    #[test]
+    fn test_watch_event_router_multiple_callback_different_key() {
+        let watch_event_router = WatchCallbackList::default();
+        let counter: Arc<AtomicUsize> = Default::default();
+        let counter_clone = counter.clone();
+        let inc_callback = Box::new(move || {
+            counter_clone.fetch_add(1, Ordering::SeqCst);
+        });
+        let handle_a = watch_event_router.subscribe(inc_callback);
+        assert_eq!(0, counter.load(Ordering::SeqCst));
+        watch_event_router.broadcast();
+        watch_event_router.broadcast();
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(2, counter.load(Ordering::SeqCst));
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        mem::drop(handle_a);
+        watch_event_router.broadcast();
+        thread::sleep(Duration::from_millis(WAIT_TIME));
+        assert_eq!(2, counter.load(Ordering::SeqCst));
+    }
+
+}
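A sketch of how a `Directory` implementation is expected to drive `WatchCallbackList` (illustrative only; the type is exported `pub(crate)`, so code like this would live inside the crate, and the helper name is hypothetical):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use directory::WatchCallbackList;

fn router_demo(router: &WatchCallbackList) {
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_clone = counter.clone();
    // subscribe() stores only a Weak reference; the returned handle owns the Arc.
    let handle = router.subscribe(Box::new(move || {
        counter_clone.fetch_add(1, Ordering::SeqCst);
    }));
    // broadcast() upgrades the surviving Weak references and runs the callbacks
    // on a dedicated "watch-callbacks" thread.
    router.broadcast();
    // Dropping the handle releases the Arc: the next broadcast skips (and prunes)
    // the dead callback.
    drop(handle);
    router.broadcast();
}
```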
diff --git a/src/error.rs b/src/error.rs
index 7c8bb25e7..5da069105 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -162,6 +162,7 @@ impl From<OpenDirectoryError> for TantivyError {
             OpenDirectoryError::NotADirectory(directory_path) => {
                 TantivyError::InvalidArgument(format!("{:?} is not a directory", directory_path))
             }
+            OpenDirectoryError::IoError(err) => TantivyError::IOError(IOError::from(err)),
         }
     }
 }
diff --git a/src/fastfield/bytes/mod.rs b/src/fastfield/bytes/mod.rs
index 1a551ecc0..b3e73a590 100644
--- a/src/fastfield/bytes/mod.rs
+++ b/src/fastfield/bytes/mod.rs
@@ -22,9 +22,7 @@ mod tests {
         index_writer.add_document(doc!(field=>vec![1u8, 3, 5, 7, 9]));
         index_writer.add_document(doc!(field=>vec![0u8; 1000]));
         assert!(index_writer.commit().is_ok());
-
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let reader = searcher.segment_reader(0);
         let bytes_reader = reader.bytes_fast_field_reader(field).unwrap();
 
diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs
index b8e288f52..3e2a30eb2 100644
--- a/src/fastfield/multivalued/mod.rs
+++ b/src/fastfield/multivalued/mod.rs
@@ -9,14 +9,14 @@ mod tests {
 
     extern crate time;
 
-    use query::QueryParser;
+    use self::time::Duration;
     use collector::TopDocs;
+    use query::QueryParser;
     use schema::Cardinality;
     use schema::Facet;
     use schema::IntOptions;
     use schema::Schema;
     use Index;
-    use self::time::Duration;
 
     #[test]
     fn test_multivalued_u64() {
@@ -34,11 +34,12 @@ mod tests {
         index_writer.add_document(doc!(field=>5u64, field=>20u64,field=>1u64));
         assert!(index_writer.commit().is_ok());
 
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
-        let reader = searcher.segment_reader(0);
+        let searcher = index.reader().unwrap().searcher();
+        let segment_reader = searcher.segment_reader(0);
         let mut vals = Vec::new();
-        let multi_value_reader = reader.multi_fast_field_reader::<u64>(field).unwrap();
+        let multi_value_reader = segment_reader
+            .multi_fast_field_reader::<u64>(field)
+            .unwrap();
         {
             multi_value_reader.get_vals(2, &mut vals);
             assert_eq!(&vals, &[4u64]);
@@ -63,92 +64,121 @@ mod tests {
                 .set_indexed()
                 .set_stored(),
         );
-        let time_i = schema_builder.add_i64_field(
-            "time_stamp_i",
-            IntOptions::default()
-                .set_stored(),
-        );
+        let time_i =
+            schema_builder.add_i64_field("time_stamp_i", IntOptions::default().set_stored());
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema);
         let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
         let first_time_stamp = chrono::Utc::now();
-        index_writer.add_document(doc!(date_field=>first_time_stamp, date_field=>first_time_stamp, time_i=>1i64));
+        index_writer.add_document(
+            doc!(date_field=>first_time_stamp, date_field=>first_time_stamp, time_i=>1i64),
+        );
         index_writer.add_document(doc!(time_i=>0i64));
 
         // add one second
-        index_writer.add_document(doc!(date_field=>first_time_stamp + Duration::seconds(1), time_i=>2i64));
+        index_writer
+            .add_document(doc!(date_field=>first_time_stamp + Duration::seconds(1), time_i=>2i64));
         // add another second
         let two_secs_ahead = first_time_stamp + Duration::seconds(2);
         index_writer.add_document(doc!(date_field=>two_secs_ahead, date_field=>two_secs_ahead,date_field=>two_secs_ahead, time_i=>3i64));
         assert!(index_writer.commit().is_ok());
 
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
         let reader = searcher.segment_reader(0);
 
         assert_eq!(reader.num_docs(), 4);
 
         {
             let parser = QueryParser::for_index(&index, vec![date_field]);
-            let query = parser.parse_query(&format!("\"{}\"", first_time_stamp.to_rfc3339()).to_string())
+            let query = parser
+                .parse_query(&format!("\"{}\"", first_time_stamp.to_rfc3339()).to_string())
                 .expect("could not parse query");
-            let results = searcher.search(&query, &TopDocs::with_limit(5))
+            let results = searcher
+                .search(&query, &TopDocs::with_limit(5))
                 .expect("could not query index");
             assert_eq!(results.len(), 1);
             for (_score, doc_address) in results {
                 let retrieved_doc = searcher.doc(doc_address).expect("cannot fetch doc");
-                assert_eq!(retrieved_doc.get_first(date_field).expect("cannot find value").date_value().timestamp(), first_time_stamp.timestamp());
-                assert_eq!(retrieved_doc.get_first(time_i).expect("cannot find value").i64_value(), 1i64);
+                assert_eq!(
+                    retrieved_doc
+                        .get_first(date_field)
+                        .expect("cannot find value")
+                        .date_value()
+                        .timestamp(),
+                    first_time_stamp.timestamp()
+                );
+                assert_eq!(
+                    retrieved_doc
+                        .get_first(time_i)
+                        .expect("cannot find value")
+                        .i64_value(),
+                    1i64
+                );
             }
         }
 
         {
             let parser = QueryParser::for_index(&index, vec![date_field]);
-            let query = parser.parse_query(&format!("\"{}\"", two_secs_ahead.to_rfc3339()).to_string())
+            let query = parser
+                .parse_query(&format!("\"{}\"", two_secs_ahead.to_rfc3339()).to_string())
                .expect("could not parse query");
-            let results = searcher.search(&query, &TopDocs::with_limit(5))
+            let results = searcher
+                .search(&query, &TopDocs::with_limit(5))
                .expect("could not query index");
             assert_eq!(results.len(), 1);
             for (_score, doc_address) in results {
                 let retrieved_doc = searcher.doc(doc_address).expect("cannot fetch doc");
-                assert_eq!(retrieved_doc.get_first(date_field).expect("cannot find value").date_value().timestamp(), two_secs_ahead.timestamp());
-                assert_eq!(retrieved_doc.get_first(time_i).expect("cannot find value").i64_value(), 3i64);
+                assert_eq!(
+                    retrieved_doc
+                        .get_first(date_field)
+                        .expect("cannot find value")
+                        .date_value()
+                        .timestamp(),
+                    two_secs_ahead.timestamp()
+                );
+                assert_eq!(
+                    retrieved_doc
+                        .get_first(time_i)
+                        .expect("cannot find value")
+                        .i64_value(),
+                    3i64
+                );
             }
         }
 
-        // TODO: support Date range queries
-//        {
-//            let parser = QueryParser::for_index(&index, vec![date_field]);
-//            let range_q = format!("\"{}\"..\"{}\"",
-//                (first_time_stamp + Duration::seconds(1)).to_rfc3339(),
-//                (first_time_stamp + Duration::seconds(3)).to_rfc3339()
-//            );
-//            let query = parser.parse_query(&range_q)
-//                .expect("could not parse query");
-//            let results = searcher.search(&query, &TopDocs::with_limit(5))
-//                .expect("could not query index");
-//
-//
-//            assert_eq!(results.len(), 2);
-//            for (i, doc_pair) in results.iter().enumerate() {
-//                let retrieved_doc = searcher.doc(doc_pair.1).expect("cannot fetch doc");
-//                let offset_sec = match i {
-//                    0 => 1,
-//                    1 => 3,
-//                    _ => panic!("should not have more than 2 docs")
-//                };
-//                let time_i_val = match i {
-//                    0 => 2,
-//                    1 => 3,
-//                    _ => panic!("should not have more than 2 docs")
-//                };
-//                assert_eq!(retrieved_doc.get_first(date_field).expect("cannot find value").date_value().timestamp(),
-//                           (first_time_stamp + Duration::seconds(offset_sec)).timestamp());
-//                assert_eq!(retrieved_doc.get_first(time_i).expect("cannot find value").i64_value(), time_i_val);
-//            }
-//        }
+        // {
+        //     let parser = QueryParser::for_index(&index, vec![date_field]);
+        //     let range_q = format!("\"{}\"..\"{}\"",
+        //         (first_time_stamp + Duration::seconds(1)).to_rfc3339(),
+        //         (first_time_stamp + Duration::seconds(3)).to_rfc3339()
+        //     );
+        //     let query = parser.parse_query(&range_q)
+        //         .expect("could not parse query");
+        //     let results = searcher.search(&query, &TopDocs::with_limit(5))
+        //         .expect("could not query index");
+        //
+        //
+        //     assert_eq!(results.len(), 2);
+        //     for (i, doc_pair) in results.iter().enumerate() {
+        //         let retrieved_doc = searcher.doc(doc_pair.1).expect("cannot fetch doc");
+        //         let offset_sec = match i {
+        //             0 => 1,
+        //             1 => 3,
+        //             _ => panic!("should not have more than 2 docs")
+        //         };
+        //         let time_i_val = match i {
+        //             0 => 2,
+        //             1 => 3,
+        //             _ => panic!("should not have more than 2 docs")
+        //         };
+        //         assert_eq!(retrieved_doc.get_first(date_field).expect("cannot find value").date_value().timestamp(),
+        //                    (first_time_stamp + Duration::seconds(offset_sec)).timestamp());
+        //         assert_eq!(retrieved_doc.get_first(time_i).expect("cannot find value").i64_value(), time_i_val);
+        //     }
+        // }
     }
 
     #[test]
@@ -167,8 +197,7 @@ mod tests {
         index_writer.add_document(doc!(field=> -5i64, field => -20i64, field=>1i64));
         assert!(index_writer.commit().is_ok());
 
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let reader = searcher.segment_reader(0);
         let mut vals = Vec::new();
         let multi_value_reader = reader.multi_fast_field_reader::<i64>(field).unwrap();
diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs
index d7df15a37..3456de525 100644
--- a/src/fastfield/multivalued/reader.rs
+++ b/src/fastfield/multivalued/reader.rs
@@ -75,8 +75,7 @@ mod tests {
             index_writer.add_document(doc);
         }
         index_writer.commit().expect("Commit failed");
-        index.load_searchers().expect("Reloading searchers");
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let segment_reader = searcher.segment_reader(0);
         let mut facet_reader = segment_reader.facet_reader(facet_field).unwrap();
 
diff --git a/src/functional_test.rs b/src/functional_test.rs
index 493f73c31..ff36369ea 100644
--- a/src/functional_test.rs
+++ b/src/functional_test.rs
@@ -22,6 +22,7 @@ fn test_indexing() {
     let schema = schema_builder.build();
 
     let index = Index::create_from_tempdir(schema).unwrap();
+    let reader = index.reader().unwrap();
 
     let mut rng = thread_rng();
 
@@ -36,8 +37,8 @@ fn test_indexing() {
             index_writer.commit().expect("Commit failed");
             committed_docs.extend(&uncommitted_docs);
             uncommitted_docs.clear();
-            index.load_searchers().unwrap();
-            let searcher = index.searcher();
+            reader.reload().unwrap();
+            let searcher = reader.searcher();
             // check that everything is correct.
             check_index_content(&searcher, &committed_docs);
         } else {
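The functional test above reloads its `IndexReader` by hand after each commit. For reference, a hedged sketch of that manual-reload pattern as a standalone function (names illustrative):

```rust
use tantivy::{Index, ReloadPolicy};

fn manual_reload_demo(index: &Index) -> tantivy::Result<()> {
    // Opt out of automatic reloading; reload() must then be called explicitly.
    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .try_into()?;
    // ... commits happen on an IndexWriter elsewhere ...
    reader.reload()?; // pick up the latest commit
    let searcher = reader.searcher(); // cheap, per-request snapshot
    let _num_docs = searcher.num_docs();
    Ok(())
}
```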
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 44c9010b6..94961f5fc 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -44,8 +44,8 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
 // reaches `PIPELINE_MAX_SIZE_IN_DOCS`
 const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
 
-type DocumentSender = channel::Sender<Vec<AddOperation>>;
-type DocumentReceiver = channel::Receiver<Vec<AddOperation>>;
+type OperationSender = channel::Sender<Vec<AddOperation>>;
+type OperationReceiver = channel::Receiver<Vec<AddOperation>>;
 
 /// Split the thread memory budget into
 /// - the heap size
@@ -85,8 +85,8 @@ pub struct IndexWriter {
 
     workers_join_handle: Vec<JoinHandle<Result<()>>>,
 
-    document_receiver: DocumentReceiver,
-    document_sender: DocumentSender,
+    operation_receiver: OperationReceiver,
+    operation_sender: OperationSender,
 
     segment_updater: SegmentUpdater,
 
@@ -133,7 +133,7 @@ pub fn open_index_writer(
         let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
         return Err(TantivyError::InvalidArgument(err_msg));
     }
-    let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
+    let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
         channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
 
     let delete_queue = DeleteQueue::new();
@@ -151,8 +151,8 @@ pub fn open_index_writer(
         heap_size_in_bytes_per_thread,
         index: index.clone(),
 
-        document_receiver,
-        document_sender,
+        operation_receiver: document_receiver,
+        operation_sender: document_sender,
 
         segment_updater,
 
@@ -335,7 +335,7 @@ impl IndexWriter {
     pub fn wait_merging_threads(mut self) -> Result<()> {
         // this will stop the indexing thread,
         // dropping the last reference to the segment_updater.
-        drop(self.document_sender);
+        drop(self.operation_sender);
 
         let former_workers_handles = mem::replace(&mut self.workers_join_handle, vec![]);
         for join_handle in former_workers_handles {
@@ -384,7 +384,7 @@ impl IndexWriter {
     /// The thread consumes documents from the pipeline.
     ///
     fn add_indexing_worker(&mut self) -> Result<()> {
-        let document_receiver_clone = self.document_receiver.clone();
+        let document_receiver_clone = self.operation_receiver.clone();
         let mut segment_updater = self.segment_updater.clone();
         let generation = self.generation;
 
@@ -479,11 +479,11 @@ impl IndexWriter {
     /// when no documents are remaining.
     ///
     /// Returns the former segment_ready channel.
-    fn recreate_document_channel(&mut self) -> DocumentReceiver {
-        let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
+    fn recreate_document_channel(&mut self) -> OperationReceiver {
+        let (document_sender, document_receiver): (OperationSender, OperationReceiver) =
             channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
-        mem::replace(&mut self.document_sender, document_sender);
-        mem::replace(&mut self.document_receiver, document_receiver)
+        mem::replace(&mut self.operation_sender, document_sender);
+        mem::replace(&mut self.operation_receiver, document_receiver)
     }
 
     /// Rollback to the last commit
@@ -501,7 +501,7 @@ impl IndexWriter {
         // segment updates will be ignored.
         self.segment_updater.kill();
 
-        let document_receiver = self.document_receiver.clone();
+        let document_receiver = self.operation_receiver.clone();
 
         // take the directory lock to create a new index_writer.
         let directory_lock = self
@@ -648,7 +648,7 @@ impl IndexWriter {
     pub fn add_document(&mut self, document: Document) -> u64 {
         let opstamp = self.stamper.stamp();
         let add_operation = AddOperation { opstamp, document };
-        let send_result = self.document_sender.send(vec![add_operation]);
+        let send_result = self.operation_sender.send(vec![add_operation]);
         if let Err(e) = send_result {
             panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
         }
@@ -709,7 +709,7 @@ impl IndexWriter {
                 }
             }
         }
-        let send_result = self.document_sender.send(adds);
+        let send_result = self.operation_sender.send(adds);
         if let Err(e) = send_result {
             panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
         };
@@ -728,8 +728,9 @@ mod tests {
     use error::*;
     use indexer::NoMergePolicy;
     use query::TermQuery;
-    use schema::{self, Document, IndexRecordOption};
+    use schema::{self, IndexRecordOption};
     use Index;
+    use ReloadPolicy;
     use Term;
 
     #[test]
@@ -757,6 +758,11 @@ mod tests {
         let mut schema_builder = schema::Schema::builder();
         let text_field = schema_builder.add_text_field("text", schema::TEXT);
         let index = Index::create_in_ram(schema_builder.build());
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::Manual)
+            .try_into()
+            .unwrap();
         let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
         let a_term = Term::from_field_text(text_field, "a");
         let b_term = Term::from_field_text(text_field, "b");
@@ -769,7 +775,7 @@ mod tests {
         index_writer.run(operations);
         index_writer.commit().expect("failed to commit");
-        index.load_searchers().expect("failed to load searchers");
+        reader.reload().expect("failed to load searchers");
 
         let a_term = Term::from_field_text(text_field, "a");
         let b_term = Term::from_field_text(text_field, "b");
 
         let a_query = TermQuery::new(a_term, IndexRecordOption::Basic);
         let b_query = TermQuery::new(b_term, IndexRecordOption::Basic);
-        let searcher = index.searcher();
+        let searcher = reader.searcher();
 
         let a_docs = searcher
             .search(&a_query, &TopDocs::with_limit(1))
@@ -864,9 +870,13 @@ mod tests {
         let mut schema_builder = schema::Schema::builder();
         let text_field = schema_builder.add_text_field("text", schema::TEXT);
         let index = Index::create_in_ram(schema_builder.build());
-
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::Manual)
+            .try_into()
+            .unwrap();
         let num_docs_containing = |s: &str| {
-            let searcher = index.searcher();
+            let searcher = reader.searcher();
             let term = Term::from_field_text(text_field, s);
             searcher.doc_freq(&term)
         };
 
         {
             // writing the segment
             let mut index_writer = index.writer(3_000_000).unwrap();
             index_writer.add_document(doc!(text_field=>"a"));
             index_writer.rollback().unwrap();
-
             assert_eq!(index_writer.commit_opstamp(), 0u64);
             assert_eq!(num_docs_containing("a"), 0);
             {
                 index_writer.add_document(doc!(text_field=>"b"));
                 index_writer.add_document(doc!(text_field=>"c"));
             }
             assert!(index_writer.commit().is_ok());
-            index.load_searchers().unwrap();
+            reader.reload().unwrap();
             assert_eq!(num_docs_containing("a"), 0);
             assert_eq!(num_docs_containing("b"), 1);
             assert_eq!(num_docs_containing("c"), 1);
         }
-        index.load_searchers().unwrap();
-        index.searcher();
+        reader.reload().unwrap();
+        reader.searcher();
     }
 
     #[test]
@@ -898,32 +907,33 @@ mod tests {
         let mut schema_builder = schema::Schema::builder();
         let text_field = schema_builder.add_text_field("text", schema::TEXT);
         let index = Index::create_in_ram(schema_builder.build());
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::Manual)
+            .try_into()
+            .unwrap();
         let num_docs_containing = |s: &str| {
-            let searcher = index.searcher();
             let term_a = Term::from_field_text(text_field, s);
-            searcher.doc_freq(&term_a)
+            reader.searcher().doc_freq(&term_a)
         };
         {
             // writing the segment
             let mut index_writer = index.writer(12_000_000).unwrap();
             // create 8 segments with 100 tiny docs
             for _doc in 0..100 {
-                let mut doc = Document::default();
-                doc.add_text(text_field, "a");
-                index_writer.add_document(doc);
+                index_writer.add_document(doc!(text_field=>"a"));
             }
             index_writer.commit().expect("commit failed");
             for _doc in 0..100 {
-                let mut doc = Document::default();
-                doc.add_text(text_field, "a");
-                index_writer.add_document(doc);
+                index_writer.add_document(doc!(text_field=>"a"));
             }
-            // this should create 8 segments and trigger a merge.
+            // this should create 8 segments and trigger a merge.
             index_writer.commit().expect("commit failed");
             index_writer
                 .wait_merging_threads()
                 .expect("waiting merging thread failed");
-            index.load_searchers().unwrap();
+
+            reader.reload().unwrap();
 
             assert_eq!(num_docs_containing("a"), 200);
             assert!(index.searchable_segments().unwrap().len() < 8);
@@ -990,11 +1000,15 @@ mod tests {
             }
             index_writer.commit().unwrap();
         }
-        index.load_searchers().unwrap();
         let num_docs_containing = |s: &str| {
-            let searcher = index.searcher();
             let term_a = Term::from_field_text(text_field, s);
-            searcher.doc_freq(&term_a)
+            index
+                .reader_builder()
+                .reload_policy(ReloadPolicy::Manual)
+                .try_into()
+                .unwrap()
+                .searcher()
+                .doc_freq(&term_a)
         };
         assert_eq!(num_docs_containing("a"), 0);
         assert_eq!(num_docs_containing("b"), 100);
@@ -1026,11 +1040,9 @@ mod tests {
             index_writer.add_document(doc!(text_field => "b"));
         }
         assert!(index_writer.commit().is_err());
-        index.load_searchers().unwrap();
         let num_docs_containing = |s: &str| {
-            let searcher = index.searcher();
             let term_a = Term::from_field_text(text_field, s);
-            searcher.doc_freq(&term_a)
+            index.reader().unwrap().searcher().doc_freq(&term_a)
         };
         assert_eq!(num_docs_containing("a"), 100);
         assert_eq!(num_docs_containing("b"), 0);
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 37c07b7a6..97048546b 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -194,17 +194,17 @@ impl IndexMerger {
                     fast_field_serializer,
                 )?;
             }
-            FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::Date(ref options) => {
-                match options.get_fastfield_cardinality() {
-                    Some(Cardinality::SingleValue) => {
-                        self.write_single_fast_field(field, fast_field_serializer)?;
-                    }
-                    Some(Cardinality::MultiValues) => {
-                        self.write_multi_fast_field(field, fast_field_serializer)?;
-                    }
-                    None => {}
+            FieldType::U64(ref options)
+            | FieldType::I64(ref options)
+            | FieldType::Date(ref options) => match options.get_fastfield_cardinality() {
+                Some(Cardinality::SingleValue) => {
+                    self.write_single_fast_field(field, fast_field_serializer)?;
                 }
-            }
+                Some(Cardinality::MultiValues) => {
+                    self.write_multi_fast_field(field, fast_field_serializer)?;
+                }
+                None => {}
+            },
             FieldType::Str(_) => {
                 // We don't handle str fast field for the moment
                 // They can be implemented using what is done
@@ -676,8 +676,8 @@ mod tests {
         let score_field = schema_builder.add_u64_field("score", score_fieldtype);
         let bytes_score_field = schema_builder.add_bytes_field("score_bytes");
         let index = Index::create_in_ram(schema_builder.build());
+        let reader = index.reader().unwrap();
         let curr_time = chrono::Utc::now();
-
         let add_score_bytes = |doc: &mut Document, score: u32| {
             let mut bytes = Vec::new();
             bytes
@@ -748,8 +748,8 @@ mod tests {
             index_writer.wait_merging_threads().unwrap();
         }
         {
-            index.load_searchers().unwrap();
-            let searcher = index.searcher();
+            reader.reload().unwrap();
+            let searcher = reader.searcher();
             let get_doc_ids = |terms: Vec<Term>| {
                 let query = BooleanQuery::new_multiterms_query(terms);
                 let top_docs = searcher.search(&query, &TestCollector).unwrap();
@@ -780,10 +780,7 @@ mod tests {
             );
             assert_eq!(
                 get_doc_ids(vec![Term::from_field_date(date_field, &curr_time)]),
-                vec![
-                    DocAddress(0, 0),
-                    DocAddress(0, 3)
-                ]
+                vec![DocAddress(0, 0), DocAddress(0, 3)]
             );
         }
         {
@@ -848,7 +845,7 @@ mod tests {
         let bytes_score_field = schema_builder.add_bytes_field("score_bytes");
         let index = Index::create_in_ram(schema_builder.build());
         let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
-
+        let reader = index.reader().unwrap();
         let search_term = |searcher: &Searcher, term: Term| {
             let collector = FastFieldTestCollector::for_field(score_field);
             let bytes_collector = BytesFastFieldTestCollector::for_field(bytes_score_field);
@@ -885,8 +882,8 @@ mod tests {
             bytes_score_field => vec![0u8, 0, 0, 3],
         ));
         index_writer.commit().expect("committed");
-        index.load_searchers().unwrap();
-        let ref searcher = *index.searcher();
+        reader.reload().unwrap();
+        let searcher = reader.searcher();
         assert_eq!(searcher.num_docs(), 2);
         assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
         assert_eq!(searcher.segment_readers()[0].max_doc(), 3);
@@ -932,8 +929,8 @@ mod tests {
             bytes_score_field => vec![0u8, 0, 27, 88],
         ));
         index_writer.commit().expect("committed");
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        reader.reload().unwrap();
+        let searcher = reader.searcher();
 
         assert_eq!(searcher.segment_readers().len(), 2);
         assert_eq!(searcher.num_docs(), 3);
@@ -994,8 +991,8 @@ mod tests {
             .expect("Failed to initiate merge")
             .wait()
             .expect("Merging failed");
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        reader.reload().unwrap();
+        let searcher = reader.searcher();
         assert_eq!(searcher.segment_readers().len(), 1);
         assert_eq!(searcher.num_docs(), 3);
         assert_eq!(searcher.segment_readers()[0].num_docs(), 3);
@@ -1040,8 +1037,8 @@ mod tests {
         index_writer.delete_term(Term::from_field_text(text_field, "c"));
 
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        reader.reload().unwrap();
+        let searcher = reader.searcher();
         assert_eq!(searcher.segment_readers().len(), 1);
         assert_eq!(searcher.num_docs(), 2);
         assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
@@ -1091,9 +1088,9 @@ mod tests {
             .expect("Failed to initiate merge")
             .wait()
             .expect("Merging failed");
-        index.load_searchers().unwrap();
+        reader.reload().unwrap();
 
-        let ref searcher = *index.searcher();
+        let searcher = reader.searcher();
         assert_eq!(searcher.segment_readers().len(), 1);
         assert_eq!(searcher.num_docs(), 2);
         assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
@@ -1141,9 +1138,9 @@ mod tests {
         let segment_ids = index
             .searchable_segment_ids()
             .expect("Searchable segments failed.");
-        index.load_searchers().unwrap();
+        reader.reload().unwrap();
 
-        let ref searcher = *index.searcher();
+        let searcher = reader.searcher();
         assert!(segment_ids.is_empty());
         assert!(searcher.segment_readers().is_empty());
         assert_eq!(searcher.num_docs(), 0);
     }
 
@@ -1155,6 +1152,7 @@ mod tests {
         let mut schema_builder = schema::Schema::builder();
         let facet_field = schema_builder.add_facet_field("facet");
         let index = Index::create_in_ram(schema_builder.build());
+        let reader = index.reader().unwrap();
         {
             let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
             let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
@@ -1184,9 +1182,9 @@ mod tests {
             index_writer.commit().expect("committed");
         }
 
-        index.load_searchers().unwrap();
+        reader.reload().unwrap();
         let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
-            let searcher = index.searcher();
+            let searcher = reader.searcher();
             let mut facet_collector = FacetCollector::for_field(facet_field);
             facet_collector.add_facet(Facet::from("/top"));
             let (count, facet_counts) = searcher
@@ -1228,7 +1226,7 @@ mod tests {
             .wait()
             .expect("Merging failed");
         index_writer.wait_merging_threads().unwrap();
-        index.load_searchers().unwrap();
+        reader.reload().unwrap();
         test_searcher(
             11,
             &[
@@ -1249,7 +1247,7 @@ mod tests {
         let facet_term = Term::from_facet(facet_field, &facet);
         index_writer.delete_term(facet_term);
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
+        reader.reload().unwrap();
         test_searcher(
             9,
             &[
@@ -1274,8 +1272,8 @@ mod tests {
         index_writer.commit().expect("commit failed");
         index_writer.add_document(doc!(int_field => 1u64));
         index_writer.commit().expect("commit failed");
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
         assert_eq!(searcher.num_docs(), 2);
         index_writer.delete_term(Term::from_field_u64(int_field, 1));
         let segment_ids = index
@@ -1286,10 +1284,10 @@ mod tests {
             .expect("Failed to initiate merge")
             .wait()
             .expect("Merging failed");
-        index.load_searchers().unwrap();
+        reader.reload().unwrap();
 
         // commit has not been called yet. The document should still be
         // there.
-        assert_eq!(index.searcher().num_docs(), 2);
+        assert_eq!(reader.searcher().num_docs(), 2);
     }
 
     #[test]
@@ -1300,7 +1298,7 @@ mod tests {
             .set_indexed();
         let int_field = schema_builder.add_u64_field("intvals", int_options);
         let index = Index::create_in_ram(schema_builder.build());
-
+        let reader = index.reader().unwrap();
         {
             let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
             let mut doc = Document::default();
@@ -1321,8 +1319,8 @@ mod tests {
             .expect("Merging failed");
 
         // assert delete has not been committed
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        reader.reload().expect("failed to load searcher 1");
+        let searcher = reader.searcher();
         assert_eq!(searcher.num_docs(), 2);
 
         index_writer.commit().unwrap();
@@ -1330,13 +1328,13 @@ mod tests {
             index_writer.wait_merging_threads().unwrap();
         }
 
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        reader.reload().unwrap();
+        let searcher = reader.searcher();
         assert_eq!(searcher.num_docs(), 0);
     }
 
     #[test]
-    fn test_merge_multivalued_int_fields() {
+    fn test_merge_multivalued_int_fields_simple() {
         let mut schema_builder = schema::Schema::builder();
         let int_options = IntOptions::default()
             .set_fast(Cardinality::MultiValues)
@@ -1353,7 +1351,6 @@ mod tests {
                 }
                 index_writer.add_document(doc);
             };
-
             index_doc(&mut index_writer, &[1, 2]);
             index_doc(&mut index_writer, &[1, 2, 3]);
             index_doc(&mut index_writer, &[4, 5]);
@@ -1362,19 +1359,14 @@ mod tests {
             index_doc(&mut index_writer, &[3]);
             index_doc(&mut index_writer, &[17]);
             index_writer.commit().expect("committed");
-
             index_doc(&mut index_writer, &[20]);
             index_writer.commit().expect("committed");
-
             index_doc(&mut index_writer, &[28, 27]);
             index_doc(&mut index_writer, &[1_000]);
-
             index_writer.commit().expect("committed");
         }
-        index.load_searchers().unwrap();
-
-        let searcher = index.searcher();
-
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
         let mut vals: Vec<u64> = Vec::new();
 
         {
@@ -1440,13 +1432,14 @@ mod tests {
                 .expect("Failed to initiate merge")
                 .wait()
                 .expect("Merging failed");
-            index_writer.wait_merging_threads().unwrap();
+            index_writer
+                .wait_merging_threads()
+                .expect("Wait for merging threads");
         }
-
-        index.load_searchers().unwrap();
+        reader.reload().expect("Load searcher");
         {
-            let searcher = index.searcher();
+            let searcher = reader.searcher();
             println!(
                 "{:?}",
                 searcher
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 2580bedda..3dc373795 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -565,9 +565,8 @@ mod tests {
             index_writer.delete_term(term);
             assert!(index_writer.commit().is_ok());
         }
-
-        index.load_searchers().unwrap();
-        assert_eq!(index.searcher().num_docs(), 302);
+        let reader = index.reader().unwrap();
+        assert_eq!(reader.searcher().num_docs(), 302);
 
         {
             index_writer
@@ -575,9 +574,9 @@ mod tests {
                 .expect("waiting for merging threads");
         }
 
-        index.load_searchers().unwrap();
-        assert_eq!(index.searcher().segment_readers().len(), 1);
-        assert_eq!(index.searcher().num_docs(), 302);
+        reader.reload().unwrap();
+        assert_eq!(reader.searcher().segment_readers().len(), 1);
+        assert_eq!(reader.searcher().num_docs(), 302);
     }
 
     #[test]
@@ -636,18 +635,18 @@ mod tests {
                 .expect("waiting for merging threads");
         }
 
-        index.load_searchers().unwrap();
-        assert_eq!(index.searcher().num_docs(), 0);
+        let reader = index.reader().unwrap();
+        assert_eq!(reader.searcher().num_docs(), 0);
 
         let seg_ids = index
            .searchable_segment_ids()
            .expect("Searchable segments failed.");
         assert!(seg_ids.is_empty());
-        index.load_searchers().unwrap();
-        assert_eq!(index.searcher().num_docs(), 0);
+        reader.reload().unwrap();
+        assert_eq!(reader.searcher().num_docs(), 0);
         // empty segments should be erased
         assert!(index.searchable_segment_metas().unwrap().is_empty());
-        assert!(index.searcher().segment_readers().is_empty());
+        assert!(reader.searcher().segment_readers().is_empty());
     }
 }
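The segment-updater tests above all exercise the same merge-then-reload flow. A hedged sketch of that flow as a standalone helper (illustrative only; the error handling mirrors the `expect()` style of the tests):

```rust
use tantivy::{Index, IndexWriter};

// Merge every searchable segment into one, then wait for the merge threads.
// Note that wait_merging_threads() consumes the writer.
fn force_merge(index: &Index, mut index_writer: IndexWriter) {
    let segment_ids = index
        .searchable_segment_ids()
        .expect("Searchable segments failed.");
    if !segment_ids.is_empty() {
        index_writer
            .merge(&segment_ids)
            .expect("Failed to initiate merge")
            .wait()
            .expect("Merging failed");
    }
    index_writer
        .wait_merging_threads()
        .expect("waiting for merging threads");
}
```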
.expect("Searchable segments failed."); assert!(seg_ids.is_empty()); - index.load_searchers().unwrap(); - assert_eq!(index.searcher().num_docs(), 0); + reader.reload().unwrap(); + assert_eq!(reader.searcher().num_docs(), 0); // empty segments should be erased assert!(index.searchable_segment_metas().unwrap().is_empty()); - assert!(index.searcher().segment_readers().is_empty()); + assert!(reader.searcher().segment_readers().is_empty()); } } diff --git a/src/lib.rs b/src/lib.rs index 6beb6f32d..70c9e78fd 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,9 +75,9 @@ //! //! // # Searching //! -//! index.load_searchers()?; +//! let reader = index.reader()?; //! -//! let searcher = index.searcher(); +//! let searcher = reader.searcher(); //! //! let query_parser = QueryParser::for_index(&index, vec![title, body]); //! @@ -186,8 +186,8 @@ pub use error::TantivyError; pub use error::TantivyError as Error; extern crate census; -extern crate owned_read; pub extern crate chrono; +extern crate owned_read; /// Tantivy result. pub type Result = std::result::Result; @@ -215,6 +215,9 @@ pub mod space_usage; pub mod store; pub mod termdict; +mod reader; + +pub use self::reader::{IndexReader, IndexReaderBuilder, ReloadPolicy}; mod snippet; pub use self::snippet::{Snippet, SnippetGenerator}; @@ -303,6 +306,7 @@ mod tests { use Index; use IndexWriter; use Postings; + use ReloadPolicy; pub fn assert_nearly_equals(expected: f32, val: f32) { assert!( @@ -391,8 +395,8 @@ mod tests { index_writer.commit().unwrap(); } { - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let term_a = Term::from_field_text(text_field, "a"); assert_eq!(searcher.doc_freq(&term_a), 3); let term_b = Term::from_field_text(text_field, "b"); @@ -419,8 +423,8 @@ mod tests { index_writer.commit().unwrap(); } { - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let index_reader = index.reader().unwrap(); + let searcher = index_reader.searcher(); let reader = searcher.segment_reader(0); { let fieldnorm_reader = reader.get_fieldnorms_reader(text_field); @@ -455,8 +459,8 @@ mod tests { index_writer.commit().unwrap(); } { - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let segment_reader: &SegmentReader = searcher.segment_reader(0); let fieldnorms_reader = segment_reader.get_fieldnorms_reader(text_field); assert_eq!(fieldnorms_reader.fieldnorm(0), 3); @@ -484,6 +488,11 @@ mod tests { let term_c = Term::from_field_text(text_field, "c"); let schema = schema_builder.build(); let index = Index::create_in_ram(schema); + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into() + .unwrap(); { // writing the segment let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); @@ -505,10 +514,10 @@ mod tests { index_writer.commit().unwrap(); } { - index.load_searchers().unwrap(); - let searcher = index.searcher(); - let reader = searcher.segment_reader(0); - let inverted_index = reader.inverted_index(text_field); + reader.reload().unwrap(); + let searcher = reader.searcher(); + let segment_reader = searcher.segment_reader(0); + let inverted_index = segment_reader.inverted_index(text_field); assert!(inverted_index .read_postings(&term_abcd, IndexRecordOption::WithFreqsAndPositions) .is_none()); @@ -516,19 +525,19 @@ mod tests { let mut postings = inverted_index .read_postings(&term_a, 
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, segment_reader));
                 assert_eq!(postings.doc(), 5);
-                assert!(!advance_undeleted(&mut postings, reader));
+                assert!(!advance_undeleted(&mut postings, segment_reader));
             }
             {
                 let mut postings = inverted_index
                     .read_postings(&term_b, IndexRecordOption::WithFreqsAndPositions)
                     .unwrap();
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, segment_reader));
                 assert_eq!(postings.doc(), 3);
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, segment_reader));
                 assert_eq!(postings.doc(), 4);
-                assert!(!advance_undeleted(&mut postings, reader));
+                assert!(!advance_undeleted(&mut postings, segment_reader));
             }
         }
         {
@@ -541,10 +550,10 @@ mod tests {
             index_writer.rollback().unwrap();
         }
         {
-            index.load_searchers().unwrap();
-            let searcher = index.searcher();
-            let reader = searcher.segment_reader(0);
-            let inverted_index = reader.inverted_index(term_abcd.field());
+            reader.reload().unwrap();
+            let searcher = reader.searcher();
+            let seg_reader = searcher.segment_reader(0);
+            let inverted_index = seg_reader.inverted_index(term_abcd.field());
 
             assert!(inverted_index
                 .read_postings(&term_abcd, IndexRecordOption::WithFreqsAndPositions)
             {
                 let mut postings = inverted_index
                     .read_postings(&term_a, IndexRecordOption::WithFreqsAndPositions)
                     .unwrap();
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, seg_reader));
                 assert_eq!(postings.doc(), 5);
-                assert!(!advance_undeleted(&mut postings, reader));
+                assert!(!advance_undeleted(&mut postings, seg_reader));
             }
             {
                 let mut postings = inverted_index
                     .read_postings(&term_b, IndexRecordOption::WithFreqsAndPositions)
                     .unwrap();
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, seg_reader));
                 assert_eq!(postings.doc(), 3);
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, seg_reader));
                 assert_eq!(postings.doc(), 4);
-                assert!(!advance_undeleted(&mut postings, reader));
+                assert!(!advance_undeleted(&mut postings, seg_reader));
             }
         }
         {
@@ -578,10 +587,10 @@ mod tests {
             index_writer.commit().unwrap();
         }
         {
-            index.load_searchers().unwrap();
-            let searcher = index.searcher();
-            let reader = searcher.segment_reader(0);
-            let inverted_index = reader.inverted_index(term_abcd.field());
+            reader.reload().unwrap();
+            let searcher = reader.searcher();
+            let segment_reader = searcher.segment_reader(0);
+            let inverted_index = segment_reader.inverted_index(term_abcd.field());
             assert!(inverted_index
                 .read_postings(&term_abcd, IndexRecordOption::WithFreqsAndPositions)
                 .is_none());
             {
                 let mut postings = inverted_index
                     .read_postings(&term_a, IndexRecordOption::WithFreqsAndPositions)
                     .unwrap();
-                assert!(!advance_undeleted(&mut postings, reader));
+                assert!(!advance_undeleted(&mut postings, segment_reader));
             }
             {
                 let mut postings = inverted_index
                     .read_postings(&term_b, IndexRecordOption::WithFreqsAndPositions)
                     .unwrap();
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, segment_reader));
                 assert_eq!(postings.doc(), 3);
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, segment_reader));
                 assert_eq!(postings.doc(), 4);
-                assert!(!advance_undeleted(&mut postings, reader));
+                assert!(!advance_undeleted(&mut postings, segment_reader));
             }
             {
                 let mut postings = inverted_index
                     .read_postings(&term_c, IndexRecordOption::WithFreqsAndPositions)
                     .unwrap();
-                assert!(advance_undeleted(&mut postings, reader));
+                assert!(advance_undeleted(&mut postings, segment_reader));
                 assert_eq!(postings.doc(), 4);
-                assert!(!advance_undeleted(&mut postings, reader));
+                assert!(!advance_undeleted(&mut postings, segment_reader));
             }
         }
     }
@@ -622,8 +631,8 @@ mod tests {
         let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
         index_writer.add_document(doc!(field=>1u64));
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
         let term = Term::from_field_u64(field, 1u64);
         let mut postings = searcher
             .segment_reader(0)
@@ -646,8 +655,8 @@ mod tests {
         let negative_val = -1i64;
         index_writer.add_document(doc!(value_field => negative_val));
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
         let term = Term::from_field_i64(value_field, negative_val);
         let mut postings = searcher
             .segment_reader(0)
@@ -669,8 +678,8 @@ mod tests {
         let mut index_writer = index.writer_with_num_threads(2, 6_000_000).unwrap();
         index_writer.add_document(doc!(text_field=>"a"));
         assert!(index_writer.commit().is_ok());
-        assert!(index.load_searchers().is_ok());
-        let searcher = index.searcher();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
         let segment_reader = searcher.segment_reader(0);
         segment_reader.inverted_index(absent_field); //< should not panic
     }
@@ -681,6 +690,11 @@ mod tests {
         let text_field = schema_builder.add_text_field("text", TEXT);
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema);
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::Manual)
+            .try_into()
+            .unwrap();
 
         // writing the segment
         let mut index_writer = index.writer_with_num_threads(2, 6_000_000).unwrap();
@@ -706,8 +720,8 @@ mod tests {
         remove_document(&mut index_writer, "38");
         remove_document(&mut index_writer, "34");
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        reader.reload().unwrap();
+        let searcher = reader.searcher();
         assert_eq!(searcher.num_docs(), 6);
     }
 
@@ -727,8 +741,8 @@ mod tests {
             index_writer.commit().unwrap();
         }
         {
-            index.load_searchers().unwrap();
-            let searcher = index.searcher();
+            let index_reader = index.reader().unwrap();
+            let searcher = index_reader.searcher();
             let reader = searcher.segment_reader(0);
             let inverted_index = reader.inverted_index(text_field);
             let term_abcd = Term::from_field_text(text_field, "abcd");
@@ -752,7 +766,7 @@ mod tests {
         let text_field = schema_builder.add_text_field("text", TEXT);
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema);
-
+        let reader = index.reader().unwrap();
         {
             // writing the segment
             let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
@@ -762,8 +776,8 @@ mod tests {
             index_writer.commit().unwrap();
         }
         {
-            index.load_searchers().unwrap();
-            let searcher = index.searcher();
+            reader.reload().unwrap();
+            let searcher = reader.searcher();
             let get_doc_ids = |terms: Vec<Term>| {
                 let query = BooleanQuery::new_multiterms_query(terms);
                 let topdocs = searcher.search(&query, &TestCollector).unwrap();
@@ -805,25 +819,22 @@ mod tests {
         let text_field = schema_builder.add_text_field("text", TEXT);
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema);
-
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::Manual)
+            .try_into()
+            .unwrap();
+        assert_eq!(reader.searcher().num_docs(), 0u64);
         {
             // writing the segment
             let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
-            {
-                let doc = doc!(text_field=>"af b");
-                index_writer.add_document(doc);
-            }
-            {
-                let doc = doc!(text_field=>"a b c");
-                index_writer.add_document(doc);
-            }
-            {
-                let doc = doc!(text_field=>"a b c d");
-                index_writer.add_document(doc);
-            }
+            index_writer.add_document(doc!(text_field=>"af b"));
+            index_writer.add_document(doc!(text_field=>"a b c"));
+            index_writer.add_document(doc!(text_field=>"a b c d"));
             index_writer.commit().unwrap();
         }
-        index.searcher();
+        reader.reload().unwrap();
+        assert_eq!(reader.searcher().num_docs(), 3u64);
     }
 
     #[test]
@@ -860,9 +871,8 @@ mod tests {
             index_writer.add_document(document);
             index_writer.commit().unwrap();
         }
-
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
         let segment_reader: &SegmentReader = searcher.segment_reader(0);
         {
             let fast_field_reader_res = segment_reader.fast_field_reader::<u64>(text_field);
diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index e134e7a9e..e571e26df 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -100,9 +100,8 @@ pub mod tests {
         }
         index_writer.add_document(doc!(title => r#"abc be be be be abc"#));
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
 
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let inverted_index = searcher.segment_reader(0u32).inverted_index(title);
         let term = Term::from_field_text(title, "abc");
@@ -292,9 +291,8 @@ pub mod tests {
             }
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
         let term_a = Term::from_field_text(text_field, "a");
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let segment_reader = searcher.segment_reader(0);
         let mut postings = segment_reader
             .inverted_index(text_field)
@@ -331,10 +329,9 @@ pub mod tests {
                 }
                 assert!(index_writer.commit().is_ok());
             }
-            index.load_searchers().unwrap();
             index
         };
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let segment_reader = searcher.segment_reader(0);
 
         // check that the basic usage works
@@ -402,8 +399,7 @@ pub mod tests {
             index_writer.delete_term(term_0);
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let segment_reader = searcher.segment_reader(0);
 
         // make sure seeking still works
@@ -450,12 +446,9 @@ pub mod tests {
         {
             let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
             index_writer.delete_term(term_1);
-
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
-
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
 
         // finally, check that it's empty
         {
@@ -511,7 +504,6 @@ pub mod tests {
             }
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
         index
     };
 }
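The postings tests keep repeating one access pattern: searcher → segment reader → inverted index → postings for a term. A hedged sketch of that pattern in isolation (helper name illustrative; assumes `DocSet` is exported at the crate root, as these tests suggest):

```rust
use tantivy::schema::IndexRecordOption;
use tantivy::{DocId, DocSet, Index, Term};

// Returns the first (possibly deleted) doc id matching `term` in segment 0,
// mirroring the access pattern used in the tests above.
fn first_doc(index: &Index, term: &Term) -> tantivy::Result<Option<DocId>> {
    let searcher = index.reader()?.searcher();
    let segment_reader = searcher.segment_reader(0);
    let inverted_index = segment_reader.inverted_index(term.field());
    // read_postings returns None if the term is absent from this segment.
    let first = inverted_index
        .read_postings(term, IndexRecordOption::Basic)
        .and_then(|mut postings| {
            if postings.advance() {
                Some(postings.doc())
            } else {
                None
            }
        });
    Ok(first)
}
```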
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index 97de4cd10..61bcc3537 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -33,9 +33,10 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<PostingsWriter> {
                 }
             })
             .unwrap_or_else(|| SpecializedPostingsWriter::<NothingRecorder>::new_boxed()),
-        FieldType::U64(_) | FieldType::I64(_) | FieldType::Date(_) | FieldType::HierarchicalFacet => {
-            SpecializedPostingsWriter::<NothingRecorder>::new_boxed()
-        }
+        FieldType::U64(_)
+        | FieldType::I64(_)
+        | FieldType::Date(_)
+        | FieldType::HierarchicalFacet => SpecializedPostingsWriter::<NothingRecorder>::new_boxed(),
         FieldType::Bytes => {
             // FieldType::Bytes cannot actually be indexed.
             // TODO fix during the indexer refactoring described in #276
diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs
index af26258f4..807265348 100644
--- a/src/postings/segment_postings.rs
+++ b/src/postings/segment_postings.rs
@@ -773,8 +773,7 @@ mod tests {
             last_doc = doc + 1;
         }
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let segment_reader = searcher.segment_reader(0);
         let inverted_index = segment_reader.inverted_index(int_field);
         let term = Term::from_field_u64(int_field, 0u64);
@@ -842,8 +841,7 @@ mod tests {
             index_writer.add_document(doc);
         }
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let segment_reader = searcher.segment_reader(0);
 
         let mut block_segments;
diff --git a/src/query/all_query.rs b/src/query/all_query.rs
index e6468e2d7..8b2b64233 100644
--- a/src/query/all_query.rs
+++ b/src/query/all_query.rs
@@ -101,8 +101,9 @@ mod tests {
         index_writer.commit().unwrap();
         index_writer.add_document(doc!(field=>"ccc"));
         index_writer.commit().unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let reader = index.reader().unwrap();
+        reader.reload().unwrap();
+        let searcher = reader.searcher();
         let weight = AllQuery.weight(&searcher, false).unwrap();
         {
             let reader = searcher.segment_reader(0);
diff --git a/src/query/boolean_query/mod.rs b/src/query/boolean_query/mod.rs
index b450f7f0a..16f37de26 100644
--- a/src/query/boolean_query/mod.rs
+++ b/src/query/boolean_query/mod.rs
@@ -51,7 +51,6 @@ mod tests {
             }
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
         (index, text_field)
     }
 
@@ -60,7 +59,8 @@ mod tests {
         let (index, text_field) = aux_test_helper();
         let query_parser = QueryParser::for_index(&index, vec![text_field]);
         let query = query_parser.parse_query("(+a +b) d").unwrap();
-        assert_eq!(query.count(&*index.searcher()).unwrap(), 3);
+        let searcher = index.reader().unwrap().searcher();
+        assert_eq!(query.count(&searcher).unwrap(), 3);
     }
 
     #[test]
@@ -68,7 +68,7 @@ mod tests {
         let (index, text_field) = aux_test_helper();
         let query_parser = QueryParser::for_index(&index, vec![text_field]);
         let query = query_parser.parse_query("+a").unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let weight = query.weight(&searcher, true).unwrap();
         let scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
         assert!(scorer.is::<TermScorer>());
@@ -78,7 +78,7 @@ mod tests {
     pub fn test_boolean_termonly_intersection() {
         let (index, text_field) = aux_test_helper();
         let query_parser = QueryParser::for_index(&index, vec![text_field]);
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         {
             let query = query_parser.parse_query("+a +b +c").unwrap();
             let weight = query.weight(&searcher, true).unwrap();
@@ -97,7 +97,7 @@ mod tests {
     pub fn test_boolean_reqopt() {
         let (index, text_field) = aux_test_helper();
         let query_parser = QueryParser::for_index(&index, vec![text_field]);
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         {
             let query = query_parser.parse_query("+a b").unwrap();
             let weight = query.weight(&searcher, true).unwrap();
@@ -126,10 +126,13 @@ mod tests {
             query
         };
 
+        let reader = index.reader().unwrap();
+
         let matching_docs = |boolean_query: &Query| {
-            let searcher = index.searcher();
-            let test_docs = searcher.search(boolean_query, &TestCollector).unwrap();
-            test_docs
+            reader
+                .searcher()
+                .search(boolean_query, &TestCollector)
+                .unwrap()
                 .docs()
                 .iter()
                 .cloned()
@@ -185,10 +188,12 @@ mod tests {
             let query: Box<Query> = Box::new(term_query);
             query
        };
-
+        let reader = index.reader().unwrap();
         let score_docs = |boolean_query: &Query| {
-            let searcher = index.searcher();
-            let fruit = searcher.search(boolean_query, &TestCollector).unwrap();
+            let fruit = reader
+                .searcher()
+                .search(boolean_query, &TestCollector)
+                .unwrap();
             fruit.scores().to_vec()
         };
 
diff --git a/src/query/fuzzy_query.rs b/src/query/fuzzy_query.rs
index 6539929bf..ea54b789b 100644
--- a/src/query/fuzzy_query.rs
+++ b/src/query/fuzzy_query.rs
@@ -52,9 +52,8 @@ lazy_static! {
 ///     ));
 ///     index_writer.commit().unwrap();
 /// }
-///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
 ///
 /// {
 ///
@@ -141,8 +140,8 @@ mod test {
             ));
             index_writer.commit().unwrap();
         }
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
 
         {
             let term = Term::from_field_text(country_field, "japon");
diff --git a/src/query/phrase_query/mod.rs b/src/query/phrase_query/mod.rs
index 90ae26451..a08e505a4 100644
--- a/src/query/phrase_query/mod.rs
+++ b/src/query/phrase_query/mod.rs
@@ -31,7 +31,6 @@ mod tests {
             }
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
         index
     }
 
@@ -46,8 +45,7 @@ mod tests {
         ]);
         let schema = index.schema();
         let text_field = schema.get_field("text").unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let test_query = |texts: Vec<&str>| {
             let terms: Vec<Term> = texts
                 .iter()
@@ -90,8 +88,7 @@ mod tests {
             index_writer.add_document(doc!(text_field=>"a b c"));
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let phrase_query = PhraseQuery::new(vec![
             Term::from_field_text(text_field, "a"),
             Term::from_field_text(text_field, "b"),
@@ -115,8 +112,7 @@ mod tests {
         let index = create_index(&["a b c", "a b c a b"]);
         let schema = index.schema();
         let text_field = schema.get_field("text").unwrap();
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let test_query = |texts: Vec<&str>| {
             let terms: Vec<Term> = texts
                 .iter()
@@ -148,8 +144,7 @@ mod tests {
 
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let test_query = |texts: Vec<&str>| {
             let terms: Vec<Term> = texts
                 .iter()
@@ -177,8 +172,7 @@ mod tests {
             index_writer.add_document(doc!(text_field=>"a b c d e f g h"));
             assert!(index_writer.commit().is_ok());
         }
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         let test_query = |texts: Vec<(usize, &str)>| {
             let terms: Vec<(usize, Term)> = texts
                 .iter()
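The phrase-query tests above all build a `PhraseQuery` from a list of terms and run it through a reader-provided searcher. A hedged sketch of that pattern as a small helper (name illustrative):

```rust
use tantivy::collector::Count;
use tantivy::query::PhraseQuery;
use tantivy::schema::Field;
use tantivy::{Index, Term};

// Counts documents matching the given phrase in `field`.
fn count_phrase(index: &Index, field: Field, words: &[&str]) -> tantivy::Result<usize> {
    let terms: Vec<Term> = words
        .iter()
        .map(|word| Term::from_field_text(field, word))
        .collect();
    let phrase_query = PhraseQuery::new(terms);
    let searcher = index.reader()?.searcher();
    searcher.search(&phrase_query, &Count)
}
```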
index.reader().unwrap().searcher(); let test_query = |texts: Vec<(usize, &str)>| { let terms: Vec<(usize, Term)> = texts .iter() diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs index 25b08ae1f..b6ac926fc 100644 --- a/src/query/query_parser/query_parser.rs +++ b/src/query/query_parser/query_parser.rs @@ -239,12 +239,13 @@ impl QueryParser { let term = Term::from_field_i64(field, val); Ok(vec![(0, term)]) } - FieldType::Date(_) => { - match chrono::DateTime::parse_from_rfc3339(phrase) { - Ok(x) => Ok(vec![(0, Term::from_field_date(field, &x.with_timezone(&chrono::Utc)))]), - Err(e) => Err(QueryParserError::DateFormatError(e)) - } - } + FieldType::Date(_) => match chrono::DateTime::parse_from_rfc3339(phrase) { + Ok(x) => Ok(vec![( + 0, + Term::from_field_date(field, &x.with_timezone(&chrono::Utc)), + )]), + Err(e) => Err(QueryParserError::DateFormatError(e)), + }, FieldType::U64(_) => { let val: u64 = u64::from_str(phrase)?; let term = Term::from_field_u64(field, val); @@ -791,7 +792,9 @@ mod test { query_parser.parse_query("date:18a"), Err(QueryParserError::DateFormatError(_)) ); - assert!(query_parser.parse_query("date:\"1985-04-12T23:20:50.52Z\"").is_ok()); + assert!(query_parser + .parse_query("date:\"1985-04-12T23:20:50.52Z\"") + .is_ok()); } #[test] diff --git a/src/query/range_query.rs b/src/query/range_query.rs index f111e90e1..3aa996520 100644 --- a/src/query/range_query.rs +++ b/src/query/range_query.rs @@ -61,8 +61,8 @@ fn map_bound TTo>( /// # } /// # index_writer.commit().unwrap(); /// # } -/// # index.load_searchers()?; -/// let searcher = index.searcher(); +/// # let reader = index.reader()?; +/// let searcher = reader.searcher(); /// /// let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960..1970); /// @@ -316,8 +316,8 @@ mod tests { } index_writer.commit().unwrap(); } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960u64..1970u64); @@ -355,8 +355,8 @@ mod tests { index_writer.commit().unwrap(); } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let count_multiples = |range_query: RangeQuery| searcher.search(&range_query, &Count).unwrap(); diff --git a/src/query/regex_query.rs b/src/query/regex_query.rs index 3d3254b2f..ec7dcceb7 100644 --- a/src/query/regex_query.rs +++ b/src/query/regex_query.rs @@ -44,8 +44,8 @@ use Searcher; /// index_writer.commit().unwrap(); /// } /// -/// index.load_searchers()?; -/// let searcher = index.searcher(); +/// let reader = index.reader()?; +/// let searcher = reader.searcher(); /// /// let term = Term::from_field_text(title, "Diary"); /// let query = RegexQuery::new("d[ai]{2}ry".to_string(), title); @@ -108,8 +108,8 @@ mod test { )); index_writer.commit().unwrap(); } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); { let regex_query = RegexQuery::new("jap[ao]n".to_string(), country_field); let scored_docs = searcher diff --git a/src/query/term_query/mod.rs b/src/query/term_query/mod.rs index edc4af411..2a85e9383 100644 --- a/src/query/term_query/mod.rs +++ b/src/query/term_query/mod.rs @@ -32,9 +32,7 @@ mod tests { } assert!(index_writer.commit().is_ok()); } - - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let searcher 
= index.reader().unwrap().searcher();
         let term_query = TermQuery::new(
             Term::from_field_text(text_field, "a"),
             IndexRecordOption::Basic,
@@ -65,8 +63,7 @@ mod tests {
             index_writer.add_document(doc!(left_field => "left4 left1"));
             index_writer.commit().unwrap();
         }
-        index.load_searchers().unwrap();
-        let searcher = index.searcher();
+        let searcher = index.reader().unwrap().searcher();
         {
             let term = Term::from_field_text(left_field, "left2");
             let term_query = TermQuery::new(term, IndexRecordOption::WithFreqs);
diff --git a/src/query/term_query/term_query.rs b/src/query/term_query/term_query.rs
index 8ddf42762..6dc52acb2 100644
--- a/src/query/term_query/term_query.rs
+++ b/src/query/term_query/term_query.rs
@@ -48,9 +48,8 @@ use Term;
 /// ));
 /// index_writer.commit()?;
 /// }
-///
-/// index.load_searchers()?;
-/// let searcher = index.searcher();
+/// let reader = index.reader()?;
+/// let searcher = reader.searcher();
 ///
 /// let query = TermQuery::new(
 ///     Term::from_field_text(title, "diary"),
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
new file mode 100644
index 000000000..67d0ba52c
--- /dev/null
+++ b/src/reader/mod.rs
@@ -0,0 +1,187 @@
+mod pool;
+
+use self::pool::{LeasedItem, Pool};
+use core::Segment;
+use directory::Directory;
+use directory::WatchHandle;
+use directory::META_LOCK;
+use std::sync::Arc;
+use Index;
+use Result;
+use Searcher;
+use SegmentReader;
+
+/// Defines when a new version of the index should be reloaded.
+///
+/// Regardless of whether you search and index in the same process, tantivy does not necessarily
+/// reflect the changes that are committed to your index. `ReloadPolicy` lets you define precisely
+/// when your index should be reloaded.
+#[derive(Clone, Copy)]
+pub enum ReloadPolicy {
+    /// The index is entirely reloaded manually.
+    /// All updates of the index should be manual.
+    ///
+    /// No change is reflected automatically. You are required to call
+    /// `IndexReader::reload()` manually.
+    Manual,
+    /// The index is reloaded within milliseconds after a new commit is available.
+    /// This is made possible by watching changes in the `meta.json` file.
+    OnCommit, // TODO add NEAR_REAL_TIME(target_ms)
+}
+
+/// `IndexReader` builder
+///
+/// It makes it possible to configure the following values.
+///
+/// - `num_searchers` (by default, the number of detected CPU threads):
+///
+/// When more than `num_searchers` queries are requested at the same time, the extra
+/// queries will block until one of the searchers in use gets released.
+/// - `reload_policy` (by default `ReloadPolicy::OnCommit`):
+///
+/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
+#[derive(Clone)]
+pub struct IndexReaderBuilder {
+    num_searchers: usize,
+    reload_policy: ReloadPolicy,
+    index: Index,
+}
+
+impl IndexReaderBuilder {
+    pub(crate) fn new(index: Index) -> IndexReaderBuilder {
+        IndexReaderBuilder {
+            num_searchers: num_cpus::get(),
+            reload_policy: ReloadPolicy::OnCommit,
+            index,
+        }
+    }
+
+    /// Builds the reader.
+    ///
+    /// Building the reader is a non-trivial operation that requires
+    /// opening the different segment readers. It may take hundreds of
+    /// milliseconds and it may return an error.
+    /// TODO(pmasurel) Use the `TryInto` trait once it is available in stable.
+    pub fn try_into(self) -> Result<IndexReader> {
+        let inner_reader = InnerIndexReader {
+            index: self.index,
+            num_searchers: self.num_searchers,
+            searcher_pool: Pool::new(),
+        };
+        inner_reader.reload()?;
+        let inner_reader_arc = Arc::new(inner_reader);
+        let watch_handle_opt: Option<WatchHandle>;
+        match self.reload_policy {
+            ReloadPolicy::Manual => {
+                // No need to set anything...
+                watch_handle_opt = None;
+            }
+            ReloadPolicy::OnCommit => {
+                let inner_reader_arc_clone = inner_reader_arc.clone();
+                let callback = move || {
+                    if let Err(err) = inner_reader_arc_clone.reload() {
+                        error!(
+                            "Error while reloading searchers after a commit was detected. {:?}",
+                            err
+                        );
+                    }
+                };
+                let watch_handle = inner_reader_arc.index.directory().watch(Box::new(callback));
+                watch_handle_opt = Some(watch_handle);
+            }
+        }
+        Ok(IndexReader {
+            inner: inner_reader_arc,
+            watch_handle_opt,
+        })
+    }
+
+    /// Sets the reload policy.
+    ///
+    /// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
+    pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
+        self.reload_policy = reload_policy;
+        self
+    }
+
+    /// Sets the number of `Searcher` instances in the searcher pool.
+    pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
+        self.num_searchers = num_searchers;
+        self
+    }
+}
+
+struct InnerIndexReader {
+    num_searchers: usize,
+    searcher_pool: Pool<Searcher>,
+    index: Index,
+}
+
+impl InnerIndexReader {
+    fn reload(&self) -> Result<()> {
+        let segment_readers: Vec<SegmentReader> = {
+            let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
+            let searchable_segments = self.searchable_segments()?;
+            searchable_segments
+                .iter()
+                .map(SegmentReader::open)
+                .collect::<Result<_>>()?
+        };
+        let schema = self.index.schema();
+        let searchers = (0..self.num_searchers)
+            .map(|_| Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()))
+            .collect();
+        self.searcher_pool.publish_new_generation(searchers);
+        Ok(())
+    }
+
+    /// Returns the list of segments that are searchable
+    fn searchable_segments(&self) -> Result<Vec<Segment>> {
+        self.index.searchable_segments()
+    }
+
+    fn searcher(&self) -> LeasedItem<Searcher> {
+        self.searcher_pool.acquire()
+    }
+}
+
+/// `IndexReader` is your entry point to read and search the index.
+///
+/// It controls when a new version of the index should be loaded and lends
+/// you instances of `Searcher` for the last loaded version.
+///
+/// `Clone` does not clone the underlying pool of searchers: `IndexReader`
+/// just wraps an `Arc`.
+#[derive(Clone)]
+pub struct IndexReader {
+    inner: Arc<InnerIndexReader>,
+    watch_handle_opt: Option<WatchHandle>,
+}
+
+impl IndexReader {
+    /// Updates the searchers so that they reflect the state of the last
+    /// `.commit()`.
+    ///
+    /// If you set up the `OnCommit` `ReloadPolicy` (which is the default),
+    /// every commit should be rapidly reflected on your `IndexReader` and you should
+    /// not need to call `reload()` at all.
+    ///
+    /// However, this automatic reload can take tens of milliseconds to kick in, so in unit
+    /// tests it can be nice to deterministically force the reload of searchers.
+    pub fn reload(&self) -> Result<()> {
+        self.inner.reload()
+    }
+
+    /// Returns a searcher.
+    ///
+    /// This method should be called every single time a search
+    /// query is performed.
+    /// The searchers are taken from a pool of `num_searchers` searchers.
+    /// If no searcher is available, this call may block.
+    ///
+    /// The same searcher must be used for a given query, as it ensures
+    /// the use of a consistent segment set.
+    pub fn searcher(&self) -> LeasedItem<Searcher> {
+        self.inner.searcher()
+    }
+}
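The `Manual` variant above is the counterpart of `IndexReader::reload()`. Below is a minimal sketch of that flow, outside of this patch; the schema, field name, and writer heap size are illustrative assumptions, only the reader API calls come from the new module.

#[macro_use]
extern crate tantivy;

use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, ReloadPolicy};

fn main() -> tantivy::Result<()> {
    // Hypothetical test setup: a single text field in a RAM index.
    let mut schema_builder = Schema::builder();
    let text = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // With `Manual`, no watch handle is registered on meta.json:
    // commits stay invisible to the reader until `reload()` is called.
    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .num_searchers(1)
        .try_into()?;

    let mut index_writer = index.writer(50_000_000)?;
    index_writer.add_document(doc!(text => "hello"));
    index_writer.commit()?;

    // The pooled searchers still see the empty index...
    assert_eq!(reader.searcher().num_docs(), 0);
    // ...until the reload is forced deterministically.
    reader.reload()?;
    assert_eq!(reader.searcher().num_docs(), 1);
    Ok(())
}

Under `OnCommit`, the same sequence would eventually pass without the explicit `reload()` call, but only after the `meta.json` watch fires; this is why deterministic unit tests favor the `Manual` policy.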
diff --git a/src/core/pool.rs b/src/reader/pool.rs
similarity index 100%
rename from src/core/pool.rs
rename to src/reader/pool.rs
diff --git a/src/schema/document.rs b/src/schema/document.rs
index 2c535446a..bec54177a 100644
--- a/src/schema/document.rs
+++ b/src/schema/document.rs
@@ -90,7 +90,7 @@ impl Document {

     /// Add a date field
     pub fn add_date(&mut self, field: Field, value: &DateTime) {
-        self.add(FieldValue::new(field, Value::Date(DateTime::from(*value))));
+        self.add(FieldValue::new(field, Value::Date(*value)));
     }

     /// Add a bytes field
diff --git a/src/schema/field_entry.rs b/src/schema/field_entry.rs
index fc5972b94..89a8b251d 100644
--- a/src/schema/field_entry.rs
+++ b/src/schema/field_entry.rs
@@ -87,7 +87,9 @@ impl FieldEntry {
     pub fn is_indexed(&self) -> bool {
         match self.field_type {
             FieldType::Str(ref options) => options.get_indexing_options().is_some(),
-            FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::Date(ref options) => options.is_indexed(),
+            FieldType::U64(ref options)
+            | FieldType::I64(ref options)
+            | FieldType::Date(ref options) => options.is_indexed(),
             FieldType::HierarchicalFacet => true,
             FieldType::Bytes => false,
         }
@@ -104,7 +106,9 @@ impl FieldEntry {
     /// Returns true iff the field is stored
     pub fn is_stored(&self) -> bool {
         match self.field_type {
-            FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::Date(ref options) => options.is_stored(),
+            FieldType::U64(ref options)
+            | FieldType::I64(ref options)
+            | FieldType::Date(ref options) => options.is_stored(),
             FieldType::Str(ref options) => options.is_stored(),
             // TODO make stored hierarchical facet optional
             FieldType::HierarchicalFacet => true,
diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs
index 79c2656a6..a67451d7a 100644
--- a/src/schema/field_type.rs
+++ b/src/schema/field_type.rs
@@ -95,7 +95,9 @@ impl FieldType {
             FieldType::Str(ref text_options) => text_options
                 .get_indexing_options()
                 .map(|indexing_options| indexing_options.index_option()),
-            FieldType::U64(ref int_options) | FieldType::I64(ref int_options) | FieldType::Date(ref int_options) => {
+            FieldType::U64(ref int_options)
+            | FieldType::I64(ref int_options)
+            | FieldType::Date(ref int_options) => {
                 if int_options.is_indexed() {
                     Some(IndexRecordOption::Basic)
                 } else {
@@ -116,9 +118,9 @@ impl FieldType {
         match *json {
             JsonValue::String(ref field_text) => match *self {
                 FieldType::Str(_) => Ok(Value::Str(field_text.clone())),
-                FieldType::U64(_) | FieldType::I64(_) | FieldType::Date(_) => Err(ValueParsingError::TypeError(
-                    format!("Expected an integer, got {:?}", json),
-                )),
+                FieldType::U64(_) | FieldType::I64(_) | FieldType::Date(_) => Err(
+                    ValueParsingError::TypeError(format!("Expected an integer, got {:?}", json)),
+                ),
                 FieldType::HierarchicalFacet => Ok(Value::Facet(Facet::from(field_text))),
                 FieldType::Bytes => decode(field_text).map(Value::Bytes).map_err(|_| {
                     ValueParsingError::InvalidBase64(format!(
diff --git a/src/schema/flags.rs b/src/schema/flags.rs
index e766b4c27..104df2b33 100644
--- a/src/schema/flags.rs
+++ b/src/schema/flags.rs
@@ -43,7 +43,11 @@ pub const FAST: SchemaFlagList = SchemaFlagList {
 };

 impl BitOr> for SchemaFlagList
-    where Head: Clone, OldHead: Clone, OldTail: Clone {
+where
+    Head: Clone,
+    OldHead: Clone,
+    OldTail: Clone,
+{
     type Output = SchemaFlagList>;

     fn bitor(self, head: SchemaFlagList) -> Self::Output {
@@ -54,7 +58,7 @@ impl BitOr> for SchemaFlagList<
     }
 }

-impl> BitOr
for SchemaFlagList { +impl> BitOr for SchemaFlagList { type Output = IntOptions; fn bitor(self, rhs: IntOptions) -> Self::Output { @@ -62,7 +66,7 @@ impl> BitOr for SchemaFlagList { } } -impl> BitOr for SchemaFlagList { +impl> BitOr for SchemaFlagList { type Output = TextOptions; fn bitor(self, rhs: TextOptions) -> Self::Output { diff --git a/src/schema/int_options.rs b/src/schema/int_options.rs index b1599bd75..a95f236c3 100644 --- a/src/schema/int_options.rs +++ b/src/schema/int_options.rs @@ -1,4 +1,4 @@ -use schema::flags::{SchemaFlagList, FastFlag, IndexedFlag, StoredFlag}; +use schema::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag}; use std::ops::BitOr; /// Express whether a field is single-value or multi-valued. diff --git a/src/schema/schema.rs b/src/schema/schema.rs index 994b71c9a..7d2be9ac9 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -97,7 +97,7 @@ impl SchemaBuilder { pub fn add_date_field>( &mut self, field_name_str: &str, - field_options: T + field_options: T, ) -> Field { let field_name = String::from(field_name_str); let field_entry = FieldEntry::new_date(field_name, field_options.into()); diff --git a/src/schema/value.rs b/src/schema/value.rs index 0090b4435..cb27c64b3 100644 --- a/src/schema/value.rs +++ b/src/schema/value.rs @@ -107,10 +107,10 @@ impl Value { } } - /// Returns the Date-value, provided the value is of the `Date` type. - /// - /// # Panics - /// If the value is not of type `Date` + /// Returns the Date-value, provided the value is of the `Date` type. + /// + /// # Panics + /// If the value is not of type `Date` pub fn date_value(&self) -> &DateTime { match *self { Value::Date(ref value) => value, @@ -138,7 +138,9 @@ impl From for Value { } impl From for Value { - fn from(date_time: DateTime) -> Value { Value::Date(date_time) } + fn from(date_time: DateTime) -> Value { + Value::Date(date_time) + } } impl<'a> From<&'a str> for Value { @@ -161,10 +163,10 @@ impl From> for Value { mod binary_serialize { use super::Value; + use chrono::{TimeZone, Utc}; use common::BinarySerializable; use schema::Facet; use std::io::{self, Read, Write}; - use chrono::{Utc, TimeZone}; const TEXT_CODE: u8 = 0; const U64_CODE: u8 = 1; @@ -217,7 +219,7 @@ mod binary_serialize { let value = i64::deserialize(reader)?; Ok(Value::I64(value)) } - DATE_CODE=> { + DATE_CODE => { let timestamp = i64::deserialize(reader)?; Ok(Value::Date(Utc.timestamp(timestamp, 0))) } diff --git a/src/snippet/mod.rs b/src/snippet/mod.rs index 8a3895acb..3fa605b1c 100644 --- a/src/snippet/mod.rs +++ b/src/snippet/mod.rs @@ -241,8 +241,8 @@ fn select_best_fragment_combination(fragments: &[FragmentCandidate], text: &str) /// # let query_parser = QueryParser::for_index(&index, vec![text_field]); /// // ... 
/// let query = query_parser.parse_query("haleurs flamands").unwrap(); -/// # index.load_searchers()?; -/// # let searcher = index.searcher(); +/// # let reader = index.reader()?; +/// # let searcher = reader.searcher(); /// let mut snippet_generator = SnippetGenerator::create(&searcher, &*query, text_field)?; /// snippet_generator.set_max_num_chars(100); /// let snippet = snippet_generator.snippet_from_doc(&doc); @@ -528,9 +528,8 @@ Survey in 2016, 2017, and 2018."#; index_writer.add_document(doc!(text_field => "a")); index_writer.add_document(doc!(text_field => "a b")); index_writer.commit().unwrap(); - index.load_searchers().unwrap(); } - let searcher = index.searcher(); + let searcher = index.reader().unwrap().searcher(); let query_parser = QueryParser::for_index(&index, vec![text_field]); { let query = query_parser.parse_query("e").unwrap(); @@ -587,8 +586,7 @@ Survey in 2016, 2017, and 2018."#; } index_writer.commit().unwrap(); } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let searcher = index.reader().unwrap().searcher(); let query_parser = QueryParser::for_index(&index, vec![text_field]); let query = query_parser.parse_query("rust design").unwrap(); let mut snippet_generator = diff --git a/src/space_usage/mod.rs b/src/space_usage/mod.rs index f361d6860..8ffb841f0 100644 --- a/src/space_usage/mod.rs +++ b/src/space_usage/mod.rs @@ -304,9 +304,8 @@ mod test { fn test_empty() { let schema = Schema::builder().build(); let index = Index::create_in_ram(schema.clone()); - - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let searcher_space_usage = searcher.space_usage(); assert_eq!(0, searcher_space_usage.total()); } @@ -344,8 +343,8 @@ mod test { index_writer.commit().unwrap(); } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let searcher_space_usage = searcher.space_usage(); assert!(searcher_space_usage.total() > 0); assert_eq!(1, searcher_space_usage.segments().len()); @@ -384,8 +383,8 @@ mod test { index_writer.commit().unwrap(); } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let searcher_space_usage = searcher.space_usage(); assert!(searcher_space_usage.total() > 0); assert_eq!(1, searcher_space_usage.segments().len()); @@ -423,9 +422,8 @@ mod test { index_writer.add_document(doc!(name => "hello hi goodbye")); index_writer.commit().unwrap(); } - - index.load_searchers().unwrap(); - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let searcher_space_usage = searcher.space_usage(); assert!(searcher_space_usage.total() > 0); assert_eq!(1, searcher_space_usage.segments().len()); @@ -471,9 +469,8 @@ mod test { index_writer2.commit().unwrap(); } - index.load_searchers().unwrap(); - - let searcher = index.searcher(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); let searcher_space_usage = searcher.space_usage(); assert!(searcher_space_usage.total() > 0); assert_eq!(1, searcher_space_usage.segments().len()); diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index f3157118c..89a98f072 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -159,8 +159,7 @@ mod tests { index_writer.commit().unwrap(); } } - index.load_searchers().unwrap(); - let searcher = index.searcher(); + 
let searcher = index.reader().unwrap().searcher(); let field_searcher = searcher.field(text_field); let mut term_it = field_searcher.terms();