Some small refactoring

This commit is contained in:
Paul Masurel
2020-01-02 22:34:37 +09:00
parent b449749d63
commit 4448854f73
5 changed files with 68 additions and 40 deletions

View File

@@ -140,7 +140,6 @@ fn compute_deleted_bitset(
/// For instance, there was no delete operation between the state of the `segment_entry` and
/// the `target_opstamp`, `segment_entry` is not updated.
pub(crate) fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
) -> crate::Result<()> {
@@ -149,28 +148,38 @@ pub(crate) fn advance_deletes(
return Ok(());
}
if segment_entry.delete_bitset().is_none() && segment_entry.delete_cursor().get().is_none() {
let delete_bitset_opt = segment_entry.take_delete_bitset();
// We avoid directly advancing the `SegmentEntry` delete cursor, because
// we do not want to end up in an invalid state if the delete bitset
// serialization fails.
let mut delete_cursor = segment_entry.delete_cursor();
if delete_bitset_opt.is_none() && delete_cursor.get().is_none() {
// There has been no `DeleteOperation` between the segment status and `target_opstamp`.
return Ok(());
}
// We open our current serialized segment to compute the new deleted bitset.
let segment = segment_entry.segment().clone();
let segment_reader = SegmentReader::open(&segment)?;
let max_doc = segment_reader.max_doc();
let mut delete_bitset: BitSet = match segment_entry.delete_bitset() {
Some(previous_delete_bitset) => (*previous_delete_bitset).clone(),
None => BitSet::with_max_value(max_doc),
};
let mut delete_bitset: BitSet =
delete_bitset_opt.unwrap_or_else(|| BitSet::with_max_value(max_doc));
let num_deleted_docs_before = segment.meta().num_deleted_docs();
compute_deleted_bitset(
&mut delete_bitset,
&segment_reader,
segment_entry.delete_cursor(),
&mut delete_cursor,
&DocToOpstampMapping::None,
target_opstamp,
)?;
// TODO optimize
// TODO optimize... We are simply manipulating bitsets here.
// We should be able to compute the union much faster.
if let Some(seg_delete_bitset) = segment_reader.delete_bitset() {
for doc in 0u32..max_doc {
if seg_delete_bitset.is_deleted(doc) {
@@ -179,15 +188,23 @@ pub(crate) fn advance_deletes(
}
}
let num_deleted_docs = delete_bitset.len();
if num_deleted_docs > 0 {
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
let num_deleted_docs = delete_bitset.len() as u32;
if num_deleted_docs > num_deleted_docs_before {
// We need to write a new delete file.
let mut delete_file = segment
.with_delete_meta(num_deleted_docs, target_opstamp)
.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
delete_file.terminate()?;
segment_entry.reset_delete_meta(num_deleted_docs as u32, target_opstamp);
}
segment_entry.set_segment(segment);
// Regardless of whether we did end up having to write a new file or not
// we advance the `delete_cursor`. This is an optimisation. We want to ensure we do not
// check that a given deleted term does not match any of our docs more than once.
segment_entry.set_delete_cursor(delete_cursor);
Ok(())
}

View File

@@ -32,12 +32,12 @@ impl<'a> PreparedCommit<'a> {
pub fn commit(self) -> crate::Result<Opstamp> {
info!("committing {}", self.opstamp);
let _ = block_on(
block_on(
self.index_writer
.segment_updater()
.schedule_commit(self.opstamp, self.payload),
);
let _ = block_on(self.index_writer.trigger_commit());
)?;
block_on(self.index_writer.trigger_commit());
Ok(self.opstamp)
}
}

View File

@@ -3,7 +3,7 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::directory::ManagedDirectory;
use crate::indexer::delete_queue::DeleteCursor;
use crate::Segment;
use crate::{Opstamp, Segment};
use std::fmt;
/// A segment entry describes the state of
@@ -50,21 +50,32 @@ impl SegmentEntry {
&self.segment
}
/// Return a reference to the segment entry deleted bitset.
/// `Takes` (as in Option::take) the delete bitset of
/// a segment entry.
///
/// `DocId` in this bitset are flagged as deleted.
pub fn delete_bitset(&self) -> Option<&BitSet> {
self.delete_bitset.as_ref()
pub fn take_delete_bitset(&mut self) -> Option<BitSet> {
self.delete_bitset.take()
}
/// Set the `SegmentMeta` for this segment.
pub fn set_segment(&mut self, segment: Segment) {
self.segment = segment;
/// Reset the delete informmationo in this segment.
///
/// The `SegmentEntry` segment's `SegmentMeta` gets updated, and
/// any delete bitset is drop and set to None.
pub fn reset_delete_meta(&mut self, num_deleted_docs: u32, target_opstamp: Opstamp) {
self.segment = self
.segment
.clone()
.with_delete_meta(num_deleted_docs, target_opstamp);
self.delete_bitset = None;
}
pub fn set_delete_cursor(&mut self, delete_cursor: DeleteCursor) {
self.delete_cursor = delete_cursor;
}
/// Return a reference to the segment_entry's delete cursor
pub fn delete_cursor(&mut self) -> &mut DeleteCursor {
&mut self.delete_cursor
pub fn delete_cursor(&mut self) -> DeleteCursor {
self.delete_cursor.clone()
}
/// Returns the segment id.

View File

@@ -116,8 +116,7 @@ fn merge(
// First we apply all of the delet to the merged segment, up to the target opstamp.
for segment_entry in &mut segment_entries {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
advance_deletes(segment_entry, target_opstamp)?;
}
let delete_cursor = segment_entries[0].delete_cursor().clone();
@@ -232,12 +231,11 @@ impl SegmentUpdater {
pub fn schedule_add_segment(
&self,
mut segment_entry: SegmentEntry,
segment_entry: SegmentEntry,
) -> impl Future<Output = crate::Result<()>> {
// TODO temporary: serializing the segment at this point.
let segment_updater = self.clone();
self.schedule_future(async move {
segment_entry.persist(segment_updater.index.directory().clone())?;
segment_updater.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options().await;
Ok(())
@@ -264,8 +262,7 @@ impl SegmentUpdater {
fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result<Vec<SegmentEntry>> {
let mut segment_entries = self.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
let segment = self.index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
advance_deletes(segment_entry, target_opstamp)?;
}
Ok(segment_entries)
}
@@ -335,8 +332,13 @@ impl SegmentUpdater {
payload: Option<String>,
) -> impl Future<Output = crate::Result<()>> {
let segment_updater: SegmentUpdater = self.clone();
let directory = self.index.directory().clone();
self.schedule_future(async move {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
let mut segment_entries = segment_updater.purge_deletes(opstamp)?;
for segment_entry in &mut segment_entries {
let directory = directory.clone();
segment_entry.persist(directory)?;
}
segment_updater.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload)?;
let _ = garbage_collect_files(segment_updater.clone()).await;
@@ -476,17 +478,14 @@ impl SegmentUpdater {
let end_merge_future = self.schedule_future(async move {
info!("End merge {:?}", after_merge_segment_entry.meta());
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
let mut delete_cursor = after_merge_segment_entry.delete_cursor();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_metas().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
if let Err(e) = advance_deletes(
segment,
&mut after_merge_segment_entry,
committed_opstamp,
) {
let _index = &segment_updater.index;
if let Err(e) =
advance_deletes(&mut after_merge_segment_entry, committed_opstamp)
{
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
merge_operation.segment_ids(),

View File

@@ -112,6 +112,7 @@ mod tests {
let term_a = Term::from_field_text(text_field, "a");
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
let reader = index.reader().unwrap();
assert_eq!(reader.searcher().segment_readers().len(), 1);
assert_eq!(term_query.count(&*reader.searcher()).unwrap(), 1);
}