mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-05 16:52:55 +00:00
Merge branch 'master' of github.com:tantivy-search/tantivy
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
use super::operation::AddOperation;
|
||||
use super::operation::{AddOperation, UserOperation};
|
||||
use super::segment_updater::SegmentUpdater;
|
||||
use super::PreparedCommit;
|
||||
use bit_set::BitSet;
|
||||
@@ -26,6 +26,7 @@ use schema::Document;
|
||||
use schema::IndexRecordOption;
|
||||
use schema::Term;
|
||||
use std::mem;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
@@ -43,8 +44,8 @@ pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;
|
||||
// reaches `PIPELINE_MAX_SIZE_IN_DOCS`
|
||||
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
|
||||
|
||||
type DocumentSender = channel::Sender<AddOperation>;
|
||||
type DocumentReceiver = channel::Receiver<AddOperation>;
|
||||
type DocumentSender = channel::Sender<Vec<AddOperation>>;
|
||||
type DocumentReceiver = channel::Receiver<Vec<AddOperation>>;
|
||||
|
||||
/// Split the thread memory budget into
|
||||
/// - the heap size
|
||||
@@ -266,7 +267,7 @@ fn index_documents(
|
||||
memory_budget: usize,
|
||||
segment: &Segment,
|
||||
generation: usize,
|
||||
document_iterator: &mut impl Iterator<Item = AddOperation>,
|
||||
document_iterator: &mut Iterator<Item = Vec<AddOperation>>,
|
||||
segment_updater: &mut SegmentUpdater,
|
||||
mut delete_cursor: DeleteCursor,
|
||||
) -> Result<bool> {
|
||||
@@ -274,11 +275,11 @@ fn index_documents(
|
||||
let segment_id = segment.id();
|
||||
let table_size = initial_table_size(memory_budget);
|
||||
let mut segment_writer = SegmentWriter::for_segment(table_size, segment.clone(), &schema)?;
|
||||
for doc in document_iterator {
|
||||
segment_writer.add_document(doc, &schema)?;
|
||||
|
||||
for documents in document_iterator {
|
||||
for doc in documents {
|
||||
segment_writer.add_document(doc, &schema)?;
|
||||
}
|
||||
let mem_usage = segment_writer.mem_usage();
|
||||
|
||||
if mem_usage >= memory_budget - MARGIN_IN_BYTES {
|
||||
info!(
|
||||
"Buffer limit reached, flushing segment with maxdoc={}.",
|
||||
@@ -409,8 +410,12 @@ impl IndexWriter {
|
||||
// this is a valid guarantee as the
|
||||
// peeked document now belongs to
|
||||
// our local iterator.
|
||||
if let Some(operation) = document_iterator.peek() {
|
||||
delete_cursor.skip_to(operation.opstamp);
|
||||
if let Some(operations) = document_iterator.peek() {
|
||||
if let Some(first) = operations.first() {
|
||||
delete_cursor.skip_to(first.opstamp);
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
// No more documents.
|
||||
// Happens when there is a commit, or if the `IndexWriter`
|
||||
@@ -643,25 +648,168 @@ impl IndexWriter {
|
||||
pub fn add_document(&mut self, document: Document) -> u64 {
|
||||
let opstamp = self.stamper.stamp();
|
||||
let add_operation = AddOperation { opstamp, document };
|
||||
let send_result = self.document_sender.send(add_operation);
|
||||
let send_result = self.document_sender.send(vec![add_operation]);
|
||||
if let Err(e) = send_result {
|
||||
panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
|
||||
}
|
||||
opstamp
|
||||
}
|
||||
|
||||
/// Gets a range of stamps from the stamper and "pops" the last stamp
|
||||
/// from the range returning a tuple of the last optstamp and the popped
|
||||
/// range.
|
||||
///
|
||||
/// The total number of stamps generated by this method is `count + 1`;
|
||||
/// each operation gets a stamp from the `stamps` iterator and `last_opstamp`
|
||||
/// is for the batch itself.
|
||||
fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range<u64>) {
|
||||
let Range { start, end } = self.stamper.stamps(count + 1u64);
|
||||
let last_opstamp = end - 1;
|
||||
let stamps = Range {
|
||||
start: start,
|
||||
end: last_opstamp,
|
||||
};
|
||||
(last_opstamp, stamps)
|
||||
}
|
||||
|
||||
/// Runs a group of document operations ensuring that the operations are
|
||||
/// assigned contigous u64 opstamps and that add operations of the same
|
||||
/// group are flushed into the same segment.
|
||||
///
|
||||
/// If the indexing pipeline is full, this call may block.
|
||||
///
|
||||
/// Each operation of the given `user_operations` will receive an in-order,
|
||||
/// contiguous u64 opstamp. The entire batch itself is also given an
|
||||
/// opstamp that is 1 greater than the last given operation. This
|
||||
/// `batch_opstamp` is the return value of `run`. An empty group of
|
||||
/// `user_operations`, an empty `Vec<UserOperation>`, still receives
|
||||
/// a valid opstamp even though no changes were _actually_ made to the index.
|
||||
///
|
||||
/// Like adds and deletes (see `IndexWriter.add_document` and
|
||||
/// `IndexWriter.delete_term`), the changes made by calling `run` will be
|
||||
/// visible to readers only after calling `commit()`.
|
||||
pub fn run(&mut self, user_operations: Vec<UserOperation>) -> u64 {
|
||||
let count = user_operations.len() as u64;
|
||||
if count == 0 {
|
||||
return self.stamper.stamp();
|
||||
}
|
||||
let (batch_opstamp, stamps) = self.get_batch_opstamps(count);
|
||||
|
||||
let mut adds: Vec<AddOperation> = Vec::new();
|
||||
|
||||
for (user_op, opstamp) in user_operations.into_iter().zip(stamps) {
|
||||
match user_op {
|
||||
UserOperation::Delete(term) => {
|
||||
let delete_operation = DeleteOperation {
|
||||
opstamp: opstamp,
|
||||
term: term,
|
||||
};
|
||||
self.delete_queue.push(delete_operation);
|
||||
}
|
||||
UserOperation::Add(doc) => {
|
||||
let add_operation = AddOperation {
|
||||
opstamp: opstamp,
|
||||
document: doc,
|
||||
};
|
||||
adds.push(add_operation);
|
||||
}
|
||||
}
|
||||
}
|
||||
let send_result = self.document_sender.send(adds);
|
||||
if let Err(e) = send_result {
|
||||
panic!("Failed to index document. Sending to indexing channel failed. This probably means all of the indexing threads have panicked. {:?}", e);
|
||||
};
|
||||
|
||||
batch_opstamp
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::super::operation::UserOperation;
|
||||
use super::initial_table_size;
|
||||
use directory::error::LockError;
|
||||
use error::*;
|
||||
use indexer::NoMergePolicy;
|
||||
use schema::{self, Document};
|
||||
use schema::{self, Document, IndexRecordOption};
|
||||
use query::{TermQuery};
|
||||
use collector::TopDocs;
|
||||
use Index;
|
||||
use Term;
|
||||
|
||||
#[test]
|
||||
fn test_operations_group() {
|
||||
// an operations group with 2 items should cause 3 opstamps 0, 1, and 2.
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", schema::TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let operations = vec![
|
||||
UserOperation::Add(doc!(text_field=>"a")),
|
||||
UserOperation::Add(doc!(text_field=>"b")),
|
||||
];
|
||||
let batch_opstamp1 = index_writer.run(operations);
|
||||
assert_eq!(batch_opstamp1, 2u64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ordered_batched_operations() {
|
||||
// * one delete for `doc!(field=>"a")`
|
||||
// * one add for `doc!(field=>"a")`
|
||||
// * one add for `doc!(field=>"b")`
|
||||
// * one delete for `doc!(field=>"b")`
|
||||
// after commit there is one doc with "a" and 0 doc with "b"
|
||||
let mut schema_builder = schema::Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", schema::TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
|
||||
let a_term = Term::from_field_text(text_field, "a");
|
||||
let b_term = Term::from_field_text(text_field, "b");
|
||||
let operations = vec![
|
||||
UserOperation::Delete(a_term),
|
||||
UserOperation::Add(doc!(text_field=>"a")),
|
||||
UserOperation::Add(doc!(text_field=>"b")),
|
||||
UserOperation::Delete(b_term),
|
||||
];
|
||||
|
||||
index_writer.run(operations);
|
||||
index_writer.commit().expect("failed to commit");
|
||||
index.load_searchers().expect("failed to load searchers");
|
||||
|
||||
let a_term = Term::from_field_text(text_field, "a");
|
||||
let b_term = Term::from_field_text(text_field, "b");
|
||||
|
||||
let a_query = TermQuery::new(a_term, IndexRecordOption::Basic);
|
||||
let b_query = TermQuery::new(b_term, IndexRecordOption::Basic);
|
||||
|
||||
let searcher = index.searcher();
|
||||
|
||||
let a_docs = searcher
|
||||
.search(&a_query, &TopDocs::with_limit(1))
|
||||
.expect("search for a failed");
|
||||
|
||||
let b_docs = searcher
|
||||
.search(&b_query, &TopDocs::with_limit(1))
|
||||
.expect("search for b failed");
|
||||
|
||||
assert_eq!(a_docs.len(), 1);
|
||||
assert_eq!(b_docs.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_operations_group() {
|
||||
let schema_builder = schema::Schema::builder();
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
let mut index_writer = index.writer(3_000_000).unwrap();
|
||||
let operations1 = vec![];
|
||||
let batch_opstamp1 = index_writer.run(operations1);
|
||||
assert_eq!(batch_opstamp1, 0u64);
|
||||
let operations2 = vec![];
|
||||
let batch_opstamp2 = index_writer.run(operations2);
|
||||
assert_eq!(batch_opstamp2, 1u64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lockfile_stops_duplicates() {
|
||||
let schema_builder = schema::Schema::builder();
|
||||
|
||||
@@ -14,3 +14,10 @@ pub struct AddOperation {
|
||||
pub opstamp: u64,
|
||||
pub document: Document,
|
||||
}
|
||||
|
||||
/// UserOperation is an enum type that encapsulates other operation types.
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
pub enum UserOperation {
|
||||
Add(Document),
|
||||
Delete(Term),
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -60,6 +61,16 @@ impl Stamper {
|
||||
pub fn stamp(&self) -> u64 {
|
||||
self.0.fetch_add(1u64, Ordering::SeqCst) as u64
|
||||
}
|
||||
|
||||
/// Given a desired count `n`, `stamps` returns an iterator that
|
||||
/// will supply `n` number of u64 stamps.
|
||||
pub fn stamps(&self, n: u64) -> Range<u64> {
|
||||
let start = self.0.fetch_add(n, Ordering::SeqCst);
|
||||
Range {
|
||||
start: start,
|
||||
end: start + n,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -78,5 +89,7 @@ mod test {
|
||||
|
||||
assert_eq!(stamper.stamp(), 10u64);
|
||||
assert_eq!(stamper_clone.stamp(), 11u64);
|
||||
assert_eq!(stamper.stamps(3u64), (12..15));
|
||||
assert_eq!(stamper.stamp(), 15u64);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user