From af7ea1422a887177b2febf37e5fbdb61e9d499c1 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 17 Jul 2019 13:20:02 +0900 Subject: [PATCH] using smallvec for operation batches (#599) --- Cargo.toml | 1 + src/indexer/index_writer.rs | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 746cc8312..fbe329bfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ fail = "0.2" scoped-pool = "1.0" murmurhash32 = "0.2" chrono = "0.4" +smallvec = "0.6" [target.'cfg(windows)'.dependencies] winapi = "0.3" diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 99eccc5aa..fee073123 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -26,6 +26,8 @@ use crate::Result; use bit_set::BitSet; use crossbeam::channel; use futures::{Canceled, Future}; +use smallvec::smallvec; +use smallvec::SmallVec; use std::mem; use std::ops::Range; use std::sync::Arc; @@ -44,8 +46,15 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES; // reaches `PIPELINE_MAX_SIZE_IN_DOCS` const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; -type OperationSender = channel::Sender>; -type OperationReceiver = channel::Receiver>; +// Group of operations. +// Most of the time, users will send operation one-by-one, but it can be useful to +// send them as a small block to ensure that +// - all docs in the operation will happen on the same segment and continuous docids. +// - all operations in the group are committed at the same time, making the group +// atomic. +type OperationGroup = SmallVec<[AddOperation; 4]>; +type OperationSender = channel::Sender; +type OperationReceiver = channel::Receiver; /// `IndexWriter` is the user entry-point to add document to an index. /// @@ -236,7 +245,7 @@ pub fn advance_deletes( fn index_documents( memory_budget: usize, segment: &Segment, - grouped_document_iterator: &mut dyn Iterator>, + grouped_document_iterator: &mut dyn Iterator, segment_updater: &mut SegmentUpdater, mut delete_cursor: DeleteCursor, ) -> Result { @@ -667,7 +676,7 @@ impl IndexWriter { pub fn add_document(&self, document: Document) -> Opstamp { let opstamp = self.stamper.stamp(); let add_operation = AddOperation { opstamp, document }; - let send_result = self.operation_sender.send(vec![add_operation]); + let send_result = self.operation_sender.send(smallvec![add_operation]); if let Err(e) = send_result { panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e); } @@ -714,7 +723,7 @@ impl IndexWriter { } let (batch_opstamp, stamps) = self.get_batch_opstamps(count); - let mut adds: Vec = Vec::new(); + let mut adds = OperationGroup::default(); for (user_op, opstamp) in user_operations.into_iter().zip(stamps) { match user_op {