test passing.

SegmentWriter create SegmentEntry which contain a delete_bitset
This commit is contained in:
Paul Masurel
2017-03-26 17:33:42 +09:00
parent 30075176cb
commit ddb2b8d807
3 changed files with 38 additions and 31 deletions

View File

@@ -160,8 +160,9 @@ pub fn compute_deleted_bitset(
segment_reader: &SegmentReader,
delete_cursor: &mut DeleteCursor,
doc_opstamps: DocToOpstampMapping,
target_opstamp: u64) -> Result<(Option<BitSet>)> {
target_opstamp: u64) -> Result<bool> {
let mut might_have_changed = false;
loop {
if let Some(delete_op) = delete_cursor.peek() {
@@ -180,6 +181,7 @@ pub fn compute_deleted_bitset(
let deleted_doc = docset.doc();
if deleted_doc < limit_doc {
delete_bitset.insert(deleted_doc as usize);
might_have_changed = true;
}
}
}
@@ -190,13 +192,7 @@ pub fn compute_deleted_bitset(
}
delete_cursor.advance();
}
if !delete_bitset.is_empty() {
Ok(Some(delete_bitset))
}
else {
Ok(None)
}
Ok(might_have_changed)
}
@@ -213,44 +209,45 @@ pub fn advance_deletes(
segment_entry: &mut SegmentEntry,
target_opstamp: u64) -> Result<()> {
{
{
if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() {
// We are already up-to-date here.
if target_opstamp == previous_opstamp {
return Ok(());
}
}
let segment_reader = SegmentReader::open(segment.clone())?;
let max_doc = segment_reader.max_doc();
let mut delete_bitset: BitSet =
match segment_entry.reset_delete_bitset() {
Some(previous_delete_bitset) => {
previous_delete_bitset
},
None => {
BitSet::with_capacity(segment_reader.max_doc() as usize)
BitSet::with_capacity(max_doc as usize)
}
};
let delete_cursor = segment_entry.delete_cursor();
let new_deleted_bitset = compute_deleted_bitset(
compute_deleted_bitset(
&mut delete_bitset,
&segment_reader,
delete_cursor,
&mut delete_bitset,
DocToOpstampMapping::None,
target_opstamp)?;
// we only write the result different
// iff we ended ended up increasing the delete opstamp
//
// TODO just move the file if there was no new delete?
if let Some(mut delete_bitset) = new_deleted_bitset {
for doc in 0u32..segment_reader.max_doc() {
if segment_reader.is_deleted(doc) {
delete_bitset.insert(doc as usize);
}
for doc in 0u32..max_doc {
if segment_reader.is_deleted(doc) {
delete_bitset.insert(doc as usize);
}
let num_deleted_docs = delete_bitset.len();
segment.set_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
let num_deleted_docs = delete_bitset.len();
segment.set_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
segment_entry.set_meta(segment.meta().clone());
@@ -291,7 +288,9 @@ fn index_documents(heap: &mut Heap,
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
let segment_reader = SegmentReader::open(segment)?;
let delete_bitset = compute_deleted_bitset(
let mut deleted_bitset = BitSet::with_capacity(num_docs as usize);
let may_have_deletes = compute_deleted_bitset(
&mut deleted_bitset,
&segment_reader,
&mut delete_cursor,
doc_to_opstamps,
@@ -301,7 +300,9 @@ fn index_documents(heap: &mut Heap,
let segment_entry = SegmentEntry::new(
segment_meta,
delete_cursor,
delete_bitset);
{ if may_have_deletes { Some(deleted_bitset) }
else { None } }
);
segment_updater
.add_segment(generation, segment_entry)

View File

@@ -307,7 +307,7 @@ mod tests {
use futures::Future;
#[test]
fn test_index_merger() {
fn test_index_merger_no_deletes() {
let mut schema_builder = schema::SchemaBuilder::default();
let text_fieldtype = schema::TextOptions::default()
.set_indexing_options(TextIndexingOptions::TokenizedWithFreq)

View File

@@ -300,7 +300,13 @@ impl SegmentUpdater {
.end_merge(segment_ids_vec, after_merge_segment_entry)
.wait()
.expect("Segment updater thread is corrupted.");
merging_future_send.complete(merged_segment_meta);
// the future may fail if the listener of the oneshot future
// has been destroyed.
//
// This is not a problem here, so we just ignore any
// possible error.
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(_) => {
// ... cancel merge