From f82922b3544cf86bccfb6e026ee6da3a33db2a42 Mon Sep 17 00:00:00 2001 From: Evance Souamoro Date: Tue, 6 Apr 2021 11:46:17 +0000 Subject: [PATCH] added a scratched of implementation but still need to craft one detail and write test to validate --- src/indexer/segment_updater.rs | 104 ++++++++++++++++++++++++++++++++- src/schema/schema.rs | 4 +- 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index d0cb240bc..5f62fde96 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -114,7 +114,7 @@ fn merge( // first we need to apply deletes to our segment. let merged_segment = index.new_segment(); - // First we apply all of the delet to the merged segment, up to the target opstamp. + // First we apply all of the deletes to the merged segment, up to the target opstamp. for segment_entry in &mut segment_entries { let segment = index.segment(segment_entry.meta().clone()); advance_deletes(segment, segment_entry, target_opstamp)?; @@ -141,6 +141,69 @@ fn merge( Ok(SegmentEntry::new(segment_meta, delete_cursor, None)) } +/// Merges a list of segments from different indices. +/// It won't perform any merge if the schemas of the indices don't match. +pub fn merge_segments(indices: &[Index], output_directory: PathBuf) -> crate::Result<()> { + //validate schema ? + let schemas: Vec = indices.iter().map(|index| index.schema()).collect(); + let schema = is_same_schema(&schemas[..])?; + + let mut merged_index = Index::create_in_dir(output_directory, schema.clone())?; + let merged_segment = merged_index.new_segment(); + + let mut segments: Vec = vec![]; + for index in indices { + segments.extend( + index.searchable_segments()? 
+ .iter() + .map(|segment| merged_index.segment(segment.meta().clone())) + ); + } + + let merger: IndexMerger = IndexMerger::open(merged_index.schema(), &segments[..])?; + let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?; + + let num_docs = merger.write(segment_serializer)?; + let merged_segment_id = merged_segment.id(); + let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs); + + let mut segment_metas: Vec = segments + .iter() + .map(|segment| segment.meta().clone()).collect(); + segment_metas.push(segment_meta); + + let opstamp = 0u64; //? should be set + let index_meta = IndexMeta { + segments: segment_metas, + schema: merged_index.schema(), + opstamp, + payload: None, + }; + + //? need discussion && add tests + save_metas(&index_meta, merged_index.directory_mut())?; + + Ok(()) +} + +fn is_same_schema(schemas: &[Schema]) -> crate::Result { + let first = schemas + .first() + .ok_or(crate::TantivyError::InvalidArgument("Empty schema set provided".to_string()))?; + + let first_json = serde_json::to_string(&first)?; + + let is_not_same = schemas.iter().skip(1).any(|next| { + serde_json::to_string(next) + .map_or(true, |next_json| next_json != first_json) + }); + + match is_not_same { + true => Err(crate::TantivyError::InvalidArgument("Attempt to merge different schema indices".to_string())), + false => Ok(first.clone()) + } +} + pub(crate) struct InnerSegmentUpdater { // we keep a copy of the current active IndexMeta to // avoid loading the file everytime we need it in the @@ -544,6 +607,7 @@ mod tests { use crate::indexer::merge_policy::tests::MergeWheneverPossible; use crate::schema::*; use crate::Index; + use super::is_same_schema; #[test] fn test_delete_during_merge() { @@ -694,4 +758,42 @@ mod tests { .segment_entries(); assert!(seg_vec.is_empty()); } + + #[test] + fn test_is_same_schema() { + //test empty list + assert!(is_same_schema(&vec![]).is_err()); + + //test same schemas + let mut list = vec![]; + 
let mut schema_builder = Schema::builder(); + schema_builder.add_text_field("name", TEXT); + schema_builder.add_text_field("body", STORED | TEXT); + list.push(schema_builder.build()); + + let mut schema_builder = Schema::builder(); + schema_builder.add_text_field("name", TEXT); + schema_builder.add_text_field("body", STORED | TEXT); + let schema = schema_builder.build(); + list.push(schema.clone()); + + let result = is_same_schema(&list); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), schema); + + //test different schemas + let mut list = vec![]; + let mut schema_builder = Schema::builder(); + schema_builder.add_text_field("name", TEXT); + schema_builder.add_text_field("body", STORED | TEXT); + list.push(schema_builder.build()); + + let mut schema_builder = Schema::builder(); + schema_builder.add_text_field("name", TEXT); + schema_builder.add_text_field("body", STORED | TEXT); + schema_builder.add_f64_field("weight", FAST); + list.push(schema_builder.build()); + + assert!(is_same_schema(&list).is_err()); + } } diff --git a/src/schema/schema.rs b/src/schema/schema.rs index 70263a8f5..46e93933c 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -192,7 +192,7 @@ impl SchemaBuilder { })) } } - +#[derive(Debug)] struct InnerSchema { fields: Vec, fields_map: HashMap, // transient @@ -226,7 +226,7 @@ impl Eq for InnerSchema {} /// let schema = schema_builder.build(); /// /// ``` -#[derive(Clone, Eq, PartialEq)] +#[derive(Clone, Eq, PartialEq, Debug)] pub struct Schema(Arc); impl Schema {