From e14701e9cd3f605f800efd1fccdf9f4558034201 Mon Sep 17 00:00:00 2001
From: Jason Goldberger
Date: Wed, 13 Feb 2019 16:56:01 -0700
Subject: [PATCH] Add grouped operations (#493)

* [WIP] added UserOperation enum, added IndexWriter.run, and added MultiStamp

* removed MultiStamp in favor of std::ops::Range

* changed IndexWriter::run to return u64, Stamper::stamps to return a Range, added tests, and added docs

* changed delete_cursor skipping to use the first operation's opstamp instead of the last; changed the index_writer test to use 1 thread

* added a test for an ordered batch of operations

* added a test comment
---
 src/indexer/index_writer.rs | 172 +++++++++++++++++++++++++++++++++---
 src/indexer/operation.rs    |   7 ++
 src/indexer/stamper.rs      |  13 +++
 3 files changed, 180 insertions(+), 12 deletions(-)

diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 2d74b46fe..8a43b0c3a 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -1,4 +1,4 @@
-use super::operation::AddOperation;
+use super::operation::{AddOperation, UserOperation};
 use super::segment_updater::SegmentUpdater;
 use super::PreparedCommit;
 use bit_set::BitSet;
@@ -26,6 +26,7 @@ use schema::Document;
 use schema::IndexRecordOption;
 use schema::Term;
 use std::mem;
+use std::ops::Range;
 use std::sync::Arc;
 use std::thread;
 use std::thread::JoinHandle;
@@ -43,8 +44,8 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
 // reaches `PIPELINE_MAX_SIZE_IN_DOCS`
 const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
 
-type DocumentSender = channel::Sender<AddOperation>;
-type DocumentReceiver = channel::Receiver<AddOperation>;
+type DocumentSender = channel::Sender<Vec<AddOperation>>;
+type DocumentReceiver = channel::Receiver<Vec<AddOperation>>;
 
 /// Split the thread memory budget into
 /// - the heap size
@@ -266,7 +267,7 @@ fn index_documents(
     memory_budget: usize,
     segment: &Segment,
     generation: usize,
-    document_iterator: &mut impl Iterator<Item = AddOperation>,
+    document_iterator: &mut Iterator<Item = Vec<AddOperation>>,
     segment_updater: &mut SegmentUpdater,
     mut delete_cursor: DeleteCursor,
 ) -> Result<bool> {
@@ -274,11 +275,11 @@ fn index_documents(
     let segment_id = segment.id();
     let table_size = initial_table_size(memory_budget);
     let mut segment_writer = SegmentWriter::for_segment(table_size, segment.clone(), &schema)?;
-    for doc in document_iterator {
-        segment_writer.add_document(doc, &schema)?;
-
+    for documents in document_iterator {
+        for doc in documents {
+            segment_writer.add_document(doc, &schema)?;
+        }
         let mem_usage = segment_writer.mem_usage();
-
         if mem_usage >= memory_budget - MARGIN_IN_BYTES {
             info!(
                 "Buffer limit reached, flushing segment with maxdoc={}.",
@@ -409,8 +410,12 @@ impl IndexWriter {
                 // this is a valid guarantee as the
                 // peeked document now belongs to
                 // our local iterator.
-                if let Some(operation) = document_iterator.peek() {
-                    delete_cursor.skip_to(operation.opstamp);
+                if let Some(operations) = document_iterator.peek() {
+                    if let Some(first) = operations.first() {
+                        delete_cursor.skip_to(first.opstamp);
+                    } else {
+                        return Ok(());
+                    }
                 } else {
                     // No more documents.
                     // Happens when there is a commit, or if the `IndexWriter`
@@ -643,25 +648,168 @@ impl IndexWriter {
     pub fn add_document(&mut self, document: Document) -> u64 {
         let opstamp = self.stamper.stamp();
         let add_operation = AddOperation { opstamp, document };
-        let send_result = self.document_sender.send(add_operation);
+        let send_result = self.document_sender.send(vec![add_operation]);
         if let Err(e) = send_result {
             panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
         }
         opstamp
     }
+
+    /// Gets a range of stamps from the stamper and "pops" the last stamp
+    /// off the range, returning a tuple of that last opstamp and the
+    /// remaining range.
+    ///
+    /// The total number of stamps generated by this method is `count + 1`;
+    /// each operation gets a stamp from the `stamps` range and `last_opstamp`
+    /// is reserved for the batch itself.
+    fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range<u64>) {
+        let Range { start, end } = self.stamper.stamps(count + 1u64);
+        let last_opstamp = end - 1;
+        let stamps = Range {
+            start: start,
+            end: last_opstamp,
+        };
+        (last_opstamp, stamps)
+    }
+
+    /// Runs a group of document operations, ensuring that the operations are
+    /// assigned contiguous u64 opstamps and that add operations of the same
+    /// group are flushed into the same segment.
+    ///
+    /// If the indexing pipeline is full, this call may block.
+    ///
+    /// Each operation of the given `user_operations` will receive an in-order,
+    /// contiguous u64 opstamp. The entire batch itself is also given an
+    /// opstamp that is 1 greater than the last operation's opstamp. This
+    /// `batch_opstamp` is the return value of `run`. An empty group of
+    /// `user_operations`, an empty `Vec<UserOperation>`, still receives
+    /// a valid opstamp even though no changes were _actually_ made to the index.
+    ///
+    /// Like adds and deletes (see `IndexWriter.add_document` and
+    /// `IndexWriter.delete_term`), the changes made by calling `run` will be
+    /// visible to readers only after calling `commit()`.
+    pub fn run(&mut self, user_operations: Vec<UserOperation>) -> u64 {
+        let count = user_operations.len() as u64;
+        if count == 0 {
+            return self.stamper.stamp();
+        }
+        let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
+
+        let mut adds: Vec<AddOperation> = Vec::new();
+
+        for (user_op, opstamp) in user_operations.into_iter().zip(stamps) {
+            match user_op {
+                UserOperation::Delete(term) => {
+                    let delete_operation = DeleteOperation {
+                        opstamp: opstamp,
+                        term: term,
+                    };
+                    self.delete_queue.push(delete_operation);
+                }
+                UserOperation::Add(doc) => {
+                    let add_operation = AddOperation {
+                        opstamp: opstamp,
+                        document: doc,
+                    };
+                    adds.push(add_operation);
+                }
+            }
+        }
+        let send_result = self.document_sender.send(adds);
+        if let Err(e) = send_result {
+            panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
+        };
+
+        batch_opstamp
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use super::super::operation::UserOperation;
     use super::initial_table_size;
     use directory::error::LockError;
     use error::*;
     use indexer::NoMergePolicy;
-    use schema::{self, Document};
+    use schema::{self, Document, IndexRecordOption};
+    use query::{TermQuery};
+    use collector::TopDocs;
     use Index;
     use Term;
 
+    #[test]
+    fn test_operations_group() {
+        // an operations group with 2 items should generate 3 opstamps: 0, 1, and 2.
+        let mut schema_builder = schema::Schema::builder();
+        let text_field = schema_builder.add_text_field("text", schema::TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+        let operations = vec![
+            UserOperation::Add(doc!(text_field=>"a")),
+            UserOperation::Add(doc!(text_field=>"b")),
+        ];
+        let batch_opstamp1 = index_writer.run(operations);
+        assert_eq!(batch_opstamp1, 2u64);
+    }
+
+    #[test]
+    fn test_ordered_batched_operations() {
+        // * one delete for `doc!(field=>"a")`
+        // * one add for `doc!(field=>"a")`
+        // * one add for `doc!(field=>"b")`
+        // * one delete for `doc!(field=>"b")`
+        // after commit there is one doc with "a" and no docs with "b"
+        let mut schema_builder = schema::Schema::builder();
+        let text_field = schema_builder.add_text_field("text", schema::TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+        let a_term = Term::from_field_text(text_field, "a");
+        let b_term = Term::from_field_text(text_field, "b");
+        let operations = vec![
+            UserOperation::Delete(a_term),
+            UserOperation::Add(doc!(text_field=>"a")),
+            UserOperation::Add(doc!(text_field=>"b")),
+            UserOperation::Delete(b_term),
+        ];
+
+        index_writer.run(operations);
+        index_writer.commit().expect("failed to commit");
+        index.load_searchers().expect("failed to load searchers");
+
+        let a_term = Term::from_field_text(text_field, "a");
+        let b_term = Term::from_field_text(text_field, "b");
+
+        let a_query = TermQuery::new(a_term, IndexRecordOption::Basic);
+        let b_query = TermQuery::new(b_term, IndexRecordOption::Basic);
+
+        let searcher = index.searcher();
+
+        let a_docs = searcher
+            .search(&a_query, &TopDocs::with_limit(1))
+            .expect("search for a failed");
+
+        let b_docs = searcher
+            .search(&b_query, &TopDocs::with_limit(1))
+            .expect("search for b failed");
+
+        assert_eq!(a_docs.len(), 1);
+        assert_eq!(b_docs.len(), 0);
+    }
+
+    #[test]
+    fn test_empty_operations_group() {
+        let schema_builder = schema::Schema::builder();
+        let index = Index::create_in_ram(schema_builder.build());
+        let mut index_writer = index.writer(3_000_000).unwrap();
+        let operations1 = vec![];
+        let batch_opstamp1 = index_writer.run(operations1);
+        assert_eq!(batch_opstamp1, 0u64);
+        let operations2 = vec![];
+        let batch_opstamp2 = index_writer.run(operations2);
+        assert_eq!(batch_opstamp2, 1u64);
+    }
+
     #[test]
     fn test_lockfile_stops_duplicates() {
         let schema_builder = schema::Schema::builder();
diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs
index 260d510e6..fe57a4a3a 100644
--- a/src/indexer/operation.rs
+++ b/src/indexer/operation.rs
@@ -14,3 +14,10 @@ pub struct AddOperation {
     pub opstamp: u64,
     pub document: Document,
 }
+
+/// UserOperation is an enum type that encapsulates the operations a user can
+/// group together: adding a document or deleting by `Term`.
+#[derive(Eq, PartialEq, Debug)]
+pub enum UserOperation {
+    Add(Document),
+    Delete(Term),
+}
diff --git a/src/indexer/stamper.rs b/src/indexer/stamper.rs
index 38933e727..69b4c51ef 100644
--- a/src/indexer/stamper.rs
+++ b/src/indexer/stamper.rs
@@ -1,3 +1,4 @@
+use std::ops::Range;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
@@ -60,6 +61,16 @@ impl Stamper {
     pub fn stamp(&self) -> u64 {
         self.0.fetch_add(1u64, Ordering::SeqCst) as u64
     }
+
+    /// Given a desired count `n`, `stamps` returns a `Range<u64>` that
+    /// supplies `n` contiguous u64 stamps.
+    pub fn stamps(&self, n: u64) -> Range<u64> {
+        let start = self.0.fetch_add(n, Ordering::SeqCst);
+        Range {
+            start: start,
+            end: start + n,
+        }
+    }
 }
 
 #[cfg(test)]
@@ -78,5 +89,7 @@ mod test {
 
         assert_eq!(stamper.stamp(), 10u64);
         assert_eq!(stamper_clone.stamp(), 11u64);
+        assert_eq!(stamper.stamps(3u64), (12..15));
+        assert_eq!(stamper.stamp(), 15u64);
     }
 }
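Note on the opstamp arithmetic in the patch above: a batch of `count` operations reserves `count + 1` stamps; the operations take the first `count` stamps and the batch opstamp is the last one. The standalone sketch below mirrors that logic with a bare `AtomicU64` standing in for tantivy's `Stamper`; the function names here are illustrative only and are not part of the patch.

    use std::ops::Range;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Simplified stand-in for `Stamper::stamps`: reserve `n` contiguous stamps.
    fn stamps(counter: &AtomicU64, n: u64) -> Range<u64> {
        let start = counter.fetch_add(n, Ordering::SeqCst);
        start..start + n
    }

    // Simplified stand-in for `IndexWriter::get_batch_opstamps`: reserve
    // `count + 1` stamps and split the last one off for the batch itself.
    fn batch_opstamps(counter: &AtomicU64, count: u64) -> (u64, Range<u64>) {
        let Range { start, end } = stamps(counter, count + 1);
        let last_opstamp = end - 1;
        (last_opstamp, start..last_opstamp)
    }

    fn main() {
        let counter = AtomicU64::new(0);
        // Two operations on a fresh stamper: the operations get opstamps 0 and 1,
        // and the batch itself gets opstamp 2, matching `test_operations_group`.
        let (batch_opstamp, op_stamps) = batch_opstamps(&counter, 2);
        assert_eq!(op_stamps, 0..2);
        assert_eq!(batch_opstamp, 2);
    }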