added a rough sketch of the implementation; still need to settle one detail and write tests to validate

This commit is contained in:
Evance Souamoro
2021-04-06 11:46:17 +00:00
parent 86b30d9d7f
commit f82922b354
2 changed files with 105 additions and 3 deletions


@@ -114,7 +114,7 @@ fn merge(
// first we need to apply deletes to our segment.
let merged_segment = index.new_segment();
// First we apply all of the delet to the merged segment, up to the target opstamp.
// First we apply all of the deletes to the merged segment, up to the target opstamp.
for segment_entry in &mut segment_entries {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
@@ -141,6 +141,69 @@ fn merge(
Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
}
/// Merges a list of segments from different indices.
/// It won't perform any merge if the indices' schemas don't match.
pub fn merge_segments(indices: &[Index], output_directory: PathBuf) -> crate::Result<()> {
// validate schemas?
let schemas: Vec<Schema> = indices.iter().map(|index| index.schema()).collect();
let schema = is_same_schema(&schemas[..])?;
let mut merged_index = Index::create_in_dir(output_directory, schema.clone())?;
let merged_segment = merged_index.new_segment();
let mut segments: Vec<Segment> = vec![];
for index in indices {
segments.extend(
index.searchable_segments()?
.iter()
.map(|segment| merged_index.segment(segment.meta().clone()))
);
}
let merger: IndexMerger = IndexMerger::open(merged_index.schema(), &segments[..])?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?;
let num_docs = merger.write(segment_serializer)?;
let merged_segment_id = merged_segment.id();
let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
let mut segment_metas: Vec<SegmentMeta> = segments
.iter()
.map(|segment| segment.meta().clone()).collect();
segment_metas.push(segment_meta);
let opstamp = 0u64; //? should be set
let index_meta = IndexMeta {
segments: segment_metas,
schema: merged_index.schema(),
opstamp,
payload: None,
};
//? need discussion && add tests
save_metas(&index_meta, merged_index.directory_mut())?;
Ok(())
}
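// Illustrative usage sketch, not part of this commit: how merge_segments might be
// called once finished. The directory paths below are hypothetical.
#[allow(dead_code)]
fn merge_segments_usage_sketch() -> crate::Result<()> {
    let index_a = Index::open_in_dir("indices/a")?;
    let index_b = Index::open_in_dir("indices/b")?;
    // Returns InvalidArgument if the two indices do not share the same schema.
    merge_segments(&[index_a, index_b], PathBuf::from("indices/merged"))?;
    Ok(())
}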
fn is_same_schema(schemas: &[Schema]) -> crate::Result<Schema> {
let first = schemas
.first()
.ok_or(crate::TantivyError::InvalidArgument("Empty schema set provided".to_string()))?;
let first_json = serde_json::to_string(&first)?;
let is_not_same = schemas.iter().skip(1).any(|next| {
serde_json::to_string(next)
.map_or(true, |next_json| next_json != first_json)
});
if is_not_same {
Err(crate::TantivyError::InvalidArgument("Attempt to merge indices with different schemas".to_string()))
} else {
Ok(first.clone())
}
}
pub(crate) struct InnerSegmentUpdater {
// we keep a copy of the current active IndexMeta to
// avoid loading the file every time we need it in the
@@ -544,6 +607,7 @@ mod tests {
use crate::indexer::merge_policy::tests::MergeWheneverPossible;
use crate::schema::*;
use crate::Index;
use super::is_same_schema;
#[test]
fn test_delete_during_merge() {
@@ -694,4 +758,42 @@ mod tests {
.segment_entries();
assert!(seg_vec.is_empty());
}
#[test]
fn test_is_same_schema() {
// test empty list
assert!(is_same_schema(&[]).is_err());
// test same schemas
let mut list = vec![];
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("name", TEXT);
schema_builder.add_text_field("body", STORED | TEXT);
list.push(schema_builder.build());
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("name", TEXT);
schema_builder.add_text_field("body", STORED | TEXT);
let schema = schema_builder.build();
list.push(schema.clone());
let result = is_same_schema(&list);
assert!(result.is_ok());
assert_eq!(result.unwrap(), schema);
// test different schemas
let mut list = vec![];
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("name", TEXT);
schema_builder.add_text_field("body", STORED | TEXT);
list.push(schema_builder.build());
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("name", TEXT);
schema_builder.add_text_field("body", STORED | TEXT);
schema_builder.add_f64_field("weight", FAST);
list.push(schema_builder.build());
assert!(is_same_schema(&list).is_err());
}
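// Rough sketch of the still-missing merge_segments test. Assumptions (not in this
// commit): two RAM-backed source indices, the `tempfile` crate for the output
// directory, and success as the only assertion; it may need adjusting once the
// open questions above (opstamp, metas) are settled.
#[test]
fn test_merge_segments_sketch() {
    let mut schema_builder = Schema::builder();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let schema = schema_builder.build();
    // Build a small single-segment index containing one document.
    let build_index = |content: &str| {
        let index = Index::create_in_ram(schema.clone());
        let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
        let mut doc = Document::default();
        doc.add_text(text_field, content);
        writer.add_document(doc);
        writer.commit().unwrap();
        index
    };
    let index_a = build_index("hello");
    let index_b = build_index("world");
    // Merging two indices with identical schemas into a fresh directory should succeed.
    let output_dir = tempfile::TempDir::new().unwrap();
    super::merge_segments(&[index_a, index_b], output_dir.path().to_path_buf())
        .expect("merging indices with identical schemas should succeed");
}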
}


@@ -192,7 +192,7 @@ impl SchemaBuilder {
}))
}
}
#[derive(Debug)]
struct InnerSchema {
fields: Vec<FieldEntry>,
fields_map: HashMap<String, Field>, // transient
@@ -226,7 +226,7 @@ impl Eq for InnerSchema {}
/// let schema = schema_builder.build();
///
/// ```
#[derive(Clone, Eq, PartialEq)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct Schema(Arc<InnerSchema>);
impl Schema {