From a6b5f4f5b5969b68d6ae6bd4e1486e7e97dc38ed Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 11 Mar 2020 10:37:25 +0900 Subject: [PATCH] in ram segments --- src/common/mod.rs | 4 +- src/core/index.rs | 14 ++- src/core/segment.rs | 17 ++- src/directory/mod.rs | 2 +- src/directory/spilling_writer.rs | 52 +++------ src/indexer/index_writer.rs | 95 +++++++++++---- src/indexer/merger.rs | 2 +- src/indexer/mod.rs | 1 + src/indexer/prepared_commit.rs | 21 ++-- src/indexer/segment_entry.rs | 15 +-- src/indexer/segment_manager.rs | 12 +- src/indexer/segment_register.rs | 8 +- src/indexer/segment_updater.rs | 51 +++++++-- src/indexer/segment_writer.rs | 32 +++--- src/postings/mod.rs | 18 +-- src/reader/index_writer_reader.rs | 84 ++++++++++++++ src/reader/meta_file_reader.rs | 177 ++++++++++++++++++++++++++++ src/reader/mod.rs | 184 +++++------------------------- src/store/writer.rs | 1 - 19 files changed, 498 insertions(+), 292 deletions(-) create mode 100644 src/reader/index_writer_reader.rs create mode 100644 src/reader/meta_file_reader.rs diff --git a/src/common/mod.rs b/src/common/mod.rs index ec6a55678..a53a0ab75 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -220,10 +220,12 @@ pub(crate) mod test { assert_eq!(minmax(vals.into_iter()), None); } + #[test] fn test_minmax_one() { - assert_eq!(minmax(vec![1].into_iter()), None); + assert_eq!(minmax(vec![1].into_iter()), Some((1, 1))); } + #[test] fn test_minmax_two() { assert_eq!(minmax(vec![1, 2].into_iter()), Some((1, 2))); assert_eq!(minmax(vec![2, 1].into_iter()), Some((1, 2))); diff --git a/src/core/index.rs b/src/core/index.rs index 8b7222a9a..dfd9429fb 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -469,7 +469,7 @@ mod tests { .try_into() .unwrap(); assert_eq!(reader.searcher().num_docs(), 0); - test_index_on_commit_reload_policy_aux(field, &index, &reader); + test_index_on_commit_reload_policy_aux(field, index.clone(), &index, &reader); } #[cfg(feature = "mmap")] @@ -493,7 +493,7 @@ mod tests { .try_into() .unwrap(); assert_eq!(reader.searcher().num_docs(), 0); - test_index_on_commit_reload_policy_aux(field, &index, &reader); + test_index_on_commit_reload_policy_aux(field, index.clone(), &index, &reader); } #[test] @@ -535,12 +535,16 @@ mod tests { .try_into() .unwrap(); assert_eq!(reader.searcher().num_docs(), 0); - test_index_on_commit_reload_policy_aux(field, &write_index, &reader); + test_index_on_commit_reload_policy_aux(field, read_index, &write_index, &reader); } } - fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) { - let mut reader_index = reader.index(); + fn test_index_on_commit_reload_policy_aux( + field: Field, + mut reader_index: Index, + index: &Index, + reader: &IndexReader, + ) { let (sender, receiver) = crossbeam::channel::unbounded(); let _watch_handle = reader_index.directory_mut().watch(Box::new(move || { let _ = sender.send(()); diff --git a/src/core/segment.rs b/src/core/segment.rs index aef2e7c06..46a2805db 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -18,6 +18,16 @@ pub(crate) enum SegmentDirectory { Volatile(RAMDirectory), } +impl fmt::Debug for SegmentDirectory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + SegmentDirectory::Volatile(_) => write!(f, "volatile")?, + SegmentDirectory::Persisted(dir) => write!(f, "Persisted({:?})", dir)?, + } + Ok(()) + } +} + impl SegmentDirectory { pub fn new_volatile() -> SegmentDirectory { SegmentDirectory::Volatile(RAMDirectory::default()) @@ -60,7 +70,12 @@ 
pub struct Segment { impl fmt::Debug for Segment { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Segment({:?})", self.id().uuid_string()) + write!( + f, + "Segment(id={:?}, directory={:?})", + self.id().uuid_string(), + self.directory + ) } } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 83f55a789..ee85c908b 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -23,8 +23,8 @@ pub use self::directory::DirectoryLock; pub use self::directory::{Directory, DirectoryClone}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; pub use self::ram_directory::RAMDirectory; -pub(crate) use self::spilling_writer::{SpillingWriter, SpillingResult}; pub use self::read_only_source::ReadOnlySource; +pub(crate) use self::spilling_writer::{SpillingResult, SpillingWriter}; pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle}; use std::io::{self, BufWriter, Write}; use std::path::PathBuf; diff --git a/src/directory/spilling_writer.rs b/src/directory/spilling_writer.rs index ee9552120..ae619134b 100644 --- a/src/directory/spilling_writer.rs +++ b/src/directory/spilling_writer.rs @@ -1,4 +1,4 @@ -use crate::directory::{WritePtr, TerminatingWrite}; +use crate::directory::{TerminatingWrite, WritePtr}; use std::io::{self, Write}; pub enum SpillingState { @@ -11,7 +11,6 @@ pub enum SpillingState { } impl SpillingState { - fn new( limit: usize, write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>, @@ -57,37 +56,16 @@ impl SpillingWriter { write_factory: Box<dyn FnOnce() -> io::Result<WritePtr>>, ) -> SpillingWriter { let state = SpillingState::new(limit, write_factory); - SpillingWriter { - state: Some(state) - } - } - - pub fn flush_and_finalize(self) -> io::Result<()> { - match self.state.expect("State cannot be none") { - SpillingState::Buffer { - buffer, - write_factory, - .. - } => { - let mut wrt = write_factory()?; - wrt.write_all(&buffer[..])?; - wrt.terminate()?; - } - SpillingState::Spilled(wrt) => { - wrt.terminate()?; - } - } - Ok(()) + SpillingWriter { state: Some(state) } } pub fn finalize(self) -> io::Result<SpillingResult> { match self.state.expect("state cannot be None") { - SpillingState::Spilled(mut wrt) => { + SpillingState::Spilled(wrt) => { wrt.terminate()?; Ok(SpillingResult::Spilled) } SpillingState::Buffer { buffer, .. } => Ok(SpillingResult::Buffer(buffer)), - } } } @@ -111,20 +89,18 @@ impl io::Write for SpillingWriter { } fn write_all(&mut self, payload: &[u8]) -> io::Result<()> { - let state_opt: Option<io::Result<SpillingState>> = self.state - .take() - .map(|mut state| { - state = state.reserve(payload.len())?; - match &mut state { - SpillingState::Buffer { buffer, .. } => { - buffer.extend_from_slice(payload); - } - SpillingState::Spilled(wrt) => { - wrt.write_all(payload)?; - } + let state_opt: Option<io::Result<SpillingState>> = self.state.take().map(|mut state| { + state = state.reserve(payload.len())?; + match &mut state { + SpillingState::Buffer { buffer, ..
} => { + buffer.extend_from_slice(payload); } - Ok(state) - }); + SpillingState::Spilled(wrt) => { + wrt.write_all(payload)?; + } + } + Ok(state) + }); self.state = state_opt.transpose()?; Ok(()) } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index ebc56756f..39c2435d8 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -1,5 +1,4 @@ use super::operation::{AddOperation, UserOperation}; -use crate::indexer::segment_manager::SegmentRegisters; use super::segment_updater::SegmentUpdater; use super::PreparedCommit; use crate::common::BitSet; @@ -9,23 +8,26 @@ use crate::core::SegmentComponent; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::core::SegmentReader; -use crate::directory::TerminatingWrite; use crate::directory::{DirectoryLock, GarbageCollectionResult}; +use crate::directory::{TerminatingWrite, WatchCallbackList}; use crate::docset::DocSet; use crate::error::TantivyError; use crate::fastfield::write_delete_bitset; use crate::indexer::delete_queue::{DeleteCursor, DeleteQueue}; use crate::indexer::doc_opstamp_mapping::DocToOpstampMapping; use crate::indexer::operation::DeleteOperation; +use crate::indexer::segment_manager::SegmentRegisters; +use crate::indexer::segment_register::SegmentRegister; use crate::indexer::stamper::Stamper; use crate::indexer::MergePolicy; use crate::indexer::SegmentEntry; use crate::indexer::SegmentWriter; +use crate::reader::NRTReader; use crate::schema::Document; use crate::schema::IndexRecordOption; use crate::schema::Term; use crate::tokenizer::TokenizerManager; -use crate::Opstamp; +use crate::{IndexReader, Opstamp}; use crossbeam::channel; use futures::executor::block_on; use futures::future::Future; @@ -36,7 +38,6 @@ use std::ops::Range; use std::sync::{Arc, RwLock}; use std::thread; use std::thread::JoinHandle; -use crate::indexer::segment_register::SegmentRegister; // Size of the margin for the heap. A segment is closed when the remaining memory // in the heap goes below MARGIN_IN_BYTES. @@ -92,6 +93,8 @@ pub struct IndexWriter { stamper: Stamper, committed_opstamp: Opstamp, + + on_commit: WatchCallbackList, } fn compute_deleted_bitset( @@ -217,7 +220,7 @@ fn index_documents( let schema = segment.schema(); let mut segment_writer = - SegmentWriter::for_segment(memory_budget, segment.clone(), &schema, tokenizers)?; + SegmentWriter::for_segment(memory_budget, segment, &schema, tokenizers)?; for document_group in grouped_document_iterator { for doc in document_group { segment_writer.add_document(doc, &schema)?; @@ -242,24 +245,14 @@ fn index_documents( // the worker thread. 
assert!(max_doc > 0); - let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?; - - let segment_with_max_doc = segment.with_max_doc(max_doc); + let (segment, doc_opstamps): (Segment, Vec<Opstamp>) = segment_writer.finalize()?; let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap()); - let delete_bitset_opt = apply_deletes( - &segment_with_max_doc, - &mut delete_cursor, - &doc_opstamps, - last_docstamp, - )?; + let delete_bitset_opt = + apply_deletes(&segment, &mut delete_cursor, &doc_opstamps, last_docstamp)?; - let segment_entry = SegmentEntry::new( - segment_with_max_doc, - delete_cursor, - delete_bitset_opt, - ); + let segment_entry = SegmentEntry::new(segment, delete_cursor, delete_bitset_opt); block_on(segment_updater.schedule_add_segment(segment_entry))?; Ok(true) } @@ -368,7 +361,8 @@ impl IndexWriter { stamper, worker_id: 0, - segment_registers + segment_registers, + on_commit: Default::default(), }; index_writer.start_workers()?; Ok(index_writer) } @@ -683,6 +677,24 @@ impl IndexWriter { self.prepare_commit(false)?.commit() } + pub fn soft_commit(&mut self) -> crate::Result<Opstamp> { + self.prepare_commit(true)?.commit() + } + + pub(crate) fn trigger_commit(&self) -> impl Future<Output = ()> { + self.on_commit.broadcast() + } + + pub fn reader(&self, num_searchers: usize) -> crate::Result<IndexReader> { + let nrt_reader = NRTReader::create( + num_searchers, + self.index.clone(), + self.segment_registers.clone(), + &self.on_commit, + )?; + Ok(IndexReader::NRT(nrt_reader)) + } + pub(crate) fn segment_updater(&self) -> &SegmentUpdater { &self.segment_updater } @@ -1078,7 +1090,8 @@ mod tests { index_writer.add_document(doc!(text_field => "a")); } { - let mut prepared_commit = index_writer.prepare_commit(false).expect("commit failed"); + let mut prepared_commit = + index_writer.prepare_commit(false).expect("commit failed"); prepared_commit.set_payload("first commit"); prepared_commit.commit().expect("commit failed"); } @@ -1111,7 +1124,8 @@ mod tests { index_writer.add_document(doc!(text_field => "a")); } { - let mut prepared_commit = index_writer.prepare_commit(false).expect("commit failed"); + let mut prepared_commit = + index_writer.prepare_commit(false).expect("commit failed"); prepared_commit.set_payload("first commit"); prepared_commit.abort().expect("commit failed"); } @@ -1289,4 +1303,41 @@ mod tests { let commit = index_writer.commit(); assert!(commit.is_ok()); } + + #[test] + fn test_index_writer_reader() { + let mut schema_builder = schema::Schema::builder(); + let idfield = schema_builder.add_text_field("id", STRING); + schema_builder.add_text_field("optfield", STRING); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + index_writer.add_document(doc!(idfield=>"myid")); + assert!(index_writer.commit().is_ok()); + let reader = index_writer.reader(2).unwrap(); + let searcher = reader.searcher(); + assert_eq!(searcher.num_docs(), 1u64); + index_writer.add_document(doc!(idfield=>"myid")); + assert!(index_writer.commit().is_ok()); + assert_eq!(reader.searcher().num_docs(), 2u64); + assert_eq!(searcher.num_docs(), 1u64); + } + + #[test] + fn test_index_writer_reader_soft_commit() { + let mut schema_builder = schema::Schema::builder(); + let idfield = schema_builder.add_text_field("id", STRING); + schema_builder.add_text_field("optfield", STRING); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + index_writer.add_document(doc!(idfield=>"myid"));
+ assert!(index_writer.soft_commit().is_ok()); + let nrt_reader = index_writer.reader(2).unwrap(); + let normal_reader = index.reader_builder().try_into().unwrap(); + assert_eq!(nrt_reader.searcher().num_docs(), 1u64); + assert_eq!(normal_reader.searcher().num_docs(), 0u64); + assert!(index_writer.commit().is_ok()); + assert!(normal_reader.reload().is_ok()); + assert_eq!(nrt_reader.searcher().num_docs(), 1u64); + assert_eq!(normal_reader.searcher().num_docs(), 1u64); + } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index c819fb4c5..9d3c33f78 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,8 +1,8 @@ use crate::common::MAX_DOC_LIMIT; -use crate::directory::TerminatingWrite; use crate::core::Segment; use crate::core::SegmentReader; use crate::core::SerializableSegment; +use crate::directory::TerminatingWrite; use crate::directory::WritePtr; use crate::docset::DocSet; use crate::fastfield::BytesFastFieldReader; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index fc7be1e27..0dbb55120 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -23,6 +23,7 @@ pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy}; pub use self::prepared_commit::PreparedCommit; pub use self::segment_entry::SegmentEntry; pub use self::segment_manager::SegmentManager; +pub(crate) use self::segment_manager::SegmentRegisters; pub use self::segment_serializer::SegmentSerializer; pub use self::segment_writer::SegmentWriter; diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index a2dca69f1..9b594d49b 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -7,16 +7,20 @@ pub struct PreparedCommit<'a> { index_writer: &'a mut IndexWriter, payload: Option<String>, opstamp: Opstamp, - soft_commit: bool + soft_commit: bool, } impl<'a> PreparedCommit<'a> { - pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp, soft_commit: bool) -> PreparedCommit<'_> { + pub(crate) fn new( + index_writer: &'a mut IndexWriter, + opstamp: Opstamp, + soft_commit: bool, + ) -> PreparedCommit<'_> { PreparedCommit { index_writer, payload: None, opstamp, - soft_commit + soft_commit, } } @@ -34,11 +38,12 @@ impl<'a> PreparedCommit<'a> { pub fn commit(self) -> crate::Result<Opstamp> { info!("committing {}", self.opstamp); - let _ = block_on( - self.index_writer - .segment_updater() - .schedule_commit(self.opstamp, self.payload, self.soft_commit), - ); + block_on(self.index_writer.segment_updater().schedule_commit( + self.opstamp, + self.payload, + self.soft_commit, + ))?; + block_on(self.index_writer.trigger_commit()); Ok(self.opstamp) } } diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index 05979c297..80a0d3108 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -1,10 +1,10 @@ use crate::common::BitSet; use crate::core::SegmentId; use crate::core::SegmentMeta; -use crate::indexer::delete_queue::DeleteCursor; -use std::fmt; -use crate::{Segment, Opstamp}; use crate::directory::ManagedDirectory; +use crate::indexer::delete_queue::DeleteCursor; +use crate::{Opstamp, Segment}; +use std::fmt; /// A segment entry describes the state of /// a given segment, at a given instant. @@ -46,14 +46,6 @@ impl SegmentEntry { Ok(()) } - /// Return a reference to the segment entry deleted bitset. - /// - /// `DocId` in this bitset are flagged as deleted.
- pub fn delete_bitset(&self) -> Option<&BitSet> { - self.delete_bitset.as_ref() - } - - pub fn set_delete_cursor(&mut self, delete_cursor: DeleteCursor) { self.delete_cursor = delete_cursor; } @@ -96,7 +88,6 @@ impl SegmentEntry { } } - impl fmt::Debug for SegmentEntry { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { write!(formatter, "SegmentEntry({:?})", self.meta()) diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 6106d5757..0671b04eb 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -3,11 +3,11 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::error::TantivyError; use crate::indexer::SegmentEntry; +use crate::Segment; use std::collections::hash_set::HashSet; use std::fmt::{self, Debug, Formatter}; -use std::sync::{RwLock, Arc}; +use std::sync::{Arc, RwLock}; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; -use crate::Segment; #[derive(Default)] pub(crate) struct SegmentRegisters { @@ -22,8 +22,6 @@ pub(crate) enum SegmentsStatus { } impl SegmentRegisters { - - pub fn new(committed: SegmentRegister) -> SegmentRegisters { SegmentRegisters { uncommitted: Default::default(), @@ -87,12 +85,8 @@ pub fn get_mergeable_segments( } impl SegmentManager { - - pub(crate) fn new(registers: Arc<RwLock<SegmentRegisters>>) -> SegmentManager { - SegmentManager { - registers - } + SegmentManager { registers } } /// Returns all of the segment entries (committed or uncommitted) diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index a3e554a02..beb37d2f9 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -1,13 +1,13 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; +use crate::directory::ManagedDirectory; use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::segment_entry::SegmentEntry; +use crate::schema::Schema; +use crate::Segment; use std::collections::HashMap; use std::collections::HashSet; use std::fmt::{self, Debug, Formatter}; -use crate::Segment; -use crate::directory::ManagedDirectory; -use crate::schema::Schema; /// The segment register keeps track /// of the list of segments, their sizes as well @@ -48,7 +48,7 @@ impl SegmentRegister { .map(|segment_entry| segment_entry.meta().clone()) .collect() } - + pub fn segments(&self) -> Vec<Segment> { self.segment_states .values() diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 1e205ee73..67325b776 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -10,7 +10,7 @@ use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult}; use crate::indexer::index_writer::advance_deletes; use crate::indexer::merge_operation::MergeOperationInventory; use crate::indexer::merger::IndexMerger; -use crate::indexer::segment_manager::{SegmentsStatus, SegmentRegisters}; +use crate::indexer::segment_manager::{SegmentRegisters, SegmentsStatus}; use crate::indexer::stamper::Stamper; use crate::indexer::SegmentEntry; use crate::indexer::SegmentSerializer; @@ -116,14 +116,14 @@ fn merge( // First we apply all of the deletes to the merged segment, up to the target opstamp.
for segment_entry in &mut segment_entries { - advance_deletes( segment_entry, target_opstamp)?; + advance_deletes(segment_entry, target_opstamp)?; } let delete_cursor = segment_entries[0].delete_cursor().clone(); let segments: Vec<Segment> = segment_entries .iter() - .map(|segment_entry| index.segment(segment_entry.meta().clone())) + .map(|segment_entry| segment_entry.segment().clone()) .collect(); // An IndexMerger is like a "view" of our merged segments. @@ -137,7 +137,11 @@ fn merge( let max_doc = merger.write(segment_serializer)?; - Ok(SegmentEntry::new(merged_segment.with_max_doc(max_doc), delete_cursor, None)) + Ok(SegmentEntry::new( + merged_segment.with_max_doc(max_doc), + delete_cursor, + None, + )) } pub(crate) struct InnerSegmentUpdater { @@ -163,7 +167,7 @@ impl SegmentUpdater { pub fn create( segment_registers: Arc<RwLock<SegmentRegisters>>, index: Index, - stamper: Stamper + stamper: Stamper, ) -> crate::Result<SegmentUpdater> { let segment_manager = SegmentManager::new(segment_registers); let pool = ThreadPoolBuilder::new() @@ -340,7 +344,9 @@ impl SegmentUpdater { } } segment_updater.segment_manager.commit(segment_entries); - segment_updater.save_metas(opstamp, payload)?; + if !soft_commit { + segment_updater.save_metas(opstamp, payload)?; + } let _ = garbage_collect_files(segment_updater.clone()).await; segment_updater.consider_merge_options().await; Ok(()) @@ -482,10 +488,9 @@ impl SegmentUpdater { if let Some(delete_operation) = delete_cursor.get() { let committed_opstamp = segment_updater.load_metas().opstamp; if delete_operation.opstamp < committed_opstamp { - if let Err(e) = advance_deletes( - &mut after_merge_segment_entry, - committed_opstamp, - ) { + if let Err(e) = + advance_deletes(&mut after_merge_segment_entry, committed_opstamp) + { error!( "Merge of {:?} was cancelled (advancing deletes failed): {:?}", merge_operation.segment_ids(), @@ -545,7 +550,8 @@ mod tests { use crate::indexer::merge_policy::tests::MergeWheneverPossible; use crate::schema::*; - use crate::Index; + use crate::{Index, SegmentId}; + use futures::executor::block_on; #[test] fn test_delete_during_merge() { @@ -696,4 +702,27 @@ mod tests { .segment_entries(); assert!(seg_vec.is_empty()); } + + #[test] + fn test_merge_over_soft_commit() { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + // writing the segment + let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + index_writer.add_document(doc!(text_field=>"a")); + assert!(index_writer.soft_commit().is_ok()); + index_writer.add_document(doc!(text_field=>"a")); + assert!(index_writer.soft_commit().is_ok()); + + let reader = index_writer.reader(1).unwrap(); + let segment_ids: Vec<SegmentId> = reader + .searcher() + .segment_readers() + .iter() + .map(|reader| reader.segment_id()) + .collect(); + assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + } } diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index eee550841..70d117b88 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -1,6 +1,7 @@ use super::operation::AddOperation; use crate::core::Segment; use crate::core::SerializableSegment; +use crate::directory::{SpillingResult, SpillingWriter, TerminatingWrite}; use crate::fastfield::FastFieldsWriter; use crate::fieldnorm::FieldNormsWriter; use crate::indexer::segment_serializer::SegmentSerializer; @@ -13,14 +14,13 @@ use crate::schema::Value; use crate::schema::{Field,
FieldEntry}; use crate::store::StoreWriter; use crate::tokenizer::{BoxTokenStream, PreTokenizedStream}; -use crate::tokenizer::{TokenizerManager, FacetTokenizer, TextAnalyzer}; +use crate::tokenizer::{FacetTokenizer, TextAnalyzer, TokenizerManager}; use crate::tokenizer::{TokenStreamChain, Tokenizer}; use crate::Opstamp; use crate::{DocId, SegmentComponent}; use std::io; -use std::str; -use crate::directory::{SpillingWriter, SpillingResult, TerminatingWrite}; use std::io::Write; +use std::str; /// Computes the initial size of the hash table. /// @@ -87,11 +87,14 @@ impl SegmentWriter { ) .collect(); let mut segment_clone = segment.clone(); - let spilling_wrt = SpillingWriter::new(1_000, Box::new(move || { - segment_clone - .open_write(SegmentComponent::STORE) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - })); + let spilling_wrt = SpillingWriter::new( + 1_000, + Box::new(move || { + segment_clone + .open_write(SegmentComponent::STORE) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + }), + ); let store_writer = StoreWriter::new(spilling_wrt); Ok(SegmentWriter { max_doc: 0, @@ -109,7 +112,7 @@ impl SegmentWriter { /// /// Finalize consumes the `SegmentWriter`, so that it cannot /// be used afterwards. - pub fn finalize(mut self) -> crate::Result<Vec<Opstamp>> { + pub fn finalize(mut self) -> crate::Result<(Segment, Vec<Opstamp>)> { self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc); let spilling_wrt = self.store_writer.close()?; let mut segment: Segment; @@ -118,27 +121,22 @@ impl SegmentWriter { segment = self.segment.clone(); } SpillingResult::Buffer(buf) => { - let mut store_wrt = self.segment.open_write(SegmentComponent::STORE)?; - store_wrt.write_all(&buf[..])?; - store_wrt.terminate()?; - segment = self.segment.clone(); // TODO fix volatile branch - /* segment = self.segment.into_volatile(); let mut store_wrt = segment.open_write(SegmentComponent::STORE)?; store_wrt.write_all(&buf[..])?; store_wrt.terminate()?; - */ } } - let segment_serializer = SegmentSerializer::for_segment(&mut self.segment)?; + let segment_serializer = SegmentSerializer::for_segment(&mut segment)?; + segment = segment.with_max_doc(self.max_doc); write( &self.multifield_postings, &self.fast_field_writers, &self.fieldnorms_writer, segment_serializer, )?; - Ok(self.doc_opstamps) + Ok((segment, self.doc_opstamps)) } pub fn mem_usage(&self) -> usize { diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 1adbb77af..fa5cafa62 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -216,12 +216,15 @@ pub mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let schema = schema_builder.build(); let index = Index::create_in_ram(schema.clone()); - let segment = index.new_segment(); - { - let mut segment_writer = - SegmentWriter::for_segment(3_000_000, segment.clone(), &schema, index.tokenizers()) - .unwrap(); + let segment = { + let mut segment_writer = SegmentWriter::for_segment( + 3_000_000, + index.new_segment(), + &schema, + index.tokenizers(), + ) + .unwrap(); { let mut doc = Document::default(); // checking that position works if the field has two values @@ -253,8 +256,9 @@ pub mod tests { }; segment_writer.add_document(op, &schema).unwrap(); } - segment_writer.finalize().unwrap(); - } + let (segment, _) = segment_writer.finalize().unwrap(); + segment + }; { let segment_reader = SegmentReader::open(&segment).unwrap(); { diff --git a/src/reader/index_writer_reader.rs b/src/reader/index_writer_reader.rs new file mode 100644 index 000000000..d44cf38c3 --- /dev/null +++
b/src/reader/index_writer_reader.rs @@ -0,0 +1,84 @@ +use crate::directory::{WatchCallbackList, WatchHandle}; +use crate::indexer::SegmentRegisters; +use crate::reader::pool::Pool; +use crate::{Index, LeasedItem, Searcher, Segment, SegmentReader}; +use std::iter::repeat_with; +use std::sync::{Arc, RwLock, Weak}; + +struct InnerNRTReader { + num_searchers: usize, + index: Index, + searcher_pool: Pool<Searcher>, + segment_registers: Arc<RwLock<SegmentRegisters>>, +} + +impl InnerNRTReader { + fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> { + let segments: Vec<Segment> = self + .segment_registers + .read() + .expect("lock should never be polluted. Please report.") + .committed_segment(); + segments + .iter() + .map(SegmentReader::open) + .collect::<crate::Result<Vec<SegmentReader>>>() + } + + pub fn reload(&self) -> crate::Result<()> { + let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?; + let schema = self.index.schema(); + let searchers = repeat_with(|| { + Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()) + }) + .take(self.num_searchers) + .collect(); + self.searcher_pool.publish_new_generation(searchers); + Ok(()) + } + + pub fn searcher(&self) -> LeasedItem<Searcher> { + self.searcher_pool.acquire() + } +} + +#[derive(Clone)] +pub struct NRTReader { + inner: Arc<InnerNRTReader>, + watch_handle: WatchHandle, +} + +impl NRTReader { + pub fn reload(&self) -> crate::Result<()> { + self.inner.reload() + } + + pub fn searcher(&self) -> LeasedItem<Searcher> { + self.inner.searcher() + } + + pub(crate) fn create( + num_searchers: usize, + index: Index, + segment_registers: Arc<RwLock<SegmentRegisters>>, + watch_callback_list: &WatchCallbackList, + ) -> crate::Result<NRTReader> { + let inner_reader: Arc<InnerNRTReader> = Arc::new(InnerNRTReader { + num_searchers, + index, + searcher_pool: Pool::new(), + segment_registers, + }); + let inner_reader_weak: Weak<InnerNRTReader> = Arc::downgrade(&inner_reader); + let watch_handle = watch_callback_list.subscribe(Box::new(move || { + if let Some(nrt_reader_arc) = inner_reader_weak.upgrade() { + let _ = nrt_reader_arc.reload(); + } + })); + inner_reader.reload()?; + Ok(NRTReader { + inner: inner_reader, + watch_handle, + }) + } +} diff --git a/src/reader/meta_file_reader.rs b/src/reader/meta_file_reader.rs new file mode 100644 index 000000000..38e1c84f6 --- /dev/null +++ b/src/reader/meta_file_reader.rs @@ -0,0 +1,177 @@ +use super::pool::Pool; +use crate::core::Segment; +use crate::directory::Directory; +use crate::directory::WatchHandle; +use crate::directory::META_LOCK; +use crate::Searcher; +use crate::SegmentReader; +use crate::{Index, LeasedItem}; +use crate::{IndexReader, Result}; +use std::iter::repeat_with; +use std::sync::Arc; + +/// Defines when a new version of the index should be reloaded. +/// +/// Regardless of whether you search and index in the same process, tantivy does not necessarily +/// reflect the changes that are committed to your index. `ReloadPolicy` precisely helps you define +/// when you want your index to be reloaded. +#[derive(Clone, Copy)] +pub enum ReloadPolicy { + /// The index is entirely reloaded manually. + /// All updates of the index should be manual. + /// + /// No change is reflected automatically. You are required to call `.reload()` manually. + Manual, + /// The index is reloaded within milliseconds after a new commit is available. + /// This is made possible by watching changes in the `meta.json` file. + OnCommit, // TODO add NEAR_REAL_TIME(target_ms) +} + +/// `IndexReader` builder +/// +/// It makes it possible to set the following values.
+/// +/// - `num_searchers` (by default, the number of detected CPU threads): +/// +/// When `num_searchers` queries are requested at the same time, the next query will block +/// until one of the searchers in use gets released. +/// - `reload_policy` (by default `ReloadPolicy::OnCommit`): +/// +/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details. +#[derive(Clone)] +pub struct IndexReaderBuilder { + num_searchers: usize, + reload_policy: ReloadPolicy, + index: Index, +} + +impl IndexReaderBuilder { + pub(crate) fn new(index: Index) -> IndexReaderBuilder { + IndexReaderBuilder { + num_searchers: num_cpus::get(), + reload_policy: ReloadPolicy::Manual, + index, + } + } + + /// Sets the reload_policy. + /// + /// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details. + pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder { + self.reload_policy = reload_policy; + self + } + + /// Sets the number of `Searcher` in the searcher pool. + pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder { + self.num_searchers = num_searchers; + self + } + + /// Building the reader is a non-trivial operation that requires + /// opening different segment readers. It may take hundreds of milliseconds + /// and it may return an error. + pub fn try_into(self) -> crate::Result<IndexReader> { + let inner_reader = MetaFileIndexReaderInner { + index: self.index, + num_searchers: self.num_searchers, + searcher_pool: Pool::new(), + }; + inner_reader.reload()?; + let inner_reader_arc = Arc::new(inner_reader); + let watch_handle_opt: Option<WatchHandle>; + match self.reload_policy { + ReloadPolicy::Manual => { + // No need to set anything... + watch_handle_opt = None; + } + ReloadPolicy::OnCommit => { + let inner_reader_arc_clone = inner_reader_arc.clone(); + let callback = move || { + if let Err(err) = inner_reader_arc_clone.reload() { + error!( + "Error while loading searcher after commit was detected. {:?}", + err + ); + } + }; + let watch_handle = inner_reader_arc + .index + .directory() + .watch(Box::new(callback))?; + watch_handle_opt = Some(watch_handle); + } + } + Ok(IndexReader::from(MetaFileIndexReader { + inner: inner_reader_arc, + watch_handle_opt, + })) + } +} + +struct MetaFileIndexReaderInner { + num_searchers: usize, + searcher_pool: Pool<Searcher>, + index: Index, +} + +impl MetaFileIndexReaderInner { + fn load_segment_readers(&self) -> crate::Result<Vec<SegmentReader>> { + // We keep the lock until we have effectively finished opening + // the `SegmentReader`s, because it prevents a different process + // from garbage collecting these files while we open them. + // + // Once opened, on Linux & Mac, the mmap will remain valid after + // the file has been deleted. + // On Windows, the file deletion will fail.
+ let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?; + let searchable_segments = self.searchable_segments()?; + searchable_segments + .iter() + .map(SegmentReader::open) + .collect::<Result<_>>() + } + + fn reload(&self) -> crate::Result<()> { + let segment_readers: Vec<SegmentReader> = self.load_segment_readers()?; + let schema = self.index.schema(); + let searchers = repeat_with(|| { + Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()) + }) + .take(self.num_searchers) + .collect(); + self.searcher_pool.publish_new_generation(searchers); + Ok(()) + } + + /// Returns the list of segments that are searchable + fn searchable_segments(&self) -> crate::Result<Vec<Segment>> { + self.index.searchable_segments() + } + + fn searcher(&self) -> LeasedItem<Searcher> { + self.searcher_pool.acquire() + } +} + +/// `IndexReader` is your entry point to read and search the index. +/// +/// It controls when a new version of the index should be loaded and lends +/// you instances of `Searcher` for the last loaded version. +/// +/// `Clone` does not clone the pool of searchers. `IndexReader` +/// just wraps an `Arc`. +#[derive(Clone)] +pub struct MetaFileIndexReader { + inner: Arc<MetaFileIndexReaderInner>, + watch_handle_opt: Option<WatchHandle>, +} + +impl MetaFileIndexReader { + pub fn reload(&self) -> crate::Result<()> { + self.inner.reload() + } + pub fn searcher(&self) -> LeasedItem<Searcher> { + self.inner.searcher() + } +} diff --git a/src/reader/mod.rs b/src/reader/mod.rs index b754fcde6..60aeb91d7 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -1,151 +1,14 @@ +mod index_writer_reader; +mod meta_file_reader; mod pool; +use self::meta_file_reader::MetaFileIndexReader; +pub use self::meta_file_reader::{IndexReaderBuilder, ReloadPolicy}; pub use self::pool::LeasedItem; -use self::pool::Pool; -use crate::core::Segment; -use crate::directory::Directory; -use crate::directory::WatchHandle; -use crate::directory::META_LOCK; -use crate::Index; + +pub(crate) use crate::reader::index_writer_reader::NRTReader; + use crate::Searcher; -use crate::SegmentReader; -use std::sync::Arc; - -/// Defines when a new version of the index should be reloaded. -/// -/// Regardless of whether you search and index in the same process, tantivy does not necessarily -/// reflect the changes that are committed to your index. `ReloadPolicy` precisely helps you define -/// when you want your index to be reloaded. -#[derive(Clone, Copy)] -pub enum ReloadPolicy { - /// The index is entirely reloaded manually. - /// All updates of the index should be manual. - /// - /// No change is reflected automatically. You are required to call `.reload()` manually. - Manual, - /// The index is reloaded within milliseconds after a new commit is available. - /// This is made possible by watching changes in the `meta.json` file. - OnCommit, // TODO add NEAR_REAL_TIME(target_ms) -} - -/// `IndexReader` builder -/// -/// It makes it possible to set the following values. -/// -/// - `num_searchers` (by default, the number of detected CPU threads): -/// -/// When `num_searchers` queries are requested at the same time, the next query will block -/// until one of the searchers in use gets released. -/// - `reload_policy` (by default `ReloadPolicy::OnCommit`): -/// -/// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details.
-#[derive(Clone)] -pub struct IndexReaderBuilder { - num_searchers: usize, - reload_policy: ReloadPolicy, - index: Index, -} - -impl IndexReaderBuilder { - pub(crate) fn new(index: Index) -> IndexReaderBuilder { - IndexReaderBuilder { - num_searchers: num_cpus::get(), - reload_policy: ReloadPolicy::OnCommit, - index, - } - } - - /// Builds the reader. - /// - /// Building the reader is a non-trivial operation that requires - /// opening different segment readers. It may take hundreds of milliseconds - /// and it may return an error. - /// TODO(pmasurel) Use the `TryInto` trait once it is available in stable. - pub fn try_into(self) -> crate::Result<IndexReader> { - let inner_reader = InnerIndexReader { - index: self.index, - num_searchers: self.num_searchers, - searcher_pool: Pool::new(), - }; - inner_reader.reload()?; - let inner_reader_arc = Arc::new(inner_reader); - let watch_handle_opt: Option<WatchHandle>; - match self.reload_policy { - ReloadPolicy::Manual => { - // No need to set anything... - watch_handle_opt = None; - } - ReloadPolicy::OnCommit => { - let inner_reader_arc_clone = inner_reader_arc.clone(); - let callback = move || { - if let Err(err) = inner_reader_arc_clone.reload() { - error!( - "Error while loading searcher after commit was detected. {:?}", - err - ); - } - }; - let watch_handle = inner_reader_arc - .index - .directory() - .watch(Box::new(callback))?; - watch_handle_opt = Some(watch_handle); - } - } - Ok(IndexReader { - inner: inner_reader_arc, - watch_handle_opt, - }) - } - - /// Sets the reload_policy. - /// - /// See [`ReloadPolicy`](./enum.ReloadPolicy.html) for more details. - pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder { - self.reload_policy = reload_policy; - self - } - - /// Sets the number of `Searcher` in the searcher pool. - pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder { - self.num_searchers = num_searchers; - self - } -} - -struct InnerIndexReader { - num_searchers: usize, - searcher_pool: Pool<Searcher>, - index: Index, -} - -impl InnerIndexReader { - fn reload(&self) -> crate::Result<()> { - let segment_readers: Vec<SegmentReader> = { - let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?; - let searchable_segments = self.searchable_segments()?; - searchable_segments - .iter() - .map(SegmentReader::open) - .collect::<crate::Result<_>>()? - }; - let schema = self.index.schema(); - let searchers = (0..self.num_searchers) - .map(|_| Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone())) - .collect(); - self.searcher_pool.publish_new_generation(searchers); - Ok(()) - } - - /// Returns the list of segments that are searchable - fn searchable_segments(&self) -> crate::Result<Vec<Segment>> { - self.index.searchable_segments() - } - - fn searcher(&self) -> LeasedItem<Searcher> { - self.searcher_pool.acquire() - } -} /// `IndexReader` is your entry point to read and search the index. /// @@ -155,17 +18,12 @@ impl InnerIndexReader { /// `Clone` does not clone the pool of searchers. `IndexReader` /// just wraps an `Arc`. #[derive(Clone)] -pub struct IndexReader { - inner: Arc<InnerIndexReader>, - watch_handle_opt: Option<WatchHandle>, +pub enum IndexReader { + FromMetaFile(MetaFileIndexReader), + NRT(NRTReader), } impl IndexReader { - #[cfg(test)] - pub(crate) fn index(&self) -> Index { - self.inner.index.clone() - } - /// Update searchers so that they reflect the state of the last /// `.commit()`.
/// @@ -176,7 +34,10 @@ impl IndexReader { /// This automatic reload can take tens of milliseconds to kick in however, and in unit tests /// it can be nice to deterministically force the reload of searchers. pub fn reload(&self) -> crate::Result<()> { - self.inner.reload() + match self { + IndexReader::FromMetaFile(meta_file_reader) => meta_file_reader.reload(), + IndexReader::NRT(nrt_reader) => nrt_reader.reload(), + } } /// Returns a searcher @@ -190,6 +51,21 @@ /// The same searcher must be used for a given query, as it ensures /// the use of a consistent segment set. pub fn searcher(&self) -> LeasedItem<Searcher> { - self.inner.searcher() + match self { + IndexReader::FromMetaFile(meta_file_reader) => meta_file_reader.searcher(), + IndexReader::NRT(nrt_reader) => nrt_reader.searcher(), + } + } +} + +impl From<MetaFileIndexReader> for IndexReader { + fn from(meta_file_reader: MetaFileIndexReader) -> Self { + IndexReader::FromMetaFile(meta_file_reader) + } +} + +impl From<NRTReader> for IndexReader { + fn from(nrt_reader: NRTReader) -> Self { + IndexReader::NRT(nrt_reader) + } +} diff --git a/src/store/writer.rs b/src/store/writer.rs index 60306ce09..9fe35a738 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -112,6 +112,5 @@ impl StoreWriter { self.doc.serialize(&mut self.writer)?; let (wrt, _) = self.writer.finish()?; Ok(wrt) - } }
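Usage note (a condensed sketch of the workflow the tests above pin down, not part of the patch itself): the new API pairs `IndexWriter::soft_commit()` with `IndexWriter::reader()`; everything else below is existing tantivy API, and the schema and memory budget are illustrative values copied from the tests.

    use tantivy::schema::{Schema, STRING};
    use tantivy::{doc, Index};

    fn main() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        let id_field = schema_builder.add_text_field("id", STRING);
        let index = Index::create_in_ram(schema_builder.build());
        let mut index_writer = index.writer_with_num_threads(1, 3_000_000)?;

        index_writer.add_document(doc!(id_field => "doc-1"));
        // A soft commit makes the document searchable without serializing
        // meta.json: readers built from the meta file will not see it
        // until a regular commit().
        index_writer.soft_commit()?;

        // The NRT reader is obtained from the writer, not from the index:
        // it searches the writer's in-RAM segment registers directly, and
        // the on_commit watch list reloads it after each (soft) commit.
        let nrt_reader = index_writer.reader(1)?;
        assert_eq!(nrt_reader.searcher().num_docs(), 1);
        Ok(())
    }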
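On the storage side, the new `SpillingWriter` is what lets small segments stay in RAM: bytes accumulate in a buffer until the limit is exceeded, at which point the real `WritePtr` is created and everything spills to it, and `finalize()` hands back either `SpillingResult::Spilled` or the untouched buffer so `SegmentWriter::finalize` can route a small store into a volatile segment. Below is a standalone sketch of that spill-on-overflow state machine; it is simplified (no `TerminatingWrite`, a hypothetical `SpillWriter` type rather than the crate's) but shows the same buffer-then-replay logic.

    use std::io::{self, Write};

    // Either an in-RAM buffer with a capacity limit, or the real writer.
    enum Spill<W: Write> {
        Buffered(Vec<u8>),
        Spilled(W),
    }

    struct SpillWriter<W: Write, F: FnMut() -> io::Result<W>> {
        state: Spill<W>,
        limit: usize,
        write_factory: F,
    }

    impl<W: Write, F: FnMut() -> io::Result<W>> SpillWriter<W, F> {
        fn new(limit: usize, write_factory: F) -> Self {
            SpillWriter {
                state: Spill::Buffered(Vec::new()),
                limit,
                write_factory,
            }
        }

        fn write_bytes(&mut self, payload: &[u8]) -> io::Result<()> {
            if let Spill::Buffered(buffer) = &mut self.state {
                if buffer.len() + payload.len() <= self.limit {
                    // Still under the limit: keep everything in RAM.
                    buffer.extend_from_slice(payload);
                    return Ok(());
                }
                // Limit exceeded: create the real writer, replay the buffer.
                let mut wrt = (self.write_factory)()?;
                wrt.write_all(buffer)?;
                self.state = Spill::Spilled(wrt);
            }
            match &mut self.state {
                Spill::Spilled(wrt) => wrt.write_all(payload),
                Spill::Buffered(_) => unreachable!("handled above"),
            }
        }
    }

In the patch, the factory passed to `SpillingWriter::new` opens the segment's STORE component lazily, so a segment whose doc store never crosses the limit never touches the directory at all.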