From e67e5ebd4631773743971439327e5033f288abcf Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 7 Apr 2021 23:35:03 +0900 Subject: [PATCH] Minor syntax changes, and passing a tantivy Directory as argument Closes #1005. --- src/indexer/segment_updater.rs | 335 ++++++++++++++++----------------- 1 file changed, 160 insertions(+), 175 deletions(-) diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index fcee2cdac..7df03c220 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -141,37 +141,54 @@ fn merge( Ok(SegmentEntry::new(segment_meta, delete_cursor, None)) } -/// Merges a list of segments from different indices. +/// Advanced: Merges a list of segments from different indices in a new index. /// /// Returns `TantivyError` if the the indices list is empty or their /// schemas don't match. -pub fn merge_segments(indices: &[Index], output_directory: PathBuf) -> crate::Result<Index> { - // validate schema - let schemas: Vec<Schema> = indices.iter().map(|index| index.schema()).collect(); - let is_not_valid = match schemas.first() { - Some(first) => schemas.iter().skip(1).any(|next| next != first), - None => true, - }; - - if is_not_valid { +/// +/// `output_directory`: is assumed to be empty. +/// +/// # Warning +/// This function does NOT check whether an `IndexWriter` is running. It is not +/// meant to work if you have an IndexWriter running for the origin indices, or +/// the destination Index. +#[doc(hidden)] +pub fn merge_segments<Dir: Directory>( + indices: &[Index], + output_directory: Dir, +) -> crate::Result<Index> { + if indices.is_empty() { + // If there are no indices to merge, there is no need to do anything. 
return Err(crate::TantivyError::InvalidArgument( - "Attempt to merge empty or different schema indices".to_string(), + "No indices given to merge".to_string(), )); } - let mut merged_index = Index::create_in_dir(output_directory, schemas[0].clone())?; - let merged_segment = merged_index.new_segment(); + let target_schema = indices[0].schema(); - let mut segments: Vec<Segment> = vec![]; + // let's check that all of the indices have the same schema + if indices + .iter() + .skip(1) + .any(|index| &index.schema() != &target_schema) + { + return Err(crate::TantivyError::InvalidArgument( + "Attempt to merge different schema indices".to_string(), + )); + } + + let mut segments: Vec<Segment> = Vec::new(); for index in indices { segments.extend(index.searchable_segments()?); } - let merger: IndexMerger = IndexMerger::open(merged_index.schema(), &segments[..])?; - let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?; - - let num_docs = merger.write(segment_serializer)?; + let mut merged_index = Index::create(output_directory, target_schema.clone())?; + let merged_segment = merged_index.new_segment(); let merged_segment_id = merged_segment.id(); + let merger: IndexMerger = IndexMerger::open(merged_index.schema(), &segments[..])?; + let segment_serializer = SegmentSerializer::for_segment(merged_segment)?; + let num_docs = merger.write(segment_serializer)?; + let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs); let stats = format!( @@ -188,7 +205,7 @@ pub fn merge_segments(indices: &[Index], output_directory: PathBuf) -> crate::Re let index_meta = IndexMeta { segments: vec![segment_meta], - schema: merged_index.schema(), + schema: target_schema, opstamp: 0u64, payload: Some(stats), }; @@ -537,7 +554,7 @@ impl SegmentUpdater { if delete_operation.opstamp < committed_opstamp { let index = &segment_updater.index; let segment = index.segment(after_merge_segment_entry.meta().clone()); - if let Err(e) = advance_deletes( + if let 
Err(advance_deletes_err) = advance_deletes( segment, &mut after_merge_segment_entry, committed_opstamp, @@ -545,7 +562,7 @@ impl SegmentUpdater { error!( "Merge of {:?} was cancelled (advancing deletes failed): {:?}", merge_operation.segment_ids(), - e + advance_deletes_err ); if cfg!(test) { panic!("Merge failed."); @@ -553,7 +570,7 @@ impl SegmentUpdater { // ... cancel merge // `merge_operations` are tracked. As it is dropped, the // the segment_ids will be available again for merge. - return Err(e); + return Err(advance_deletes_err); } } } @@ -599,163 +616,130 @@ impl SegmentUpdater { #[cfg(test)] mod tests { use super::merge_segments; + use crate::directory::RAMDirectory; use crate::indexer::merge_policy::tests::MergeWheneverPossible; use crate::schema::*; use crate::Index; - use std::path::PathBuf; #[test] - fn test_delete_during_merge() { + fn test_delete_during_merge() -> crate::Result<()> { let mut schema_builder = Schema::builder(); let text_field = schema_builder.add_text_field("text", TEXT); - let schema = schema_builder.build(); - - let index = Index::create_in_ram(schema); + let index = Index::create_in_ram(schema_builder.build()); // writing the segment - let mut index_writer = index.writer_for_tests().unwrap(); + let mut index_writer = index.writer_for_tests()?; index_writer.set_merge_policy(Box::new(MergeWheneverPossible)); - { - for _ in 0..100 { - index_writer.add_document(doc!(text_field=>"a")); - index_writer.add_document(doc!(text_field=>"b")); - } - assert!(index_writer.commit().is_ok()); + for _ in 0..100 { + index_writer.add_document(doc!(text_field=>"a")); + index_writer.add_document(doc!(text_field=>"b")); } + index_writer.commit()?; - { - for _ in 0..100 { - index_writer.add_document(doc!(text_field=>"c")); - index_writer.add_document(doc!(text_field=>"d")); - } - assert!(index_writer.commit().is_ok()); + for _ in 0..100 { + index_writer.add_document(doc!(text_field=>"c")); + index_writer.add_document(doc!(text_field=>"d")); } + 
index_writer.commit()?; - { - index_writer.add_document(doc!(text_field=>"e")); - index_writer.add_document(doc!(text_field=>"f")); - assert!(index_writer.commit().is_ok()); - } + index_writer.add_document(doc!(text_field=>"e")); + index_writer.add_document(doc!(text_field=>"f")); + index_writer.commit()?; - { - let term = Term::from_field_text(text_field, "a"); - index_writer.delete_term(term); - assert!(index_writer.commit().is_ok()); - } - let reader = index.reader().unwrap(); + let term = Term::from_field_text(text_field, "a"); + index_writer.delete_term(term); + index_writer.commit()?; + + let reader = index.reader()?; assert_eq!(reader.searcher().num_docs(), 302); - { - index_writer - .wait_merging_threads() - .expect("waiting for merging threads"); - } + index_writer.wait_merging_threads()?; - reader.reload().unwrap(); + reader.reload()?; assert_eq!(reader.searcher().segment_readers().len(), 1); assert_eq!(reader.searcher().num_docs(), 302); + Ok(()) } #[test] - fn delete_all_docs() { + fn delete_all_docs() -> crate::Result<()> { let mut schema_builder = Schema::builder(); let text_field = schema_builder.add_text_field("text", TEXT); - let schema = schema_builder.build(); - - let index = Index::create_in_ram(schema); + let index = Index::create_in_ram(schema_builder.build()); // writing the segment - let mut index_writer = index.writer_for_tests().unwrap(); + let mut index_writer = index.writer_for_tests()?; - { - for _ in 0..100 { - index_writer.add_document(doc!(text_field=>"a")); - index_writer.add_document(doc!(text_field=>"b")); - } - assert!(index_writer.commit().is_ok()); + for _ in 0..100 { + index_writer.add_document(doc!(text_field=>"a")); + index_writer.add_document(doc!(text_field=>"b")); + } + index_writer.commit()?; + + for _ in 0..100 { + index_writer.add_document(doc!(text_field=>"c")); + index_writer.add_document(doc!(text_field=>"d")); + } + index_writer.commit()?; + + index_writer.add_document(doc!(text_field=>"e")); + 
index_writer.add_document(doc!(text_field=>"f")); + index_writer.commit()?; + + let seg_ids = index.searchable_segment_ids()?; + // docs exist, should have at least 1 segment + assert!(seg_ids.len() > 0); + + let term_vals = vec!["a", "b", "c", "d", "e", "f"]; + for term_val in term_vals { + let term = Term::from_field_text(text_field, term_val); + index_writer.delete_term(term); + index_writer.commit()?; } - { - for _ in 0..100 { - index_writer.add_document(doc!(text_field=>"c")); - index_writer.add_document(doc!(text_field=>"d")); - } - assert!(index_writer.commit().is_ok()); - } + index_writer.wait_merging_threads()?; - { - index_writer.add_document(doc!(text_field=>"e")); - index_writer.add_document(doc!(text_field=>"f")); - assert!(index_writer.commit().is_ok()); - } - - { - let seg_ids = index - .searchable_segment_ids() - .expect("Searchable segments failed."); - // docs exist, should have at least 1 segment - assert!(seg_ids.len() > 0); - } - - { - let term_vals = vec!["a", "b", "c", "d", "e", "f"]; - for term_val in term_vals { - let term = Term::from_field_text(text_field, term_val); - index_writer.delete_term(term); - assert!(index_writer.commit().is_ok()); - } - } - - { - index_writer - .wait_merging_threads() - .expect("waiting for merging threads"); - } - - let reader = index.reader().unwrap(); + let reader = index.reader()?; assert_eq!(reader.searcher().num_docs(), 0); - let seg_ids = index - .searchable_segment_ids() - .expect("Searchable segments failed."); + let seg_ids = index.searchable_segment_ids()?; assert!(seg_ids.is_empty()); - reader.reload().unwrap(); + reader.reload()?; assert_eq!(reader.searcher().num_docs(), 0); // empty segments should be erased - assert!(index.searchable_segment_metas().unwrap().is_empty()); + assert!(index.searchable_segment_metas()?.is_empty()); assert!(reader.searcher().segment_readers().is_empty()); + + Ok(()) } #[test] - fn test_remove_all_segments() { + fn test_remove_all_segments() -> crate::Result<()> { let 
mut schema_builder = Schema::builder(); let text_field = schema_builder.add_text_field("text", TEXT); - let schema = schema_builder.build(); - - let index = Index::create_in_ram(schema); + let index = Index::create_in_ram(schema_builder.build()); // writing the segment - let mut index_writer = index.writer_for_tests().unwrap(); - - { - for _ in 0..100 { - index_writer.add_document(doc!(text_field=>"a")); - index_writer.add_document(doc!(text_field=>"b")); - } - assert!(index_writer.commit().is_ok()); + let mut index_writer = index.writer_for_tests()?; + for _ in 0..100 { + index_writer.add_document(doc!(text_field=>"a")); + index_writer.add_document(doc!(text_field=>"b")); } + index_writer.commit()?; + index_writer.segment_updater().remove_all_segments(); let seg_vec = index_writer .segment_updater() .segment_manager .segment_entries(); assert!(seg_vec.is_empty()); + Ok(()) } #[test] - fn test_merge_segements() { + fn test_merge_segments() -> crate::Result<()> { let mut indices = vec![]; let mut schema_builder = Schema::builder(); let text_field = schema_builder.add_text_field("text", TEXT); @@ -764,67 +748,68 @@ mod tests { for _ in 0..3 { let index = Index::create_in_ram(schema.clone()); - // writing the segment - let mut index_writer = index.writer_for_tests().unwrap(); - { - for _ in 0..100 { - index_writer.add_document(doc!(text_field=>"fizz")); - index_writer.add_document(doc!(text_field=>"buzz")); - } - assert!(index_writer.commit().is_ok()); - - for _ in 0..1000 { - index_writer.add_document(doc!(text_field=>"foo")); - index_writer.add_document(doc!(text_field=>"bar")); - } - assert!(index_writer.commit().is_ok()); - } - indices.push(index); - } - - assert_eq!(indices.len(), 3); - let out_dir = tempfile::tempdir().unwrap(); - let index = merge_segments(&indices, PathBuf::from(out_dir.path())).unwrap(); - - assert_eq!(index.schema(), schema); - - let segments = index.searchable_segments().unwrap(); - assert_eq!(segments.len(), 1); - - let segement_metas = 
segments[0].meta(); - assert_eq!(segement_metas.num_deleted_docs(), 0); - assert_eq!(segement_metas.num_docs(), 6600); - } - - #[test] - fn test_merge_mismatched_schema() { - let mut schemas = vec![]; - let mut schema_builder = Schema::builder(); - let text_field = schema_builder.add_text_field("text", TEXT); - schemas.push(schema_builder.build()); - let mut schema_builder = Schema::builder(); - let text_field = schema_builder.add_text_field("body", TEXT); - schemas.push(schema_builder.build()); - - let mut indices = vec![]; - for schema in schemas { - let index = Index::create_in_ram(schema.clone()); - let mut index_writer = index.writer_for_tests().unwrap(); + // writing two segments + let mut index_writer = index.writer_for_tests()?; for _ in 0..100 { index_writer.add_document(doc!(text_field=>"fizz")); index_writer.add_document(doc!(text_field=>"buzz")); } - assert!(index_writer.commit().is_ok()); + index_writer.commit()?; + + for _ in 0..1000 { + index_writer.add_document(doc!(text_field=>"foo")); + index_writer.add_document(doc!(text_field=>"bar")); + } + index_writer.commit()?; indices.push(index); } + assert_eq!(indices.len(), 3); + let output_directory = RAMDirectory::default(); + let index = merge_segments(&indices, output_directory)?; + assert_eq!(index.schema(), schema); + + let segments = index.searchable_segments()?; + assert_eq!(segments.len(), 1); + + let segment_metas = segments[0].meta(); + assert_eq!(segment_metas.num_deleted_docs(), 0); + assert_eq!(segment_metas.num_docs(), 6600); + Ok(()) + } + + #[test] + fn test_merge_empty_indices_array() { + let merge_result = merge_segments(&[], RAMDirectory::default()); + assert!(merge_result.is_err()); + } + + #[test] + fn test_merge_mismatched_schema() -> crate::Result<()> { + let first_index = { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = 
index.writer_for_tests()?; + index_writer.add_document(doc!(text_field=>"some text")); + index_writer.commit()?; + index + }; + + let second_index = { + let mut schema_builder = Schema::builder(); + let body_field = schema_builder.add_text_field("body", TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests()?; + index_writer.add_document(doc!(body_field=>"some body")); + index_writer.commit()?; + index + }; + // mismatched schema index list - let out_dir = tempfile::tempdir().unwrap(); - let result = merge_segments(&indices, PathBuf::from(out_dir.path())); + let result = merge_segments(&[first_index, second_index], RAMDirectory::default()); assert!(result.is_err()); - // empty index list - let result = merge_segments(&vec![], PathBuf::from(out_dir.path())); - assert!(result.is_err()); + Ok(()) } }