diff --git a/README.md b/README.md index 395acd5a4..d9138a07b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -![Tantivy](http://fulmicoton.com/tantivy_500.png#h) +![Tantivy](http://fulmicoton.com/tantivy-logo/tantivy-logo.png) [![Build Status](https://travis-ci.org/fulmicoton/tantivy.svg?branch=master)](https://travis-ci.org/fulmicoton/tantivy) [![Coverage Status](https://coveralls.io/repos/github/fulmicoton/tantivy/badge.svg?branch=master)](https://coveralls.io/github/fulmicoton/tantivy?branch=master) @@ -13,11 +13,11 @@ It is strongly inspired by Lucene's design. # Features - configurable indexing (optional term frequency and position indexing) -- Tf-Idf scoring +- tf-idf scoring - Basic query language - Incremental indexing -- Multithreaded indexing (indexing en wikipedia takes 4mn on my desktop) -- Mmap based +- Multithreaded indexing (indexing English Wikipedia takes 4 minutes on my desktop) +- mmap based - SIMD integer compression - u32 fast fields (equivalent of doc values in Lucene) - LZ4 compressed document store @@ -35,7 +35,7 @@ It will walk you through getting a wikipedia search engine up and running in a f Tantivy has a git submodule called `simdcomp`. After cloning the repository, you will need to initialize and update -the submodules. The project can then be build using `cargo`. +the submodules. The project can then be built using `cargo`. git clone git@github.com:fulmicoton/tantivy.git git submodule init diff --git a/examples/simple_search.rs b/examples/simple_search.rs index 851968576..39efabe13 100644 --- a/examples/simple_search.rs +++ b/examples/simple_search.rs @@ -25,7 +25,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> { // # Defining the schema // - // Tantivy index require to have a very strict schema. + // The Tantivy index requires a very strict schema. // The schema declares which fields are in the index, // and for each field, its type and "the way it should // be indexed". @@ -47,7 +47,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> { // `STORED` means that the field will also be saved // in a compressed, row-oriented key-value store. // This store is useful to reconstruct the - // document that were selected during the search phase. + // documents that were selected during the search phase. schema_builder.add_text_field("title", TEXT | STORED); // Our first field is body. @@ -64,29 +64,29 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> { // Let's create a brand new index. // // This will actually just save a meta.json - // with our schema the directory. + // with our schema in the directory. let index = try!(Index::create(index_path, schema.clone())); // To insert document we need an index writer. - // There shall be only one writer at a time. - // Besides, this single `IndexWriter` is already + // There must be only one writer at a time. + // This single `IndexWriter` is already // multithreaded. // - // Here we used a buffer of 1 GB. Using a bigger + // Here we use a buffer of 1 GB. Using a bigger // heap for the indexer can increase its throughput. // This buffer will be split between the indexing // threads. let mut index_writer = try!(index.writer(1_000_000_000)); - // Let's now index our documents! + // Let's index our documents! // We first need a handle on the title and the body field. // ### Create a document "manually". // - // We can create a document manually, by setting adding the fields + // We can create a document manually, by setting the fields // one by one in a Document object. 
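An aside before the manual construction that follows: this same changeset adds a `doc!` macro to `src/lib.rs` (see the hunk near the end of this diff), which condenses the field-by-field pattern into a single expression. A minimal sketch, assuming `Field`, `Document` and the macro are in scope as they are in the macro's own test:

```rust
// Hedged sketch: condensed document construction using the `doc!`
// macro introduced later in this changeset. `title` and `body` are
// `Field` handles obtained via `schema.get_field(...)`, exactly as
// in the example code that follows.
fn make_doc(title: Field, body: Field) -> Document {
    doc!(
        title => "An illustrative title",
        body => "An illustrative body."
    )
}
// The macro expands to `Document::default()` plus one
// `document.add(FieldValue::new(field, value.into()))` per pair,
// so it matches setting the fields one by one.
```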
let title = schema.get_field("title").unwrap();
let body = schema.get_field("body").unwrap();
@@ -122,7 +122,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // This is an example, so we will only index 3 documents
     // here. You can check out tantivy's tutorial to index
     // the English wikipedia. Tantivy's indexing is rather fast.
-    // Indexing 5 millions articles of the English wikipedia takes
+    // Indexing 5 million articles of the English wikipedia takes
     // around 4 minutes on my computer!
@@ -131,56 +131,56 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // At this point our documents are not searchable.
     //
     //
-    // We need to call .commit() explicitely to force the
+    // We need to call .commit() explicitly to force the
     // index_writer to finish processing the documents in the queue,
-    // flush the current index on the disk, and advertise
+    // flush the current index to the disk, and advertise
     // the existence of new documents.
     //
     // This call is blocking.
     try!(index_writer.commit());

     // If `.commit()` returns correctly, then all of the
-    // documents have been added before are guaranteed to be
+    // documents that have been added are guaranteed to be
     // persistently indexed.
     //
     // In the scenario of a crash or a power failure,
-    // tantivy behaves as if it rollbacked to its last
+    // tantivy behaves as if it had rolled back to its last
     // commit.

     // # Searching
     //
-    // Let's search our index. This starts
+    // Let's search our index. We start
     // by creating a searcher. There can be more
     // than one searcher at a time.
     //
-    // You are supposed to acquire a search
+    // You should create a searcher
     // every time you start a "search query".
     let searcher = index.searcher();

     // The query parser can interpret human queries.
     // Here, if the user does not specify which
-    // field he wants to search, tantivy will search
+    // field they want to search, tantivy will search
     // in both title and body.
     let query_parser = QueryParser::new(index.schema(), vec!(title, body));

     // QueryParser may fail if the query is not in the right
     // format. For user facing applications, this can be a problem.
-    // A ticket has been filled regarding this problem.
+    // A ticket has been opened regarding this problem.
     let query = try!(query_parser.parse_query("sea whale"));

     // A query defines a set of documents, as
     // well as the way they should be scored.
     //
-    // Query created by the query parser are scoring according
+    // A query created by the query parser is scored according
     // to a metric called Tf-Idf, and will consider
     // any document matching at least one of our terms.

     // ### Collectors
     //
-    // We are not interested in all of the document but
-    // only in the top 10. Keep track of our top 10 best documents
+    // We are not interested in all of the documents but
+    // only in the top 10. Keeping track of our top 10 best documents
     // is the role of the TopCollector.
     let mut top_collector = TopCollector::with_limit(10);
@@ -188,14 +188,14 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // We can now perform our query.
     try!(query.search(&searcher, &mut top_collector));

-    // Our top collector now contains are 10
+    // Our top collector now contains the 10
     // most relevant doc ids...
     let doc_addresses = top_collector.docs();

     // The actual documents still need to be
     // retrieved from Tantivy's store.
     //
-    // Since body was not configured as stored,
+    // Since the body field was not configured as stored,
     // the document returned will only contain
     // a title.
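The retrieval loop itself sits in a part of the example this diff does not touch. A sketch of what that step looks like, using the `get_all`/`.text()` accessors exercised by the `doc!` macro test later in this diff; the `searcher.doc(&doc_address)` call is an assumption based on the surrounding tutorial, not shown in these hunks:

```rust
// Hedged sketch of the retrieval phase. `searcher`, `doc_addresses`
// and the `title` field handle are the ones defined earlier in this
// example.
for doc_address in doc_addresses {
    // Fetch the stored fields of the document (assumed API).
    let retrieved_doc = try!(searcher.doc(&doc_address));
    for value in retrieved_doc.get_all(title) {
        println!("title: {}", value.text());
    }
    // `body` was declared TEXT without STORED, so it has no
    // stored values to print here.
}
```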
@@ -205,4 +205,4 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> { } Ok(()) -} \ No newline at end of file +} diff --git a/src/collector/count_collector.rs b/src/collector/count_collector.rs index 9eaf74165..44d547ec3 100644 --- a/src/collector/count_collector.rs +++ b/src/collector/count_collector.rs @@ -5,13 +5,13 @@ use SegmentReader; use SegmentLocalId; /// `CountCollector` collector only counts how many -/// document are matching the query. +/// documents match the query. pub struct CountCollector { count: usize, } impl CountCollector { - /// Returns the count of document that where + /// Returns the count of documents that were /// collected. pub fn count(&self,) -> usize { self.count @@ -20,8 +20,7 @@ impl CountCollector { impl Default for CountCollector { fn default() -> CountCollector { - CountCollector { - count: 0, + CountCollector {count: 0, } } } diff --git a/src/collector/mod.rs b/src/collector/mod.rs index de4092738..683b7eb1c 100644 --- a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -20,16 +20,16 @@ pub use self::chained_collector::chain; /// /// /// For instance, -/// - keeping track of the top 10 best documents -/// - computing a break down over a fast field -/// - computing the number of documents matching the query /// +/// - keeping track of the top 10 best documents +/// - computing a breakdown over a fast field +/// - computing the number of documents matching the query /// /// Queries are in charge of pushing the `DocSet` to the collector. /// -/// As they work on multiple segment, they first inform -/// the collector of a change in segment and then -/// call the collect method to push document to the collector. +/// As they work on multiple segments, they first inform +/// the collector of a change in a segment and then +/// call the `collect` method to push the document to the collector. /// /// Temporally, our collector will receive calls /// - `.set_segment(0, segment_reader_0)` @@ -45,10 +45,10 @@ pub use self::chained_collector::chain; /// /// Segments are not guaranteed to be visited in any specific order. pub trait Collector { - /// `set_segment` is called before starting enumerating + /// `set_segment` is called before beginning to enumerate /// on this segment. fn set_segment(&mut self, segment_local_id: SegmentLocalId, segment: &SegmentReader) -> io::Result<()>; - /// The query pushes scored document to the collector via this method. + /// The query pushes the scored document to the collector via this method. fn collect(&mut self, scored_doc: ScoredDoc); } @@ -57,7 +57,7 @@ impl<'a, C: Collector> Collector for &'a mut C { fn set_segment(&mut self, segment_local_id: SegmentLocalId, segment: &SegmentReader) -> io::Result<()> { (*self).set_segment(segment_local_id, segment) } - /// The query pushes scored document to the collector via this method. + /// The query pushes the scored document to the collector via this method. fn collect(&mut self, scored_doc: ScoredDoc) { (*self).collect(scored_doc); } @@ -120,10 +120,10 @@ pub mod tests { - /// Collects in order all of the fast field for all of the - /// doc of the `DocSet` + /// Collects in order all of the fast fields for all of the + /// doc in the `DocSet` /// - /// This collector is essentially useful for tests. + /// This collector is mainly useful for tests. 
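Before the test collectors below, a self-contained illustration of the `Collector` protocol documented above: `set_segment` is called once per segment, then `collect` once per matching document. The type is illustrative, not part of the crate:

```rust
use std::io;

// Counts matches per segment without inspecting `ScoredDoc` at all,
// relying only on the calling order guaranteed by the trait docs:
// `.set_segment(ord, reader)` precedes the `.collect(...)` calls
// for that segment.
pub struct PerSegmentCounter {
    counts: Vec<(SegmentLocalId, usize)>,
}

impl Collector for PerSegmentCounter {
    fn set_segment(&mut self,
                   segment_local_id: SegmentLocalId,
                   _reader: &SegmentReader) -> io::Result<()> {
        // Open a fresh counter for the segment being visited.
        self.counts.push((segment_local_id, 0));
        Ok(())
    }

    fn collect(&mut self, _scored_doc: ScoredDoc) {
        // Attribute the hit to the segment announced last.
        if let Some(last) = self.counts.last_mut() {
            last.1 += 1;
        }
    }
}
```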
pub struct FastFieldTestCollector { vals: Vec, field: Field, diff --git a/src/collector/multi_collector.rs b/src/collector/multi_collector.rs index 092547c78..92958018d 100644 --- a/src/collector/multi_collector.rs +++ b/src/collector/multi_collector.rs @@ -5,7 +5,7 @@ use SegmentReader; use SegmentLocalId; -/// Multicollector makes it possible to collect on more than one collector +/// Multicollector makes it possible to collect on more than one collector. /// It should only be used for use cases where the Collector types is unknown /// at compile time. /// If the type of the collectors is known, you should prefer to use `ChainedCollector`. @@ -60,4 +60,4 @@ mod tests { assert_eq!(count_collector.count(), 3); assert!(top_collector.at_capacity()); } -} \ No newline at end of file +} diff --git a/src/collector/top_collector.rs b/src/collector/top_collector.rs index 63f9aa112..29ca0bf8b 100644 --- a/src/collector/top_collector.rs +++ b/src/collector/top_collector.rs @@ -53,7 +53,7 @@ pub struct TopCollector { impl TopCollector { - /// Creates a top collector, with a number of document of "limit" + /// Creates a top collector, with a number of documents equal to "limit". /// /// # Panics /// The method panics if limit is 0 @@ -68,9 +68,9 @@ impl TopCollector { } } - /// Returns the decreasingly sorted K-best documents. + /// Returns K best documents sorted in decreasing order. /// - /// Calling this method will triggers the sort. + /// Calling this method triggers the sort. /// The result of the sort is not cached. pub fn docs(&self) -> Vec { self.score_docs() @@ -79,9 +79,9 @@ impl TopCollector { .collect() } - /// Returns the decreasingly sorted K-best ScoredDocument. + /// Returns K best ScoredDocument sorted in decreasing order. /// - /// Calling this method will triggers the sort. + /// Calling this method triggers the sort. /// The result of the sort is not cached. pub fn score_docs(&self) -> Vec<(Score, DocAddress)> { let mut scored_docs: Vec = self.heap @@ -93,9 +93,9 @@ impl TopCollector { .map(|GlobalScoredDoc {score, doc_address}| (score, doc_address)) .collect() } - - /// Return true iff at least K document have gone through - /// the collector. + + /// Return true iff at least K documents have gone through + /// the collector. 
#[inline] pub fn at_capacity(&self, ) -> bool { self.heap.len() >= self.limit @@ -183,8 +183,8 @@ mod tests { .collect(); assert_eq!(docs, vec!(7, 1, 5, 3)); } - - + + } #[test] @@ -192,4 +192,4 @@ mod tests { fn test_top_0() { TopCollector::with_limit(0); } -} \ No newline at end of file +} diff --git a/src/core/index.rs b/src/core/index.rs index 398d38e11..46b8a5c75 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -1,6 +1,5 @@ use Result; use Error; -use std::path::Path; use schema::Schema; use std::sync::Arc; use std::fmt; @@ -15,29 +14,27 @@ use super::segment::Segment; use core::SegmentReader; use super::pool::Pool; use super::pool::LeasedItem; +use std::path::Path; use indexer::SegmentManager; use core::IndexMeta; use core::META_FILEPATH; use super::segment::create_segment; -const NUM_SEARCHERS: usize = 12; +const NUM_SEARCHERS: usize = 12; /// Accessor to the index segment manager /// -/// This method is not part of tantivy's public API +/// This method is not part of tantivy's public API pub fn get_segment_manager(index: &Index) -> Arc { index.segment_manager.clone() } - + fn load_metas(directory: &Directory) -> Result { let meta_file = try!(directory.open_read(&META_FILEPATH)); let meta_content = String::from_utf8_lossy(meta_file.as_slice()); - let loaded_meta = try!( - json::decode(&meta_content) - .map_err(|e| Error::CorruptedFile(META_FILEPATH.clone(), Box::new(e))) - ); - Ok(loaded_meta) + json::decode(&meta_content) + .map_err(|e| Error::CorruptedFile(META_FILEPATH.clone(), Box::new(e))) } // pub fn set_metas(index: &mut Index, docstamp: u64) { @@ -47,24 +44,23 @@ fn load_metas(directory: &Directory) -> Result { /// Tantivy's Search Index pub struct Index { segment_manager: Arc, - + directory: Box, schema: Schema, searcher_pool: Arc>, docstamp: u64, - } impl Index { /// Creates a new index using the `RAMDirectory`. /// /// The index will be allocated in anonymous memory. - /// This should only be used for unit tests. + /// This should only be used for unit tests. pub fn create_in_ram(schema: Schema) -> Index { let directory = Box::new(RAMDirectory::create()); - Index::from_directory(directory, schema).expect("Creating a RAMDirectory should never fail") // unwrap is ok here + Index::from_directory(directory, schema).expect("Creating a RAMDirectory should never fail") // unwrap is ok here } - + /// Creates a new index in a given filepath. /// /// The index will use the `MMapDirectory`. @@ -76,7 +72,7 @@ impl Index { /// Creates a new index in a temp directory. /// /// The index will use the `MMapDirectory` in a newly created directory. - /// The temp directory will be destroyed automatically when the Index object + /// The temp directory will be destroyed automatically when the `Index` object /// is destroyed. /// /// The temp directory is only used for testing the `MmapDirectory`. @@ -85,8 +81,8 @@ impl Index { let directory = Box::new(try!(MmapDirectory::create_from_tempdir())); Index::from_directory(directory, schema) } - - /// Creates a new index given a directory and an IndexMeta. + + /// Creates a new index given a directory and an `IndexMeta`. fn create_from_metas(directory: Box, metas: IndexMeta) -> Result { let schema = metas.schema.clone(); let docstamp = metas.docstamp; @@ -102,13 +98,10 @@ impl Index { try!(index.load_searchers()); Ok(index) } - + /// Opens a new directory from a directory. 
pub fn from_directory(directory: Box, schema: Schema) -> Result { - Index::create_from_metas( - directory, - IndexMeta::with_schema(schema) - ) + Index::create_from_metas(directory, IndexMeta::with_schema(schema)) } /// Opens a new directory from an index path. @@ -117,37 +110,49 @@ impl Index { let metas = try!(load_metas(&directory)); //< TODO does the directory already exists? Index::create_from_metas(directory.box_clone(), metas) } - + /// Returns the index docstamp. /// /// The docstamp is the number of documents that have been added /// from the beginning of time, and until the moment of the last commit. - pub fn docstamp(&self,) -> u64 { + pub fn docstamp(&self) -> u64 { self.docstamp } - + /// Creates a multithreaded writer. - /// Each writer produces an independant segment. - pub fn writer_with_num_threads(&self, num_threads: usize, heap_size_in_bytes: usize) -> Result { + /// Each writer produces an independent segment. + /// + /// # Errors + /// If the lockfile already exists, returns `Error::FileAlreadyExists`. + /// # Panics + /// If the heap size per thread is too small, panics. + pub fn writer_with_num_threads(&self, + num_threads: usize, + heap_size_in_bytes: usize) + -> Result { IndexWriter::open(self, num_threads, heap_size_in_bytes) } - - + + /// Creates a multithreaded writer - /// It just calls `writer_with_num_threads` with the number of core as `num_threads` + /// It just calls `writer_with_num_threads` with the number of cores as `num_threads` + /// # Errors + /// If the lockfile already exists, returns `Error::FileAlreadyExists`. + /// # Panics + /// If the heap size per thread is too small, panics. pub fn writer(&self, heap_size_in_bytes: usize) -> Result { self.writer_with_num_threads(num_cpus::get(), heap_size_in_bytes) } - + /// Accessor to the index schema /// /// The schema is actually cloned. - pub fn schema(&self,) -> Schema { + pub fn schema(&self) -> Schema { self.schema.clone() } /// Returns the list of segments that are searchable - pub fn searchable_segments(&self,) -> Vec { + pub fn searchable_segments(&self) -> Vec { self.searchable_segment_ids() .into_iter() .map(|segment_id| self.segment(segment_id)) @@ -155,92 +160,88 @@ impl Index { } /// Remove all of the file associated with the segment. - /// + /// /// This method cannot fail. If a problem occurs, /// some files may end up never being removed. - /// The error will only be logged. + /// The error will only be logged. pub fn delete_segment(&self, segment_id: SegmentId) { self.segment(segment_id).delete(); } - - /// Return a segment object given a segment_id + + /// Return a segment object given a `segment_id` /// /// The segment may or may not exist. pub fn segment(&self, segment_id: SegmentId) -> Segment { create_segment(self.clone(), segment_id) } - + /// Return a reference to the index directory. - pub fn directory(&self,) -> &Directory { + pub fn directory(&self) -> &Directory { &*self.directory } - + /// Return a mutable reference to the index directory. - pub fn directory_mut(&mut self,) -> &mut Directory { + pub fn directory_mut(&mut self) -> &mut Directory { &mut *self.directory } - + /// Returns the list of segment ids that are searchable. - fn searchable_segment_ids(&self,) -> Vec { + fn searchable_segment_ids(&self) -> Vec { self.segment_manager.committed_segments() } - + /// Creates a new segment. 
- pub fn new_segment(&self,) -> Segment { + pub fn new_segment(&self) -> Segment { self.segment(SegmentId::generate_random()) } - - /// Creates a new generation of searchers after + + /// Creates a new generation of searchers after /// a change of the set of searchable indexes. /// /// This needs to be called when a new segment has been /// published or after a merge. - pub fn load_searchers(&self,) -> Result<()>{ + pub fn load_searchers(&self) -> Result<()> { let searchable_segments = self.searchable_segments(); let mut searchers = Vec::new(); for _ in 0..NUM_SEARCHERS { let searchable_segments_clone = searchable_segments.clone(); - let segment_readers: Vec = try!( - searchable_segments_clone - .into_iter() - .map(SegmentReader::open) - .collect() - ); + let segment_readers: Vec = try!(searchable_segments_clone.into_iter() + .map(SegmentReader::open) + .collect()); let searcher = Searcher::from(segment_readers); searchers.push(searcher); } self.searcher_pool.publish_new_generation(searchers); Ok(()) } - + /// Returns a searcher - /// + /// /// This method should be called every single time a search /// query is performed. - /// The searcher are taken from a pool of `NUM_SEARCHERS` searchers. + /// The searchers are taken from a pool of `NUM_SEARCHERS` searchers. /// If no searcher is available - /// it may block. + /// this may block. /// - /// The same searcher must be used for a given query, as it ensures - /// the use of a consistent segment set. - pub fn searcher(&self,) -> LeasedItem { + /// The same searcher must be used for a given query, as it ensures + /// the use of a consistent segment set. + pub fn searcher(&self) -> LeasedItem { self.searcher_pool.acquire() } - } impl fmt::Debug for Index { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Index({:?})", self.directory) - } + write!(f, "Index({:?})", self.directory) + } } impl Clone for Index { - fn clone(&self,) -> Index { + fn clone(&self) -> Index { Index { segment_manager: self.segment_manager.clone(), - + directory: self.directory.box_clone(), schema: self.schema.clone(), searcher_pool: self.searcher_pool.clone(), diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index be1e46ec2..148e34ab2 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -25,7 +25,7 @@ use schema::TextIndexingOptions; use error::Error; -/// Entrypoint to access all of the datastructures of the `Segment` +/// Entry point to access all of the datastructures of the `Segment` /// /// - term dictionary /// - postings @@ -34,8 +34,8 @@ use error::Error; /// - field norm reader /// /// The segment reader has a very low memory footprint, -/// as close to all of the memory data is in Mmapped. -/// +/// as close to all of the memory data is mmapped. +/// pub struct SegmentReader { segment_info: SegmentInfo, segment_id: SegmentId, @@ -51,7 +51,7 @@ pub struct SegmentReader { impl SegmentReader { /// Returns the highest document id ever attributed in /// this segment + 1. - /// Today, `tantivy` does not handle deletes so, it happens + /// Today, `tantivy` does not handle deletes, so it happens /// to also be the number of documents in the index. pub fn max_doc(&self) -> DocId { self.segment_info.max_doc @@ -233,7 +233,7 @@ impl SegmentReader { self.read_postings(term, segment_posting_option) } - /// Returns the term info of associated with the term. + /// Returns the term info associated with the term. 
pub fn get_term_info(&self, term: &Term) -> Option { self.term_infos.get(term.as_slice()) } diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 7ad2a65f2..6171ed606 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -9,7 +9,7 @@ use std::marker::Sync; /// Write-once read many (WORM) abstraction for where tantivy's index should be stored. /// -/// There is currently two implementations of `Directory` +/// There are currently two implementations of `Directory` /// /// - The [`MMapDirectory`](struct.MmapDirectory.html), this /// should be your default choice. @@ -20,19 +20,19 @@ pub trait Directory: fmt::Debug + Send + Sync + 'static { /// Opens a virtual file for read. /// - /// Once a virtualfile is open, its data may not + /// Once a virtual file is open, its data may not /// change. /// - /// Specifically, subsequent write or flush should - /// have no effect the returned `ReadOnlySource` object. + /// Specifically, subsequent writes or flushes should + /// have no effect on the returned `ReadOnlySource` object. fn open_read(&self, path: &Path) -> result::Result; /// Removes a file /// - /// Removing a file will not affect eventual + /// Removing a file will not affect an eventual /// existing ReadOnlySource pointing to it. /// - /// Removing a non existing files, yields a + /// Removing a nonexistent file, yields a /// `FileError::DoesNotExist`. fn delete(&self, path: &Path) -> result::Result<(), FileError>; @@ -47,28 +47,28 @@ pub trait Directory: fmt::Debug + Send + Sync + 'static { /// same path should return a `ReadOnlySource`. /// /// Write operations may be aggressively buffered. - /// The client of this trait is in charge to call flush + /// The client of this trait is responsible for calling flush /// to ensure that subsequent `read` operations - /// will take in account preceding `write` operations. + /// will take into account preceding `write` operations. /// /// Flush operation should also be persistent. /// - /// User shall not rely on `Drop` triggering `flush`. + /// The user shall not rely on `Drop` triggering `flush`. /// Note that `RAMDirectory` will panic! if `flush` /// was not called. /// - /// The file may not previously exists. + /// The file may not previously exist. fn open_write(&mut self, path: &Path) -> Result; - /// Atomically replace the content of a file by data. + /// Atomically replace the content of a file with data. /// /// This calls ensure that reads can never *observe* /// a partially written file. /// - /// The file may or may not previously exists. + /// The file may or may not previously exist. fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>; - /// Clone the directory and boxes the clone + /// Clones the directory and boxes the clone fn box_clone(&self) -> Box; } diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index a11a5e88b..d6a6cdbd3 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -47,7 +47,7 @@ impl MmapDirectory { /// Creates a new MmapDirectory in a temporary directory. /// /// This is mostly useful to test the MmapDirectory itself. - /// For your unit test, prefer the RAMDirectory. + /// For your unit tests, prefer the RAMDirectory. 
pub fn create_from_tempdir() -> io::Result<MmapDirectory> {
     let tempdir = try!(TempDir::new("index"));
     let tempdir_path = PathBuf::from(tempdir.path());
@@ -81,7 +81,7 @@ impl MmapDirectory {
     }

     /// Joins a relative_path to the directory `root_path`
-    /// to create proper complete `filepath`.
+    /// to create a proper complete `filepath`.
     fn resolve_path(&self, relative_path: &Path) -> PathBuf {
         self.root_path.join(relative_path)
     }
diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs
index 7a93bee7f..30e131237 100644
--- a/src/directory/ram_directory.rs
+++ b/src/directory/ram_directory.rs
@@ -11,7 +11,7 @@ use directory::error::{OpenWriteError, FileError};
 use directory::WritePtr;
 use super::shared_vec_slice::SharedVecSlice;

-/// Writer associated to the `RAMDirectory`
+/// Writer associated with the `RAMDirectory`
 ///
 /// The Writer just writes a buffer.
 ///
@@ -140,9 +140,9 @@ impl fmt::Debug for RAMDirectory {
 }

-/// Directory storing everything in anonymous memory.
+/// A Directory storing everything in anonymous memory.
 ///
-/// It's main purpose is unit test.
+/// It is mainly meant for unit testing.
 /// Writes are only made visible upon flushing.
 ///
 #[derive(Clone)]
@@ -168,7 +168,7 @@ impl Directory for RAMDirectory {
     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
         let path_buf = PathBuf::from(path);
         let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());
-        // force the creation of the file to mimick the MMap directory.
+        // force the creation of the file to mimic the MMap directory.
         if try!(self.fs.write(path_buf.clone(), &Vec::new())) {
             Err(OpenWriteError::FileAlreadyExists(path_buf))
         }
diff --git a/src/indexer/directory_lock.rs b/src/indexer/directory_lock.rs
new file mode 100644
index 000000000..ef786daa4
--- /dev/null
+++ b/src/indexer/directory_lock.rs
@@ -0,0 +1,32 @@
+use Directory;
+use std::path::Path;
+use error::Result;
+
+pub const LOCKFILE_NAME: &'static str = ".tantivy-indexer.lock";
+
+
+/// The directory lock is a mechanism used to
+/// prevent the creation of two [`IndexWriter`](struct.IndexWriter.html)s.
+///
+/// Only one lock can exist at a time for a given directory.
+/// The lock is released automatically on `Drop`.
+pub struct DirectoryLock {
+    directory: Box<Directory>,
+}
+
+impl DirectoryLock {
+    pub fn lock(mut directory: Box<Directory>) -> Result<DirectoryLock> {
+        let lockfile_path = Path::new(LOCKFILE_NAME);
+        try!(directory.open_write(lockfile_path));
+        Ok(DirectoryLock { directory: directory })
+    }
+}
+
+impl Drop for DirectoryLock {
+    fn drop(&mut self) {
+        let lockfile_path = Path::new(LOCKFILE_NAME);
+        if let Err(e) = self.directory.delete(lockfile_path) {
+            error!("Failed to remove the lock file. {:?}", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index e8f504628..4992e6324 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -8,6 +8,7 @@ use core::Segment;
 use std::thread::JoinHandle;
 use rustc_serialize::json;
 use indexer::SegmentWriter;
+use super::directory_lock::DirectoryLock;
 use std::clone::Clone;
 use std::io;
 use std::io::Write;
@@ -39,21 +40,20 @@ pub const HEAP_SIZE_LIMIT: u32 = MARGIN_IN_BYTES * 3u32;

 const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;

+
 type DocumentSender = chan::Sender<Document>;
 type DocumentReceiver = chan::Receiver<Document>;

-fn create_metas(segment_manager: &SegmentManager,
-                schema: Schema,
-                docstamp: u64) -> IndexMeta {
-    let (committed_segments, uncommitted_segments) = segment_manager.segment_metas();
-    IndexMeta {
-        committed_segments: committed_segments,
-        uncommitted_segments: uncommitted_segments,
-        schema: schema,
-        docstamp: docstamp,
-    }
+fn create_metas(segment_manager: &SegmentManager, schema: Schema, docstamp: u64) -> IndexMeta {
+    let (committed_segments, uncommitted_segments) = segment_manager.segment_metas();
+    IndexMeta {
+        committed_segments: committed_segments,
+        uncommitted_segments: uncommitted_segments,
+        schema: schema,
+        docstamp: docstamp,
+    }
 }

@@ -61,21 +61,20 @@ fn create_metas(segment_manager: &SegmentManager, schema: Schema, docstamp: u64
 /// This operation is atomic:
 /// Either
 ///  - it fails, in which case an error is returned,
-/// and the `meta.json` remains untouched,
-/// - it success, and `meta.json` is written
+///    and the `meta.json` remains untouched,
+///  - it succeeds, and `meta.json` is written
 /// and flushed.
 ///
 /// This method is not part of tantivy's public API
-pub fn save_metas(
-    segment_manager: &SegmentManager,
-    schema: Schema,
-    docstamp: u64,
-    directory: &mut Directory) -> Result<()> {
+pub fn save_metas(segment_manager: &SegmentManager,
+                  schema: Schema,
+                  docstamp: u64,
+                  directory: &mut Directory)
+                  -> Result<()> {
     let metas = create_metas(segment_manager, schema, docstamp);
     let mut w = Vec::new();
     try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas)));
-    directory
-        .atomic_write(&META_FILEPATH, &w[..])
+    directory.atomic_write(&META_FILEPATH, &w[..])
         .map_err(From::from)
 }

@@ -87,23 +86,27 @@ pub fn save_metas(
 /// Each indexing thread builds its own independent `Segment`, via
 /// a `SegmentWriter` object.
 pub struct IndexWriter {
-    index: Index,
-    heap_size_in_bytes_per_thread: usize,
-
-    workers_join_handle: Vec<JoinHandle<Result<()>>>,
-
-    document_receiver: DocumentReceiver,
-    document_sender: DocumentSender,
-
-    segment_update_sender: SegmentUpdateSender,
-    segment_update_thread: JoinHandle<()>,
-
-    worker_id: usize,
-
-    num_threads: usize,
-
-    uncommitted_docstamp: u64,
-    committed_docstamp: u64,
+    // the lock is just used to bind the
+    // lifetime of the lock with that of the IndexWriter.
+    _directory_lock: DirectoryLock,
+
+    index: Index,
+    heap_size_in_bytes_per_thread: usize,
+
+    workers_join_handle: Vec<JoinHandle<Result<()>>>,
+
+    document_receiver: DocumentReceiver,
+    document_sender: DocumentSender,
+
+    segment_update_sender: SegmentUpdateSender,
+    segment_update_thread: JoinHandle<()>,
+
+    worker_id: usize,
+
+    num_threads: usize,
+
+    uncommitted_docstamp: u64,
+    committed_docstamp: u64,
 }

 // IndexWriter cannot be sent to another thread.
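The `_directory_lock` field introduced above is the classic RAII trick: storing the guard in a struct field ties the lock's lifetime to the `IndexWriter`'s, so the lockfile is deleted by `Drop` no matter how the writer goes away. A self-contained sketch of the idiom, with toy types rather than tantivy's:

```rust
// Toy model of the lifetime-binding idiom used by `_directory_lock`.
struct LockGuard;

impl Drop for LockGuard {
    fn drop(&mut self) {
        // Stands in for deleting the lockfile.
        println!("lock released");
    }
}

struct Writer {
    // Never read: it exists only so the guard is dropped
    // exactly when the writer is dropped.
    _lock: LockGuard,
}

fn main() {
    {
        let _writer = Writer { _lock: LockGuard };
        println!("writer alive, lock held");
    } // `_writer` dropped => "lock released" is printed here
    println!("a new writer could be created now");
}
```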
@@ -112,469 +115,486 @@ impl !Sync for IndexWriter {} fn index_documents(heap: &mut Heap, - segment: Segment, - schema: &Schema, - document_iterator: &mut Iterator, - segment_update_sender: &mut SegmentUpdateSender) -> Result<()> { - heap.clear(); - let segment_id = segment.id(); - let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment, &schema)); - for doc in document_iterator { - try!(segment_writer.add_document(&doc, &schema)); - if segment_writer.is_buffer_full() { - info!("Buffer limit reached, flushing segment with maxdoc={}.", segment_writer.max_doc()); - break; - } - } - let num_docs = segment_writer.max_doc() as usize; - let segment_meta = SegmentMeta { - segment_id: segment_id, - num_docs: num_docs, - }; + segment: Segment, + schema: &Schema, + document_iterator: &mut Iterator, + segment_update_sender: &mut SegmentUpdateSender) + -> Result<()> { + heap.clear(); + let segment_id = segment.id(); + let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment, &schema)); + for doc in document_iterator { + try!(segment_writer.add_document(&doc, &schema)); + if segment_writer.is_buffer_full() { + info!("Buffer limit reached, flushing segment with maxdoc={}.", + segment_writer.max_doc()); + break; + } + } + let num_docs = segment_writer.max_doc() as usize; + let segment_meta = SegmentMeta { + segment_id: segment_id, + num_docs: num_docs, + }; + + try!(segment_writer.finalize()); + segment_update_sender.send(SegmentUpdate::AddSegment(segment_meta)); + Ok(()) - try!(segment_writer.finalize()); - segment_update_sender.send(SegmentUpdate::AddSegment(segment_meta)); - Ok(()) } impl IndexWriter { - - - /// The index writer - pub fn wait_merging_threads(mut self) -> Result<()> { + /// The index writer + pub fn wait_merging_threads(mut self) -> Result<()> { - self.segment_update_sender.send(SegmentUpdate::Terminate); - - drop(self.segment_update_sender); + self.segment_update_sender.send(SegmentUpdate::Terminate); - // this will stop the indexing thread, - // dropping the last reference to the segment_update_sender. - drop(self.document_sender); + // this will stop the indexing thread, + // dropping the last reference to the segment_update_sender. + drop(self.document_sender); - let mut v = Vec::new(); - mem::swap(&mut v, &mut self.workers_join_handle); - for join_handle in v { - try!( - join_handle - .join() - .expect("Indexing Worker thread panicked") - .map_err(|e| Error::ErrorInThread(format!("Error in indexing worker thread. {:?}", e))) - ); - } - drop(self.workers_join_handle); - self.segment_update_thread - .join() - .map_err(|err| { - error!("Error in the merging thread {:?}", err); - Error::ErrorInThread(format!("{:?}", err)) - }) - } - - /// Spawns a new worker thread for indexing. - /// The thread consumes documents from the pipeline. - /// - fn add_indexing_worker(&mut self,) -> Result<()> { - let index = self.index.clone(); - let schema = self.index.schema(); - let document_receiver_clone = self.document_receiver.clone(); - let mut segment_update_sender = self.segment_update_sender.clone(); - - let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); - - let join_handle: JoinHandle> = try!(thread::Builder::new() - .name(format!("indexing_thread_{}", self.worker_id)) - .spawn(move || { - loop { - let segment = index.new_segment(); - let mut document_iterator = document_receiver_clone - .clone() - .into_iter() - .peekable(); - // the peeking here is to avoid - // creating a new segment's files - // if no document are available. 
- if document_iterator.peek().is_some() { - try!( - index_documents( - &mut heap, - segment, - &schema, - &mut document_iterator, - &mut segment_update_sender) - ); - } - else { - // No more documents. - // Happens when there is a commit, or if the `IndexWriter` - // was dropped. - return Ok(()); - } - } - })); - self.worker_id += 1; - self.workers_join_handle.push(join_handle); + let mut v = Vec::new(); + mem::swap(&mut v, &mut self.workers_join_handle); + for join_handle in v { + try!(join_handle.join() + .expect("Indexing Worker thread panicked") + .map_err(|e| { + Error::ErrorInThread(format!("Error in indexing worker thread. {:?}", e)) + })); + } + drop(self.workers_join_handle); + self.segment_update_thread + .join() + .map_err(|err| { + error!("Error in the merging thread {:?}", err); + Error::ErrorInThread(format!("{:?}", err)) + }) + } - Ok(()) - } + /// Spawns a new worker thread for indexing. + /// The thread consumes documents from the pipeline. + /// + fn add_indexing_worker(&mut self) -> Result<()> { + let index = self.index.clone(); + let schema = self.index.schema(); + let document_receiver_clone = self.document_receiver.clone(); + let mut segment_update_sender = self.segment_update_sender.clone(); + let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); + let join_handle: JoinHandle> = try!(thread::Builder::new() + .name(format!("indexing_thread_{}", self.worker_id)) + .spawn(move || { + loop { + let segment = index.new_segment(); + let mut document_iterator = document_receiver_clone.clone() + .into_iter() + .peekable(); + // the peeking here is to avoid + // creating a new segment's files + // if no document are available. + if document_iterator.peek().is_some() { + try!(index_documents(&mut heap, + segment, + &schema, + &mut document_iterator, + &mut segment_update_sender)); + } else { + // No more documents. + // Happens when there is a commit, or if the `IndexWriter` + // was dropped. + return Ok(()); + } + } + })); + self.worker_id += 1; + self.workers_join_handle.push(join_handle); + Ok(()) + } - fn on_change(&mut self,) -> Result<()> { - let segment_manager = get_segment_manager(&self.index); - // saving the meta file. - try!( - save_metas( - &*segment_manager, - self.index.schema(), - self.committed_docstamp, - self.index.directory_mut()) - ); - try!(self.index.load_searchers()); - Ok(()) - } - - /// Open a new index writer - /// - /// num_threads tells the number of indexing worker that - /// should work at the same time. - pub fn open(index: &Index, - num_threads: usize, - heap_size_in_bytes_per_thread: usize) -> Result { - if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize { - panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT)); - } - let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); - - let segment_updater = SegmentUpdater::new(index.clone()); + fn on_change(&mut self) -> Result<()> { + let segment_manager = get_segment_manager(&self.index); + // saving the meta file. 
+ try!(save_metas(&*segment_manager, + self.index.schema(), + self.committed_docstamp, + self.index.directory_mut())); + try!(self.index.load_searchers()); + Ok(()) + } - let segment_update_sender = segment_updater.update_channel(); - - let segment_update_thread = segment_updater.start(); - - let mut index_writer = IndexWriter { - heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, - index: index.clone(), - - document_receiver: document_receiver, - document_sender: document_sender, + /// Open a new index writer. Attempts to acquire a lockfile. + /// + /// The lockfile should be deleted on drop, but it is possible + /// that due to a panic or other error, a stale lockfile will be + /// left in the index directory. If you are sure that no other + /// `IndexWriter` on the system is accessing the index directory, + /// it is safe to manually delete the lockfile. + /// + /// num_threads specifies the number of indexing workers that + /// should work at the same time. + /// # Errors + /// If the lockfile already exists, returns `Error::FileAlreadyExists`. + /// # Panics + /// If the heap size per thread is too small, panics. + pub fn open(index: &Index, + num_threads: usize, + heap_size_in_bytes_per_thread: usize) + -> Result { - segment_update_sender: segment_update_sender, - segment_update_thread: segment_update_thread, - - workers_join_handle: Vec::new(), - num_threads: num_threads, + if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize { + panic!(format!("The heap size per thread needs to be at least {}.", + HEAP_SIZE_LIMIT)); + } - committed_docstamp: index.docstamp(), - uncommitted_docstamp: index.docstamp(), - worker_id: 0, - }; - try!(index_writer.start_workers()); - Ok(index_writer) - } + let directory_lock = try!(DirectoryLock::lock(index.directory().box_clone())); - fn start_workers(&mut self,) -> Result<()> { - for _ in 0 .. self.num_threads { - try!(self.add_indexing_worker()); - } - Ok(()) - } - - /// Merges a given list of segments - pub fn merge(&mut self, segments: &[Segment]) -> Result<()> { - - if segments.len() < 2 { - // no segments or one segment? nothing to do. - return Ok(()); - } + let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = + chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); + + let segment_updater = SegmentUpdater::new(index.clone()); + let segment_update_sender = segment_updater.update_channel(); + let segment_update_thread = segment_updater.start(); + + let mut index_writer = IndexWriter { + _directory_lock: directory_lock, + + heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, + index: index.clone(), + + document_receiver: document_receiver, + document_sender: document_sender, + + segment_update_sender: segment_update_sender, + segment_update_thread: segment_update_thread, + + workers_join_handle: Vec::new(), + num_threads: num_threads, + + committed_docstamp: index.docstamp(), + uncommitted_docstamp: index.docstamp(), + worker_id: 0, + }; + try!(index_writer.start_workers()); + Ok(index_writer) + } + + fn start_workers(&mut self) -> Result<()> { + for _ in 0..self.num_threads { + try!(self.add_indexing_worker()); + } + Ok(()) + } + + /// Merges a given list of segments + pub fn merge(&mut self, segments: &[Segment]) -> Result<()> { + + if segments.len() < 2 { + // no segments or one segment? nothing to do. 
+ return Ok(()); + } - let segment_manager = get_segment_manager(&self.index); + let segment_manager = get_segment_manager(&self.index); - { - // let's check that all these segments are in the same - // committed/uncommited state. - let first_commit_state = segment_manager.is_committed(segments[0].id()); - - for segment in segments { - let commit_state = segment_manager.is_committed(segment.id()); - if commit_state == CommitState::Missing { - return Err(Error::InvalidArgument(format!("Segment {:?} is not in the index", segments[0].id()))); - } - if commit_state != first_commit_state { - return Err(Error::InvalidArgument(String::from("You may not merge segments that are heterogenously in committed and uncommited."))); - } - } - } + { + // let's check that all these segments are in the same + // committed/uncommited state. + let first_commit_state = segment_manager.is_committed(segments[0].id()); - let schema = self.index.schema(); + for segment in segments { + let commit_state = segment_manager.is_committed(segment.id()); + if commit_state == CommitState::Missing { + return Err(Error::InvalidArgument(format!("Segment {:?} is not in the index", + segments[0].id()))); + } + if commit_state != first_commit_state { + return Err(Error::InvalidArgument(String::from("You may not merge segments \ + that are heterogenously in \ + committed and uncommited."))); + } + } + } - // An IndexMerger is like a "view" of our merged segments. - let merger = try!(IndexMerger::open(schema, segments)); - let mut merged_segment = self.index.new_segment(); + let schema = self.index.schema(); - // ... we just serialize this index merger in our new segment - // to merge the two segments. - let segment_serializer = try!(SegmentSerializer::for_segment(&mut merged_segment)); - let num_docs = try!(merger.write(segment_serializer)); - let merged_segment_ids: Vec = segments.iter().map(|segment| segment.id()).collect(); - let segment_meta = SegmentMeta { - segment_id: merged_segment.id(), - num_docs: num_docs, - }; + // An IndexMerger is like a "view" of our merged segments. + let merger = try!(IndexMerger::open(schema, segments)); + let mut merged_segment = self.index.new_segment(); - segment_manager.end_merge(&merged_segment_ids, &segment_meta); - try!(self.index.load_searchers()); - Ok(()) - } + // ... we just serialize this index merger in our new segment + // to merge the two segments. + let segment_serializer = try!(SegmentSerializer::for_segment(&mut merged_segment)); + let num_docs = try!(merger.write(segment_serializer)); + let merged_segment_ids: Vec = + segments.iter().map(|segment| segment.id()).collect(); + let segment_meta = SegmentMeta { + segment_id: merged_segment.id(), + num_docs: num_docs, + }; - /// Closes the current document channel send. - /// and replace all the channels by new ones. - /// - /// The current workers will keep on indexing - /// the pending document and stop - /// when no documents are remaining. - /// - /// Returns the former segment_ready channel. 
- fn recreate_document_channel(&mut self,) -> DocumentReceiver { - let (mut document_sender, mut document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); - swap(&mut self.document_sender, &mut document_sender); - swap(&mut self.document_receiver, &mut document_receiver); - document_receiver - } + segment_manager.end_merge(&merged_segment_ids, &segment_meta); + try!(self.index.load_searchers()); + Ok(()) + } - /// Rollback to the last commit - /// - /// This cancels all of the update that - /// happened before after the last commit. - /// After calling rollback, the index is in the same - /// state as it was after the last commit. - /// - /// The docstamp at the last commit is returned. - pub fn rollback(&mut self,) -> Result { + /// Closes the current document channel send. + /// and replace all the channels by new ones. + /// + /// The current workers will keep on indexing + /// the pending document and stop + /// when no documents are remaining. + /// + /// Returns the former segment_ready channel. + fn recreate_document_channel(&mut self) -> DocumentReceiver { + let (mut document_sender, mut document_receiver): (DocumentSender, DocumentReceiver) = + chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); + swap(&mut self.document_sender, &mut document_sender); + swap(&mut self.document_receiver, &mut document_receiver); + document_receiver + } - self.segment_update_sender.send(SegmentUpdate::CancelGeneration); - - // we cannot drop segment ready receiver yet - // as it would block the workers. - let document_receiver = self.recreate_document_channel(); - - // Drains the document receiver pipeline : - // Workers don't need to index the pending documents. - for _ in document_receiver {}; - - let mut former_workers_join_handle = Vec::new(); - swap(&mut former_workers_join_handle, &mut self.workers_join_handle); - - // wait for all the worker to finish their work - // (it should be fast since we consumed all pending documents) - for worker_handle in former_workers_join_handle { - // we stop one worker at a time ... - try!(try!( - worker_handle - .join() - .map_err(|e| Error::ErrorInThread(format!("{:?}", e))) - )); - // ... and recreate a new one right away - // to work on the next generation. - try!(self.add_indexing_worker()); - } + /// Rollback to the last commit + /// + /// This cancels all of the update that + /// happened before after the last commit. + /// After calling rollback, the index is in the same + /// state as it was after the last commit. + /// + /// The docstamp at the last commit is returned. + pub fn rollback(&mut self) -> Result { - // All of our indexing workers for the rollbacked generation have - // been terminated. - // Our document receiver pipe was drained. - // No new document have been added in the meanwhile because `IndexWriter` - // is not shared by different threads. - // - // We can now open a new generation and reaccept segments - // from now on. - self.segment_update_sender.send(SegmentUpdate::NewGeneration); + self.segment_update_sender.send(SegmentUpdate::CancelGeneration); - let rollbacked_segments = get_segment_manager(&self.index).rollback(); - for segment_id in rollbacked_segments { + // we cannot drop segment ready receiver yet + // as it would block the workers. 
+ let document_receiver = self.recreate_document_channel(); - // TODO all delete must happen after saving - // meta.json - self.index.delete_segment(segment_id); - } - try!(self.on_change()); + // Drains the document receiver pipeline : + // Workers don't need to index the pending documents. + for _ in document_receiver {} - // reset the docstamp - self.uncommitted_docstamp = self.committed_docstamp; - Ok(self.committed_docstamp) - } + let mut former_workers_join_handle = Vec::new(); + swap(&mut former_workers_join_handle, + &mut self.workers_join_handle); + + // wait for all the worker to finish their work + // (it should be fast since we consumed all pending documents) + for worker_handle in former_workers_join_handle { + // we stop one worker at a time ... + try!(try!(worker_handle.join() + .map_err(|e| Error::ErrorInThread(format!("{:?}", e))))); + // ... and recreate a new one right away + // to work on the next generation. + try!(self.add_indexing_worker()); + } + + // All of our indexing workers for the rollbacked generation have + // been terminated. + // Our document receiver pipe was drained. + // No new document have been added in the meanwhile because `IndexWriter` + // is not shared by different threads. + // + // We can now open a new generation and reaccept segments + // from now on. + self.segment_update_sender.send(SegmentUpdate::NewGeneration); + + let rollbacked_segments = get_segment_manager(&self.index).rollback(); + for segment_id in rollbacked_segments { + + // TODO all delete must happen after saving + // meta.json + self.index.delete_segment(segment_id); + } + try!(self.on_change()); + + // reset the docstamp + self.uncommitted_docstamp = self.committed_docstamp; + Ok(self.committed_docstamp) + } - /// Commits all of the pending changes - /// - /// A call to commit blocks. - /// After it returns, all of the document that - /// were added since the last commit are published - /// and persisted. - /// - /// In case of a crash or an hardware failure (as - /// long as the hard disk is spared), it will be possible - /// to resume indexing from this point. - /// - /// Commit returns the `docstamp` of the last document - /// that made it in the commit. - /// - pub fn commit(&mut self,) -> Result { - - // this will drop the current document channel - // and recreate a new one channels. - self.recreate_document_channel(); - - // Docstamp of the last document in this commit. - self.committed_docstamp = self.uncommitted_docstamp; + /// Commits all of the pending changes + /// + /// A call to commit blocks. + /// After it returns, all of the document that + /// were added since the last commit are published + /// and persisted. + /// + /// In case of a crash or an hardware failure (as + /// long as the hard disk is spared), it will be possible + /// to resume indexing from this point. + /// + /// Commit returns the `docstamp` of the last document + /// that made it in the commit. + /// + pub fn commit(&mut self) -> Result { - let mut former_workers_join_handle = Vec::new(); - swap(&mut former_workers_join_handle, &mut self.workers_join_handle); - - for worker_handle in former_workers_join_handle { - let indexing_worker_result = try!(worker_handle - .join() - .map_err(|e| Error::ErrorInThread(format!("{:?}", e))) - ); - try!(indexing_worker_result); - // add a new worker for the next generation. - try!(self.add_indexing_worker()); - } - // here, because we join all of the worker threads, - // all of the segment update for this commit have been - // sent. 
- // - // No document belonging to the next generation have been - // pushed too, because add_document can only happen - // on this thread. - - // This will move uncommitted segments to the state of - // committed segments. - self.segment_update_sender.send(SegmentUpdate::Commit(self.committed_docstamp)); - - // wait for the segment update thread to have processed the info - let segment_manager = get_segment_manager(&self.index); - while segment_manager.docstamp() != self.committed_docstamp { - println!("wait"); - thread::sleep(Duration::from_millis(100)); - } - - // super::super::core::index::commit(&mut self.index, commit_docstamp); - try!(self.on_change()); - Ok(self.committed_docstamp) - } - + // this will drop the current document channel + // and recreate a new one channels. + self.recreate_document_channel(); - /// Adds a document. - /// - /// If the indexing pipeline is full, this call may block. - /// - /// The docstamp is an increasing `u64` that can - /// be used by the client to align commits with its own - /// document queue. - /// - /// Currently it represents the number of documents that - /// have been added since the creation of the index. - pub fn add_document(&mut self, doc: Document) -> io::Result { - self.document_sender.send(doc); - self.uncommitted_docstamp += 1; - Ok(self.uncommitted_docstamp) - } - + // Docstamp of the last document in this commit. + self.committed_docstamp = self.uncommitted_docstamp; + let mut former_workers_join_handle = Vec::new(); + swap(&mut former_workers_join_handle, + &mut self.workers_join_handle); + + for worker_handle in former_workers_join_handle { + let indexing_worker_result = try!(worker_handle.join() + .map_err(|e| Error::ErrorInThread(format!("{:?}", e)))); + try!(indexing_worker_result); + // add a new worker for the next generation. + try!(self.add_indexing_worker()); + } + // here, because we join all of the worker threads, + // all of the segment update for this commit have been + // sent. + // + // No document belonging to the next generation have been + // pushed too, because add_document can only happen + // on this thread. + + // This will move uncommitted segments to the state of + // committed segments. + self.segment_update_sender.send(SegmentUpdate::Commit(self.committed_docstamp)); + + // wait for the segment update thread to have processed the info + let segment_manager = get_segment_manager(&self.index); + while segment_manager.docstamp() != self.committed_docstamp { + println!("wait"); + thread::sleep(Duration::from_millis(100)); + } + + try!(self.on_change()); + Ok(self.committed_docstamp) + } + + + /// Adds a document. + /// + /// If the indexing pipeline is full, this call may block. + /// + /// The docstamp is an increasing `u64` that can + /// be used by the client to align commits with its own + /// document queue. + /// + /// Currently it represents the number of documents that + /// have been added since the creation of the index. 
+ pub fn add_document(&mut self, doc: Document) -> io::Result { + self.document_sender.send(doc); + self.uncommitted_docstamp += 1; + Ok(self.uncommitted_docstamp) + } } #[cfg(test)] mod tests { - use schema::{self, Document}; - use Index; - use Term; - #[test] - fn test_commit_and_rollback() { - let mut schema_builder = schema::SchemaBuilder::default(); - let text_field = schema_builder.add_text_field("text", schema::TEXT); - let index = Index::create_in_ram(schema_builder.build()); - let num_docs_containing = |s: &str| { - let searcher = index.searcher(); - let term_a = Term::from_field_text(text_field, s); - searcher.doc_freq(&term_a) - }; - - { - // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); - { - let mut doc = Document::default(); - doc.add_text(text_field, "a"); - index_writer.add_document(doc).unwrap(); - index_writer.commit().expect("commit failed"); - } - { - let mut doc = Document::default(); - doc.add_text(text_field, "a"); - index_writer.add_document(doc).unwrap(); - // here we have a partial segment. - } - { - let mut doc = Document::default(); - doc.add_text(text_field, "a"); - index_writer.add_document(doc).unwrap(); - // here we have a partial segment. - } - assert_eq!(index_writer.rollback().unwrap(), 1u64); - assert_eq!(num_docs_containing("a"), 1); - - { - let mut doc = Document::default(); - doc.add_text(text_field, "b"); - index_writer.add_document(doc).unwrap(); - } - { - let mut doc = Document::default(); - doc.add_text(text_field, "c"); - index_writer.add_document(doc).unwrap(); - } - assert_eq!(index_writer.commit().unwrap(), 3u64); - assert_eq!(num_docs_containing("a"), 1); - assert_eq!(num_docs_containing("b"), 1); - assert_eq!(num_docs_containing("c"), 1); - } - index.searcher(); - } + use schema::{self, Document}; + use Index; + use Term; + use Error; + + #[test] + fn test_lockfile_stops_duplicates() { + let schema_builder = schema::SchemaBuilder::default(); + let index = Index::create_in_ram(schema_builder.build()); + let _index_writer = index.writer(40_000_000).unwrap(); + match index.writer(40_000_000) { + Err(Error::FileAlreadyExists(_)) => {} + _ => panic!("Expected FileAlreadyExists error"), + } + } + + #[test] + fn test_lockfile_released_on_drop() { + let schema_builder = schema::SchemaBuilder::default(); + let index = Index::create_in_ram(schema_builder.build()); + { + let _index_writer = index.writer(40_000_000).unwrap(); + // the lock should be released when the + // index_writer leaves the scope. 
+
+    #[test]
+    fn test_commit_and_rollback() {
+        let mut schema_builder = schema::SchemaBuilder::default();
+        let text_field = schema_builder.add_text_field("text", schema::TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+        let num_docs_containing = |s: &str| {
+            let searcher = index.searcher();
+            let term_a = Term::from_field_text(text_field, s);
+            searcher.doc_freq(&term_a)
+        };
-    #[test]
-    fn test_with_merges() {
-        let mut schema_builder = schema::SchemaBuilder::default();
-        let text_field = schema_builder.add_text_field("text", schema::TEXT);
-        let index = Index::create_in_ram(schema_builder.build());
-        let num_docs_containing = |s: &str| {
-            let searcher = index.searcher();
-            let term_a = Term::from_field_text(text_field, s);
-            searcher.doc_freq(&term_a)
-        };
-        {
-            // writing the segment
-            let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
-            // create 10 segments with 100 tiny docs
-            for _doc in 0..100 {
-                let mut doc = Document::default();
-                doc.add_text(text_field, "a");
-                index_writer.add_document(doc).unwrap();
-            }
-            index_writer.commit().expect("commit failed");
-            for _doc in 0..100 {
-                let mut doc = Document::default();
-                doc.add_text(text_field, "a");
-                index_writer.add_document(doc).unwrap();
-            }
-            // this should create 8 segments and trigger a merge.
-            index_writer.commit().expect("commit failed");
-            index_writer.wait_merging_threads().expect("waiting merging thread failed");
-            assert_eq!(num_docs_containing("a"), 200);
-            assert_eq!(index.searchable_segments().len(), 1);
-        }
-    }
+        {
+            // writing the segment
+            let mut index_writer = index.writer_with_num_threads(3, 40_000_000).unwrap();
+            {
+                let mut doc = Document::default();
+                doc.add_text(text_field, "a");
+                index_writer.add_document(doc).unwrap();
+            }
+            assert_eq!(index_writer.rollback().unwrap(), 0u64);
+            assert_eq!(num_docs_containing("a"), 0);
-}
\ No newline at end of file
+            {
+                let mut doc = Document::default();
+                doc.add_text(text_field, "b");
+                index_writer.add_document(doc).unwrap();
+            }
+            {
+                let mut doc = Document::default();
+                doc.add_text(text_field, "c");
+                index_writer.add_document(doc).unwrap();
+            }
+            assert_eq!(index_writer.commit().unwrap(), 2u64);
+
+            assert_eq!(num_docs_containing("a"), 0);
+            assert_eq!(num_docs_containing("b"), 1);
+            assert_eq!(num_docs_containing("c"), 1);
+        }
+        index.searcher();
+    }
+
+    #[test]
+    fn test_with_merges() {
+        let mut schema_builder = schema::SchemaBuilder::default();
+        let text_field = schema_builder.add_text_field("text", schema::TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+        let num_docs_containing = |s: &str| {
+            let searcher = index.searcher();
+            let term_a = Term::from_field_text(text_field, s);
+            searcher.doc_freq(&term_a)
+        };
+        {
+            // writing the segment
+            let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
+            // create 10 segments with 100 tiny docs
+            for _doc in 0..100 {
+                let mut doc = Document::default();
+                doc.add_text(text_field, "a");
+                index_writer.add_document(doc).unwrap();
+            }
+            index_writer.commit().expect("commit failed");
+            for _doc in 0..100 {
+                let mut doc = Document::default();
+                doc.add_text(text_field, "a");
+                index_writer.add_document(doc).unwrap();
+            }
+            // this should create 8 segments and trigger a merge.
+            index_writer.commit().expect("commit failed");
+            index_writer.wait_merging_threads().expect("waiting merging thread failed");
+            assert_eq!(num_docs_containing("a"), 200);
+            assert_eq!(index.searchable_segments().len(), 1);
+        }
+    }
+
+
+}
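Read together, the tests specify the writer lifecycle: commit returns the docstamp of the last persisted document, rollback falls back to the last commit, and wait_merging_threads blocks until background merges settle. A hypothetical sketch of the add, commit, add, rollback path (same in-RAM setup as the tests above):

```rust
// Hypothetical illustration, not part of the patch.
let mut doc_a = Document::default();
doc_a.add_text(text_field, "a");
index_writer.add_document(doc_a).unwrap();
let committed_docstamp = index_writer.commit().unwrap();

// This second document is never committed...
let mut doc_b = Document::default();
doc_b.add_text(text_field, "b");
index_writer.add_document(doc_b).unwrap();

// ...so rollback discards it and returns the docstamp
// of the last successful commit.
assert_eq!(index_writer.rollback().unwrap(), committed_docstamp);
```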
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index 276973e1b..e3f9e4a24 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -8,6 +8,7 @@ mod segment_register;
 mod segment_writer;
 mod segment_manager;
 mod segment_updater;
+mod directory_lock;

 pub use self::segment_serializer::SegmentSerializer;
 pub use self::segment_writer::SegmentWriter;
diff --git a/src/lib.rs b/src/lib.rs
index ebcaa2d5b..9970f8b48 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -51,6 +51,16 @@ mod macros {
     macro_rules! get(
         ($e:expr) => (match $e { Some(e) => e, None => return None })
     );
+
+    macro_rules! doc(
+        ($($field:ident => $value:expr),*) => {{
+            let mut document = Document::default();
+            $(
+                document.add(FieldValue::new($field, $value.into()));
+            )*
+            document
+        }};
+    );
 }

 mod core;
@@ -97,7 +107,7 @@ pub use postings::SegmentPostingsOption;

 /// u32 identifying a document within a segment.
-/// Document gets their doc id assigned incrementally,
+/// Documents have their doc id assigned incrementally,
 /// as they are added in the segment.
 pub type DocId = u32;
@@ -400,4 +410,20 @@ mod tests {
         }
         index.searcher();
     }
+
+    #[test]
+    fn test_doc_macro() {
+        let mut schema_builder = SchemaBuilder::default();
+        let text_field = schema_builder.add_text_field("text", TEXT);
+        let other_text_field = schema_builder.add_text_field("text2", TEXT);
+        let document = doc!(text_field => "tantivy", text_field => "some other value", other_text_field => "short");
+        assert_eq!(document.len(), 3);
+        let values = document.get_all(text_field);
+        assert_eq!(values.len(), 2);
+        assert_eq!(values[0].text(), "tantivy");
+        assert_eq!(values[1].text(), "some other value");
+        let values = document.get_all(other_text_field);
+        assert_eq!(values.len(), 1);
+        assert_eq!(values[0].text(), "short");
+    }
 }
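The new `doc!` macro removes the `Document::default()` plus repeated `add_text` boilerplate visible in the writer tests: each `field => value` pair becomes a `FieldValue`, and the value goes through `Into<Value>` (which is why a `From<&'a str>` impl is added to `src/schema/value.rs` further down). A hypothetical usage sketch, with made-up field names:

```rust
// Assumes the doc! macro and schema types are in scope,
// as in test_doc_macro above.
let mut schema_builder = SchemaBuilder::default();
let title = schema_builder.add_text_field("title", TEXT);
let body = schema_builder.add_text_field("body", TEXT);

// One expression instead of Document::default() + repeated adds:
let document = doc!(
    title => "The Old Man and the Sea",
    body => "He was an old man who fished alone in a skiff."
);
assert_eq!(document.len(), 2);
```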
diff --git a/src/postings/docset.rs b/src/postings/docset.rs
index e7fb59315..db40db619 100644
--- a/src/postings/docset.rs
+++ b/src/postings/docset.rs
@@ -4,7 +4,7 @@
 use std::borrow::BorrowMut;
 use std::cmp::Ordering;

-/// Expressed the outcome of a call to `DocSet`'s `.skip_next(...)`.
+/// Expresses the outcome of a call to `DocSet`'s `.skip_next(...)`.
 #[derive(PartialEq, Eq, Debug)]
 pub enum SkipResult {
     /// target was in the docset
@@ -24,8 +24,8 @@ pub trait DocSet {
     /// element.
     fn advance(&mut self,) -> bool;

-    /// After skipping position, the iterator in such a way `.doc()`
-    /// will return a value greater or equal to target.
+    /// After skipping, position the iterator in such a way that `.doc()`
+    /// will return a value greater than or equal to target.
     ///
     /// SkipResult expresses whether the `target value` was reached, overstepped,
     /// or if the `DocSet` was entirely consumed without finding any value
@@ -97,4 +97,4 @@ impl<'a, TDocSet: DocSet> DocSet for &'a mut TDocSet {
     }
 }
- 
\ No newline at end of file
+ 
diff --git a/src/postings/postings.rs b/src/postings/postings.rs
index e3323ae19..071068c95 100644
--- a/src/postings/postings.rs
+++ b/src/postings/postings.rs
@@ -12,8 +12,8 @@ use common::HasLen;
 /// as well as the list of term positions.
 ///
 /// Its main implementation is `SegmentPostings`,
-/// but some other implementation mocking SegmentPostings exists,
-/// in order to help merging segment or for testing.
+/// but other implementations mocking SegmentPostings exist,
+/// in order to help when merging segments or for testing.
 pub trait Postings: DocSet {
     /// Returns the term frequency
     fn term_freq(&self,) -> u32;
diff --git a/src/query/query_parser.rs b/src/query/query_parser.rs
index f15196a06..89876825d 100644
--- a/src/query/query_parser.rs
+++ b/src/query/query_parser.rs
@@ -29,7 +29,7 @@ pub enum ParsingError {

 /// Tantivy's Query parser
 ///
-/// The language covered by the current is extremely simple.
+/// The language covered by the current parser is extremely simple.
 ///
 /// * simple terms: "e.g.: `Barack Obama` are simply analyzed using
 /// tantivy's `StandardTokenizer`, hence becoming `["barack", "obama"]`.
@@ -44,7 +44,7 @@ pub enum ParsingError {
 ///
 /// This behavior is slower, but is not a bad idea if the user is sorting
 /// by relevance : The user typically just scans through the first few
-/// documents in order of decreasing relevance and will stop when the document
+/// documents in order of decreasing relevance and will stop when the documents
 /// are not relevant anymore.
 /// Making it possible to make this behavior customizable is tracked in
 /// [issue #27](https://github.com/fulmicoton/tantivy/issues/27).
@@ -135,9 +135,9 @@ impl QueryParser {
     /// Parse a query
     ///
     /// Note that `parse_query` returns an error if the input
-    /// not a valid query.
+    /// is not a valid query.
     ///
-    /// There is currently no lenient mode for the query parse
+    /// There is currently no lenient mode for the query parser
     /// which makes it a bad choice for a public/broad user search engine.
     ///
     /// Implementing a lenient mode for this query parser is tracked
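Until a lenient mode lands, user-facing callers should treat `ParsingError` as an expected outcome rather than a bug. A hypothetical sketch (the field handles, searcher, and `user_input` are assumed to come from surrounding code):

```rust
// Illustrative only: surface malformed user input as an error
// message instead of unwrapping the parse result.
let query_parser = QueryParser::new(index.schema(), vec!(title, body));
match query_parser.parse_query(user_input) {
    Ok(query) => {
        // run the query, e.g. query.search(&searcher, &mut top_collector)
    }
    Err(_parsing_error) => {
        // report "invalid query" back to the user and keep serving
    }
}
```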
diff --git a/src/schema/mod.rs b/src/schema/mod.rs
index 2abde20ba..5be6bc512 100644
--- a/src/schema/mod.rs
+++ b/src/schema/mod.rs
@@ -4,7 +4,7 @@

 # Schema definition

 Tantivy has a very strict schema.
-The schema defines information about the fields your index contains, that is for each field :
+The schema defines information about the fields your index contains, that is, for each field :

 * the field name (may only contain letters `[a-zA-Z]`, number `[0-9]`, and `_`)
 * the type of the field (currently only `text` and `u32` are supported)
@@ -37,20 +37,20 @@ let schema = schema_builder.build();

 We can split the problem of generating a search result page into two phases :

-* identifying the list of 10 or so document to be displayed (Conceptually `query -> doc_ids[]`)
+* identifying the list of 10 or so documents to be displayed (Conceptually `query -> doc_ids[]`)
 * for each of these documents, retrieving the information required to generate the serp page. (`doc_ids[] -> Document[]`)

-In the first phase, the hability to search for documents by the given field, is determined by the [`TextIndexingOptions`](enum.TextIndexingOptions.html) of our
+In the first phase, the ability to search for documents by the given field is determined by the [`TextIndexingOptions`](enum.TextIndexingOptions.html) of our
 [`TextOptions`](struct.TextOptions.html).

-The effect of each possible settings is described more in detail [`TextIndexingOptions`](enum.TextIndexingOptions.html).
+The effect of each possible setting is described in more detail in [`TextIndexingOptions`](enum.TextIndexingOptions.html).

 On the other hand setting the field as stored or not
 determines whether the field should be returned when [`searcher.doc(doc_address)`](../struct.Searcher.html#method.doc) is called.

 ### Shortcuts

-For convenience, a few special value of `TextOptions` for your convenience.
+For convenience, a few special values of `TextOptions` are predefined.
 They can be composed using the `|` operator.
 The example can be rewritten :
@@ -82,7 +82,7 @@
 Just like for Text fields (see above),
 setting the field as stored defines whether the field
 will be returned when [`searcher.doc(doc_address)`](../struct.Searcher.html#method.doc) is called,
 and setting the field as indexed means that we
 will be able perform queries such as `num_stars:10`.
-Note that contrary to text fields, u32 can only be indexed in one way for the moment.
+Note that unlike text fields, u32 can only be indexed in one way for the moment.
 This may change when we will start supporting range queries.

 The `fast` option on the other hand is specific to u32 fields, and is only relevant
diff --git a/src/schema/schema.rs b/src/schema/schema.rs
index f41d5572f..8d6e3bedd 100644
--- a/src/schema/schema.rs
+++ b/src/schema/schema.rs
@@ -15,7 +15,7 @@ use std::fmt;

 /// Tantivy has a very strict schema.
-/// You need to specify in advance, whether a field is indexed or not,
+/// You need to specify in advance whether a field is indexed or not,
 /// stored or not, and RAM-based or not.
 ///
 /// This is done by creating a schema object, and
@@ -483,4 +483,4 @@ mod tests {
         }
     }
 }
-}
\ No newline at end of file
+}
diff --git a/src/schema/value.rs b/src/schema/value.rs
index 36f21bc86..084b8d373 100644
--- a/src/schema/value.rs
+++ b/src/schema/value.rs
@@ -59,6 +59,11 @@ impl From for Value {
     }
 }

+impl<'a> From<&'a str> for Value {
+    fn from(s: &'a str) -> Value {
+        Value::Str(s.to_string())
+    }
+}

 const TEXT_CODE: u8 = 0;
 const U32_CODE: u8 = 1;
@@ -95,4 +100,4 @@ impl BinarySerializable for Value {
         }
     }
 }
-}
\ No newline at end of file
+}
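Closing the loop: `doc!` relies on `$value.into()`, so every type allowed on the right-hand side of `=>` needs a `From` impl for `Value`. With the new impl, a plain `&str` converts directly, as in this sketch:

```rust
use schema::Value;

// &str now converts into Value::Str through the impl added above.
let value: Value = "tantivy".into();
match value {
    Value::Str(text) => assert_eq!(text, "tantivy"),
    _ => panic!("expected a string value"),
}
```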