From 444662485fee012eeebe1fe91b0fc9af7008d5c0 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 28 May 2019 10:26:00 +0900 Subject: [PATCH] =?UTF-8?q?Remove=20mut=20in=20add=5Fdocument=20and=20dele?= =?UTF-8?q?te=5Fterm.=20Made=20stamper=20ordering=20rel=E2=80=A6=20(#551)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Remove mut in add_document and delete_term. Made stamper ordering relaxed. * Made batch operations &mut self -> &self * Added example --- CHANGELOG.md | 10 +++- examples/multiple_producer.rs | 107 ++++++++++++++++++++++++++++++++++ src/directory/directory.rs | 4 +- src/indexer/index_writer.rs | 12 ++-- 4 files changed, 123 insertions(+), 10 deletions(-) create mode 100644 examples/multiple_producer.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dd38226f..5fc65ed7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,14 +9,20 @@ Tantivy 0.10.0 Minor --------- - Small simplification of the code. -Calling .freq() or .doc() when .advance() has never +Calling .freq() or .doc() when .advance() has never been called on segment postings should panic from now on. - Tokens exceeding `u16::max_value() - 4` chars are discarded silently instead of panicking. - Fast fields are now preloaded when the `SegmentReader` is created. +- `IndexMeta` is now public. (@hntd187) +- `IndexWriter` `add_document`, `delete_term`. `IndexWriter` is `Sync`, making it possible to use it with a ` +Arc<RwLock<IndexWriter>>`. `add_document` and `delete_term` can +only require a read lock. (@pmasurel) +- Introducing `Opstamp` as an expressive type alias for `u64`. (@petr-tik) +- Stamper now relies on `AtomicU64` on all platforms (@petr-tik) ## How to update? -Your existing indexes are usable as is. Your may or may need some +Your existing indexes are usable as is, but you may need some trivial updates. 
### Fast fields diff --git a/examples/multiple_producer.rs b/examples/multiple_producer.rs new file mode 100644 index 000000000..8a51d40d4 --- /dev/null +++ b/examples/multiple_producer.rs @@ -0,0 +1,107 @@ +// # Indexing from different threads. +// +// It is fairly common to have to index from different threads. +// Tantivy forbids to create more than one `IndexWriter` at a time. +// +// This `IndexWriter` itself has its own multithreaded layer, so managing your own +// indexing threads will not help. However, it can still be useful for some applications. +// +// For instance, if preparing documents to send to tantivy before indexing is the bottleneck of +// your application, it is reasonable to have multiple threads. +// +// Another very common reason to want to index from multiple threads, is implementing a webserver +// with CRUD capabilities. The server framework will most likely handle request from +// different threads. +// +// The recommended way to address both of these use case is to wrap your `IndexWriter` into a +// `Arc<RwLock<IndexWriter>>`. +// +// While this is counterintuitive, adding and deleting documents do not require mutability +// over the `IndexWriter`, so several threads will be able to do this operation concurrently. +// +// The example below does not represent an actual real-life use case (who would spawn thread to +// index a single document?), but aims at demonstrating the mechanism that makes indexing +// from several threads possible. + +extern crate tempdir; + +// --- +// Importing tantivy... 
+#[macro_use] +extern crate tantivy; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; +use tantivy::schema::{Schema, STORED, TEXT}; +use tantivy::Opstamp; +use tantivy::{Index, IndexWriter}; + +fn main() -> tantivy::Result<()> { + // # Defining the schema + let mut schema_builder = Schema::builder(); + let title = schema_builder.add_text_field("title", TEXT | STORED); + let body = schema_builder.add_text_field("body", TEXT); + let schema = schema_builder.build(); + + let index = Index::create_in_ram(schema); + let index_writer: Arc<RwLock<IndexWriter>> = Arc::new(RwLock::new(index.writer(50_000_000)?)); + + // # First indexing thread. + let index_writer_clone_1 = index_writer.clone(); + thread::spawn(move || { + // we index 100 times the document... for the sake of the example. + for i in 0..100 { + let opstamp = { + // A read lock is sufficient here. + let index_writer_rlock = index_writer_clone_1.read().unwrap(); + index_writer_rlock.add_document( + doc!( + title => "Of Mice and Men", + body => "A few miles south of Soledad, the Salinas River drops in close to the hillside \ + bank and runs deep and green. The water is warm too, for it has slipped twinkling \ + over the yellow sands in the sunlight before reaching the narrow pool. On one \ + side of the river the golden foothill slopes curve up to the strong and rocky \ + Gabilan Mountains, but on the valley side the water is lined with trees—willows \ + fresh and green with every spring, carrying in their lower leaf junctures the \ + debris of the winter’s flooding; and sycamores with mottled, white, recumbent \ + limbs and branches that arch over the pool" + )) + }; + println!("add doc {} from thread 1 - opstamp {}", i, opstamp); + thread::sleep(Duration::from_millis(20)); + } + }); + + // # Second indexing thread. + let index_writer_clone_2 = index_writer.clone(); + // For convenience, tantivy also comes with a macro to + // reduce the boilerplate above. 
+ thread::spawn(move || { + // we index 100 times the document... for the sake of the example. + for i in 0..100 { + // A read lock is sufficient here. + let opstamp = { + let index_writer_rlock = index_writer_clone_2.read().unwrap(); + index_writer_rlock.add_document(doc!( + title => "Manufacturing consent", + body => "Some great book description..." + )) + }; + println!("add doc {} from thread 2 - opstamp {}", i, opstamp); + thread::sleep(Duration::from_millis(10)); + } + }); + + // # In the main thread, we commit 10 times, once every 500ms. + for _ in 0..10 { + let opstamp: Opstamp = { + // Committing or rollbacking on the other hand requires write lock. This will block other threads. + let mut index_writer_wlock = index_writer.write().unwrap(); + index_writer_wlock.commit().unwrap() + }; + println!("committed with opstamp {}", opstamp); + thread::sleep(Duration::from_millis(500)); + } + + Ok(()) +} diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 943998e2d..cc3208f07 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -48,14 +48,14 @@ impl RetryPolicy { /// /// It is transparently associated to a lock file, that gets deleted /// on `Drop.` The lock is released automatically on `Drop`. -pub struct DirectoryLock(Box); +pub struct DirectoryLock(Box); struct DirectoryLockGuard { directory: Box, path: PathBuf, } -impl From> for DirectoryLock { +impl From> for DirectoryLock { fn from(underlying: Box) -> Self { DirectoryLock(underlying) } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 398e61358..d5bb06479 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -618,7 +618,7 @@ impl IndexWriter { /// /// Like adds, the deletion itself will be visible /// only after calling `commit()`. 
- pub fn delete_term(&mut self, term: Term) -> Opstamp { + pub fn delete_term(&self, term: Term) -> Opstamp { let opstamp = self.stamper.stamp(); let delete_operation = DeleteOperation { opstamp, term }; self.delete_queue.push(delete_operation); @@ -646,7 +646,7 @@ impl IndexWriter { /// /// Currently it represents the number of documents that /// have been added since the creation of the index. - pub fn add_document(&mut self, document: Document) -> Opstamp { + pub fn add_document(&self, document: Document) -> Opstamp { let opstamp = self.stamper.stamp(); let add_operation = AddOperation { opstamp, document }; let send_result = self.operation_sender.send(vec![add_operation]); @@ -663,7 +663,7 @@ impl IndexWriter { /// The total number of stamps generated by this method is `count + 1`; /// each operation gets a stamp from the `stamps` iterator and `last_opstamp` /// is for the batch itself. - fn get_batch_opstamps(&mut self, count: Opstamp) -> (Opstamp, Range<Opstamp>) { + fn get_batch_opstamps(&self, count: Opstamp) -> (Opstamp, Range<Opstamp>) { let Range { start, end } = self.stamper.stamps(count + 1u64); let last_opstamp = end - 1; let stamps = Range { @@ -689,7 +689,7 @@ impl IndexWriter { /// Like adds and deletes (see `IndexWriter.add_document` and /// `IndexWriter.delete_term`), the changes made by calling `run` will be /// visible to readers only after calling `commit()`. 
- pub fn run(&mut self, user_operations: Vec<UserOperation>) -> Opstamp { + pub fn run(&self, user_operations: Vec<UserOperation>) -> Opstamp { let count = user_operations.len() as u64; if count == 0 { return self.stamper.stamp(); @@ -740,7 +740,7 @@ mod tests { let mut schema_builder = schema::Schema::builder(); let text_field = schema_builder.add_text_field("text", schema::TEXT); let index = Index::create_in_ram(schema_builder.build()); - let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + let index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); let operations = vec![ UserOperation::Add(doc!(text_field=>"a")), UserOperation::Add(doc!(text_field=>"b")), @@ -802,7 +802,7 @@ mod tests { fn test_empty_operations_group() { let schema_builder = schema::Schema::builder(); let index = Index::create_in_ram(schema_builder.build()); - let mut index_writer = index.writer(3_000_000).unwrap(); + let index_writer = index.writer(3_000_000).unwrap(); let operations1 = vec![]; let batch_opstamp1 = index_writer.run(operations1); assert_eq!(batch_opstamp1, 0u64);