Minor syntax changes, and passing a tantivy Directory as argument

Closes #1005.
This commit is contained in:
Paul Masurel
2021-04-07 23:35:03 +09:00
parent a550c85369
commit e67e5ebd46

View File

@@ -141,37 +141,54 @@ fn merge(
Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
}
/// Merges a list of segments from different indices.
/// Advanced: Merges a list of segments from different indices in a new index.
///
/// Returns `TantivyError` if the the indices list is empty or their
/// schemas don't match.
pub fn merge_segments(indices: &[Index], output_directory: PathBuf) -> crate::Result<Index> {
// validate schema
let schemas: Vec<Schema> = indices.iter().map(|index| index.schema()).collect();
let is_not_valid = match schemas.first() {
Some(first) => schemas.iter().skip(1).any(|next| next != first),
None => true,
};
if is_not_valid {
///
/// `output_directory`: is assumed to be empty.
///
/// # Warning
/// This function does NOT check or take the `IndexWriter` is running. It is not
/// meant to work if you have an IndexWriter running for the origin indices, or
/// the destination Index.
#[doc(hidden)]
pub fn merge_segments<Dir: Directory>(
indices: &[Index],
output_directory: Dir,
) -> crate::Result<Index> {
if indices.is_empty() {
// If there are no indices to merge, there is no need to do anything.
return Err(crate::TantivyError::InvalidArgument(
"Attempt to merge empty or different schema indices".to_string(),
"No indices given to marge".to_string(),
));
}
let mut merged_index = Index::create_in_dir(output_directory, schemas[0].clone())?;
let merged_segment = merged_index.new_segment();
let target_schema = indices[0].schema();
let mut segments: Vec<Segment> = vec![];
// let's check that all of the indices have the same schema
if indices
.iter()
.skip(1)
.any(|index| &index.schema() != &target_schema)
{
return Err(crate::TantivyError::InvalidArgument(
"Attempt to merge different schema indices".to_string(),
));
}
let mut segments: Vec<Segment> = Vec::new();
for index in indices {
segments.extend(index.searchable_segments()?);
}
let merger: IndexMerger = IndexMerger::open(merged_index.schema(), &segments[..])?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?;
let num_docs = merger.write(segment_serializer)?;
let mut merged_index = Index::create(output_directory, target_schema.clone())?;
let merged_segment = merged_index.new_segment();
let merged_segment_id = merged_segment.id();
let merger: IndexMerger = IndexMerger::open(merged_index.schema(), &segments[..])?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment)?;
let num_docs = merger.write(segment_serializer)?;
let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
let stats = format!(
@@ -188,7 +205,7 @@ pub fn merge_segments(indices: &[Index], output_directory: PathBuf) -> crate::Re
let index_meta = IndexMeta {
segments: vec![segment_meta],
schema: merged_index.schema(),
schema: target_schema,
opstamp: 0u64,
payload: Some(stats),
};
@@ -537,7 +554,7 @@ impl SegmentUpdater {
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
if let Err(e) = advance_deletes(
if let Err(advance_deletes_err) = advance_deletes(
segment,
&mut after_merge_segment_entry,
committed_opstamp,
@@ -545,7 +562,7 @@ impl SegmentUpdater {
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
merge_operation.segment_ids(),
e
advance_deletes_err
);
if cfg!(test) {
panic!("Merge failed.");
@@ -553,7 +570,7 @@ impl SegmentUpdater {
// ... cancel merge
// `merge_operations` are tracked. As it is dropped, the
// the segment_ids will be available again for merge.
return Err(e);
return Err(advance_deletes_err);
}
}
}
@@ -599,163 +616,130 @@ impl SegmentUpdater {
#[cfg(test)]
mod tests {
use super::merge_segments;
use crate::directory::RAMDirectory;
use crate::indexer::merge_policy::tests::MergeWheneverPossible;
use crate::schema::*;
use crate::Index;
use std::path::PathBuf;
#[test]
fn test_delete_during_merge() {
fn test_delete_during_merge() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let index = Index::create_in_ram(schema_builder.build());
// writing the segment
let mut index_writer = index.writer_for_tests().unwrap();
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(MergeWheneverPossible));
{
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"a"));
index_writer.add_document(doc!(text_field=>"b"));
}
assert!(index_writer.commit().is_ok());
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"a"));
index_writer.add_document(doc!(text_field=>"b"));
}
index_writer.commit()?;
{
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"c"));
index_writer.add_document(doc!(text_field=>"d"));
}
assert!(index_writer.commit().is_ok());
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"c"));
index_writer.add_document(doc!(text_field=>"d"));
}
index_writer.commit()?;
{
index_writer.add_document(doc!(text_field=>"e"));
index_writer.add_document(doc!(text_field=>"f"));
assert!(index_writer.commit().is_ok());
}
index_writer.add_document(doc!(text_field=>"e"));
index_writer.add_document(doc!(text_field=>"f"));
index_writer.commit()?;
{
let term = Term::from_field_text(text_field, "a");
index_writer.delete_term(term);
assert!(index_writer.commit().is_ok());
}
let reader = index.reader().unwrap();
let term = Term::from_field_text(text_field, "a");
index_writer.delete_term(term);
index_writer.commit()?;
let reader = index.reader()?;
assert_eq!(reader.searcher().num_docs(), 302);
{
index_writer
.wait_merging_threads()
.expect("waiting for merging threads");
}
index_writer.wait_merging_threads()?;
reader.reload().unwrap();
reader.reload()?;
assert_eq!(reader.searcher().segment_readers().len(), 1);
assert_eq!(reader.searcher().num_docs(), 302);
Ok(())
}
#[test]
fn delete_all_docs() {
fn delete_all_docs() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let index = Index::create_in_ram(schema_builder.build());
// writing the segment
let mut index_writer = index.writer_for_tests().unwrap();
let mut index_writer = index.writer_for_tests()?;
{
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"a"));
index_writer.add_document(doc!(text_field=>"b"));
}
assert!(index_writer.commit().is_ok());
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"a"));
index_writer.add_document(doc!(text_field=>"b"));
}
index_writer.commit()?;
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"c"));
index_writer.add_document(doc!(text_field=>"d"));
}
index_writer.commit()?;
index_writer.add_document(doc!(text_field=>"e"));
index_writer.add_document(doc!(text_field=>"f"));
index_writer.commit()?;
let seg_ids = index.searchable_segment_ids()?;
// docs exist, should have at least 1 segment
assert!(seg_ids.len() > 0);
let term_vals = vec!["a", "b", "c", "d", "e", "f"];
for term_val in term_vals {
let term = Term::from_field_text(text_field, term_val);
index_writer.delete_term(term);
index_writer.commit()?;
}
{
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"c"));
index_writer.add_document(doc!(text_field=>"d"));
}
assert!(index_writer.commit().is_ok());
}
index_writer.wait_merging_threads()?;
{
index_writer.add_document(doc!(text_field=>"e"));
index_writer.add_document(doc!(text_field=>"f"));
assert!(index_writer.commit().is_ok());
}
{
let seg_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
// docs exist, should have at least 1 segment
assert!(seg_ids.len() > 0);
}
{
let term_vals = vec!["a", "b", "c", "d", "e", "f"];
for term_val in term_vals {
let term = Term::from_field_text(text_field, term_val);
index_writer.delete_term(term);
assert!(index_writer.commit().is_ok());
}
}
{
index_writer
.wait_merging_threads()
.expect("waiting for merging threads");
}
let reader = index.reader().unwrap();
let reader = index.reader()?;
assert_eq!(reader.searcher().num_docs(), 0);
let seg_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
let seg_ids = index.searchable_segment_ids()?;
assert!(seg_ids.is_empty());
reader.reload().unwrap();
reader.reload()?;
assert_eq!(reader.searcher().num_docs(), 0);
// empty segments should be erased
assert!(index.searchable_segment_metas().unwrap().is_empty());
assert!(index.searchable_segment_metas()?.is_empty());
assert!(reader.searcher().segment_readers().is_empty());
Ok(())
}
#[test]
fn test_remove_all_segments() {
fn test_remove_all_segments() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let index = Index::create_in_ram(schema_builder.build());
// writing the segment
let mut index_writer = index.writer_for_tests().unwrap();
{
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"a"));
index_writer.add_document(doc!(text_field=>"b"));
}
assert!(index_writer.commit().is_ok());
let mut index_writer = index.writer_for_tests()?;
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"a"));
index_writer.add_document(doc!(text_field=>"b"));
}
index_writer.commit()?;
index_writer.segment_updater().remove_all_segments();
let seg_vec = index_writer
.segment_updater()
.segment_manager
.segment_entries();
assert!(seg_vec.is_empty());
Ok(())
}
#[test]
fn test_merge_segements() {
fn test_merge_segments() -> crate::Result<()> {
let mut indices = vec![];
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
@@ -764,67 +748,68 @@ mod tests {
for _ in 0..3 {
let index = Index::create_in_ram(schema.clone());
// writing the segment
let mut index_writer = index.writer_for_tests().unwrap();
{
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"fizz"));
index_writer.add_document(doc!(text_field=>"buzz"));
}
assert!(index_writer.commit().is_ok());
for _ in 0..1000 {
index_writer.add_document(doc!(text_field=>"foo"));
index_writer.add_document(doc!(text_field=>"bar"));
}
assert!(index_writer.commit().is_ok());
}
indices.push(index);
}
assert_eq!(indices.len(), 3);
let out_dir = tempfile::tempdir().unwrap();
let index = merge_segments(&indices, PathBuf::from(out_dir.path())).unwrap();
assert_eq!(index.schema(), schema);
let segments = index.searchable_segments().unwrap();
assert_eq!(segments.len(), 1);
let segement_metas = segments[0].meta();
assert_eq!(segement_metas.num_deleted_docs(), 0);
assert_eq!(segement_metas.num_docs(), 6600);
}
#[test]
fn test_merge_mismatched_schema() {
let mut schemas = vec![];
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
schemas.push(schema_builder.build());
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("body", TEXT);
schemas.push(schema_builder.build());
let mut indices = vec![];
for schema in schemas {
let index = Index::create_in_ram(schema.clone());
let mut index_writer = index.writer_for_tests().unwrap();
// writing two segments
let mut index_writer = index.writer_for_tests()?;
for _ in 0..100 {
index_writer.add_document(doc!(text_field=>"fizz"));
index_writer.add_document(doc!(text_field=>"buzz"));
}
assert!(index_writer.commit().is_ok());
index_writer.commit()?;
for _ in 0..1000 {
index_writer.add_document(doc!(text_field=>"foo"));
index_writer.add_document(doc!(text_field=>"bar"));
}
index_writer.commit()?;
indices.push(index);
}
assert_eq!(indices.len(), 3);
let output_directory = RAMDirectory::default();
let index = merge_segments(&indices, output_directory)?;
assert_eq!(index.schema(), schema);
let segments = index.searchable_segments()?;
assert_eq!(segments.len(), 1);
let segment_metas = segments[0].meta();
assert_eq!(segment_metas.num_deleted_docs(), 0);
assert_eq!(segment_metas.num_docs(), 6600);
Ok(())
}
#[test]
fn test_merge_empty_indices_array() {
let merge_result = merge_segments(&[], RAMDirectory::default());
assert!(merge_result.is_err());
}
#[test]
fn test_merge_mismatched_schema() -> crate::Result<()> {
let first_index = {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text_field=>"some text"));
index_writer.commit()?;
index
};
let second_index = {
let mut schema_builder = Schema::builder();
let body_field = schema_builder.add_text_field("body", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(body_field=>"some body"));
index_writer.commit()?;
index
};
// mismatched schema index list
let out_dir = tempfile::tempdir().unwrap();
let result = merge_segments(&indices, PathBuf::from(out_dir.path()));
let result = merge_segments(&[first_index, second_index], RAMDirectory::default());
assert!(result.is_err());
// empty index list
let result = merge_segments(&vec![], PathBuf::from(out_dir.path()));
assert!(result.is_err());
Ok(())
}
}