This commit is contained in:
Paul Masurel
2020-03-08 10:20:42 +09:00
parent ec32e0546a
commit f06e116aae
9 changed files with 149 additions and 77 deletions

View File

@@ -107,6 +107,7 @@ impl Segment {
Ok(())
}
/// Returns our index's schema.
pub fn schema(&self) -> Schema {
self.schema.clone()

View File

@@ -63,15 +63,19 @@ impl SpillingWriter {
}
pub fn flush_and_finalize(self) -> io::Result<()> {
if let SpillingState::Buffer {
buffer,
write_factory,
..
} = self.state.expect("State cannot be none") {
let mut wrt = write_factory()?;
wrt.write_all(&buffer[..])?;
wrt.flush()?;
wrt.terminate()?;
match self.state.expect("State cannot be none") {
SpillingState::Buffer {
buffer,
write_factory,
..
} => {
let mut wrt = write_factory()?;
wrt.write_all(&buffer[..])?;
wrt.terminate()?;
}
SpillingState::Spilled(wrt) => {
wrt.terminate()?;
}
}
Ok(())
}

View File

@@ -1,4 +1,5 @@
use super::operation::{AddOperation, UserOperation};
use crate::indexer::segment_manager::SegmentRegisters;
use super::segment_updater::SegmentUpdater;
use super::PreparedCommit;
use crate::common::BitSet;
@@ -32,9 +33,10 @@ use smallvec::smallvec;
use smallvec::SmallVec;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread;
use std::thread::JoinHandle;
use crate::indexer::segment_register::SegmentRegister;
// Size of the margin for the heap. A segment is closed when the remaining memory
// in the heap goes below MARGIN_IN_BYTES.
@@ -71,6 +73,8 @@ pub struct IndexWriter {
index: Index,
segment_registers: Arc<RwLock<SegmentRegisters>>,
heap_size_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
@@ -134,7 +138,6 @@ fn compute_deleted_bitset(
/// For instance, there was no delete operation between the state of the `segment_entry` and
/// the `target_opstamp`, `segment_entry` is not updated.
pub(crate) fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
) -> crate::Result<()> {
@@ -143,25 +146,33 @@ pub(crate) fn advance_deletes(
return Ok(());
}
if segment_entry.delete_bitset().is_none() && segment_entry.delete_cursor().get().is_none() {
let delete_bitset_opt = segment_entry.take_delete_bitset();
// We avoid directly advancing the `SegmentEntry` delete cursor, because
// we do not want to end up in an invalid state if the delete bitset
// serialization fails.
let mut delete_cursor = segment_entry.delete_cursor();
if delete_bitset_opt.is_none() && segment_entry.delete_cursor().get().is_none() {
// There has been no `DeleteOperation` between the segment status and `target_opstamp`.
return Ok(());
}
// We open our current serialized segment to compute the new deleted bitset.
let segment = segment_entry.segment().clone();
let segment_reader = SegmentReader::open(&segment)?;
let max_doc = segment_reader.max_doc();
let mut delete_bitset: BitSet = match segment_entry.delete_bitset() {
Some(previous_delete_bitset) => (*previous_delete_bitset).clone(),
None => BitSet::with_max_value(max_doc),
};
let mut delete_bitset: BitSet =
delete_bitset_opt.unwrap_or_else(|| BitSet::with_max_value(max_doc));
let num_deleted_docs_before = segment.meta().num_deleted_docs();
compute_deleted_bitset(
&mut delete_bitset,
&segment_reader,
segment_entry.delete_cursor(),
&mut delete_cursor,
&DocToOpstampMapping::None,
target_opstamp,
)?;
@@ -180,13 +191,18 @@ pub(crate) fn advance_deletes(
let num_deleted_docs: u32 = delete_bitset.len() as u32;
if num_deleted_docs > num_deleted_docs_before {
// There are new deletes. We need to write a new delete file.
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
let mut delete_file = segment
.with_delete_meta(num_deleted_docs as u32, target_opstamp)
.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
delete_file.terminate()?;
segment_entry.reset_delete_meta(num_deleted_docs as u32, target_opstamp);
}
segment_entry.set_meta(segment.meta().clone());
// Regardless of whether we did end up having to write a new file or not
// we advance the `delete_cursor`. This is an optimisation. We want to ensure we do not
// check that a given deleted term does not match any of our docs more than once.
segment_entry.set_delete_cursor(delete_cursor);
Ok(())
}
@@ -240,7 +256,7 @@ fn index_documents(
)?;
let segment_entry = SegmentEntry::new(
segment_with_max_doc.meta().clone(),
segment_with_max_doc,
delete_cursor,
delete_bitset_opt,
);
@@ -317,10 +333,20 @@ impl IndexWriter {
let current_opstamp = index.load_metas()?.opstamp;
let meta = index.load_metas()?;
let stamper = Stamper::new(current_opstamp);
let commited_segments = SegmentRegister::new(
index.directory(),
&index.schema(),
meta.segments,
&delete_queue.cursor(),
);
let segment_registers = Arc::new(RwLock::new(SegmentRegisters::new(commited_segments)));
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
SegmentUpdater::create(segment_registers.clone(), index.clone(), stamper.clone())?;
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
@@ -342,6 +368,7 @@ impl IndexWriter {
stamper,
worker_id: 0,
segment_registers
};
index_writer.start_workers()?;
Ok(index_writer)
@@ -381,13 +408,6 @@ impl IndexWriter {
result
}
#[doc(hidden)]
pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
block_on(self.segment_updater.schedule_add_segment(segment_entry))
}
/// Creates a new segment.
///
/// This method is useful only for users trying to do complex

View File

@@ -1,4 +1,5 @@
use crate::common::MAX_DOC_LIMIT;
use crate::directory::TerminatingWrite;
use crate::core::Segment;
use crate::core::SegmentReader;
use crate::core::SerializableSegment;
@@ -674,7 +675,8 @@ impl IndexMerger {
store_writer.stack(&store_reader)?;
}
}
store_writer.close()?;
let store_wrt = store_writer.close()?;
store_wrt.terminate()?;
Ok(())
}
}

View File

@@ -3,6 +3,7 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::indexer::delete_queue::DeleteCursor;
use std::fmt;
use crate::{Segment, Opstamp};
/// A segment entry describes the state of
/// a given segment, at a given instant.
@@ -19,7 +20,7 @@ use std::fmt;
/// in the .del file or in the `delete_bitset`.
#[derive(Clone)]
pub struct SegmentEntry {
meta: SegmentMeta,
segment: Segment,
delete_bitset: Option<BitSet>,
delete_cursor: DeleteCursor,
}
@@ -27,12 +28,12 @@ pub struct SegmentEntry {
impl SegmentEntry {
/// Create a new `SegmentEntry`
pub fn new(
segment_meta: SegmentMeta,
segment: Segment,
delete_cursor: DeleteCursor,
delete_bitset: Option<BitSet>,
) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
segment,
delete_bitset,
delete_cursor,
}
@@ -45,29 +46,52 @@ impl SegmentEntry {
self.delete_bitset.as_ref()
}
/// Set the `SegmentMeta` for this segment.
pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
self.meta = segment_meta;
pub fn set_delete_cursor(&mut self, delete_cursor: DeleteCursor) {
self.delete_cursor = delete_cursor;
}
/// `Takes` (as in Option::take) the delete bitset of a segment entry.
/// `DocId` in this bitset are flagged as deleted.
pub fn take_delete_bitset(&mut self) -> Option<BitSet> {
self.delete_bitset.take()
}
/// Reset the delete information in this segment.
///
/// The `SegmentEntry` segment's `SegmentMeta` gets updated, and
/// any delete bitset is drop and set to None.
pub fn reset_delete_meta(&mut self, num_deleted_docs: u32, target_opstamp: Opstamp) {
self.segment = self
.segment
.clone()
.with_delete_meta(num_deleted_docs, target_opstamp);
self.delete_bitset = None;
}
/// Return a reference to the segment_entry's delete cursor
pub fn delete_cursor(&mut self) -> &mut DeleteCursor {
&mut self.delete_cursor
pub fn delete_cursor(&mut self) -> DeleteCursor {
self.delete_cursor.clone()
}
/// Returns the segment id.
pub fn segment_id(&self) -> SegmentId {
self.meta.id()
self.meta().id()
}
/// Returns the `segment` associated to the `SegmentEntry`.
pub fn segment(&self) -> &Segment {
&self.segment
}
/// Accessor to the `SegmentMeta`
pub fn meta(&self) -> &SegmentMeta {
&self.meta
self.segment.meta()
}
}
impl fmt::Debug for SegmentEntry {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(formatter, "SegmentEntry({:?})", self.meta)
write!(formatter, "SegmentEntry({:?})", self.meta())
}
}

View File

@@ -2,15 +2,15 @@ use super::segment_register::SegmentRegister;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::error::TantivyError;
use crate::indexer::delete_queue::DeleteCursor;
use crate::indexer::SegmentEntry;
use std::collections::hash_set::HashSet;
use std::fmt::{self, Debug, Formatter};
use std::sync::RwLock;
use std::sync::{RwLock, Arc};
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use crate::Segment;
#[derive(Default)]
struct SegmentRegisters {
pub(crate) struct SegmentRegisters {
uncommitted: SegmentRegister,
committed: SegmentRegister,
}
@@ -22,6 +22,18 @@ pub(crate) enum SegmentsStatus {
}
impl SegmentRegisters {
pub fn new(committed: SegmentRegister) -> SegmentRegisters {
SegmentRegisters {
uncommitted: Default::default(),
committed,
}
}
pub fn committed_segment(&self) -> Vec<Segment> {
self.committed.segments()
}
/// Check if all the segments are committed or uncommited.
///
/// If some segment is missing or segments are in a different state (this should not happen
@@ -44,7 +56,7 @@ impl SegmentRegisters {
/// changes (merges especially)
#[derive(Default)]
pub struct SegmentManager {
registers: RwLock<SegmentRegisters>,
registers: Arc<RwLock<SegmentRegisters>>,
}
impl Debug for SegmentManager {
@@ -74,15 +86,11 @@ pub fn get_mergeable_segments(
}
impl SegmentManager {
pub fn from_segments(
segment_metas: Vec<SegmentMeta>,
delete_cursor: &DeleteCursor,
) -> SegmentManager {
pub(crate) fn new(registers: Arc<RwLock<SegmentRegisters>>) -> SegmentManager {
SegmentManager {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas, delete_cursor),
}),
registers
}
}

View File

@@ -5,6 +5,9 @@ use crate::indexer::segment_entry::SegmentEntry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::{self, Debug, Formatter};
use crate::Segment;
use crate::directory::ManagedDirectory;
use crate::schema::Schema;
/// The segment register keeps track
/// of the list of segment, their size as well
@@ -45,6 +48,13 @@ impl SegmentRegister {
.map(|segment_entry| segment_entry.meta().clone())
.collect()
}
pub fn segments(&self) -> Vec<Segment> {
self.segment_states
.values()
.map(|segment_entry| segment_entry.segment().clone())
.collect()
}
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
self.segment_states.values().cloned().collect()
@@ -79,11 +89,17 @@ impl SegmentRegister {
self.segment_states.get(segment_id).cloned()
}
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
pub fn new(
directory: &ManagedDirectory,
schema: &Schema,
segment_metas: Vec<SegmentMeta>,
delete_cursor: &DeleteCursor,
) -> SegmentRegister {
let mut segment_states = HashMap::new();
for segment_meta in segment_metas {
let segment_id = segment_meta.id();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None);
let segment = Segment::new_persisted(segment_meta, directory.clone(), schema.clone());
let segment_entry = SegmentEntry::new(segment, delete_cursor.clone(), None);
segment_states.insert(segment_id, segment_entry);
}
SegmentRegister { segment_states }
@@ -108,6 +124,7 @@ mod tests {
fn test_segment_register() {
let inventory = SegmentMetaInventory::default();
let delete_queue = DeleteQueue::new();
let schema = Schema::builder().build();
let mut segment_register = SegmentRegister::default();
let segment_id_a = SegmentId::generate_random();
@@ -115,21 +132,24 @@ mod tests {
let segment_id_merged = SegmentId::generate_random();
{
let segment_meta = inventory.new_segment_meta(segment_id_a, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
let meta = inventory.new_segment_meta(segment_id_a, 0u32);
let segment = Segment::new_volatile(meta, schema.clone());
let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
{
let segment_meta = inventory.new_segment_meta(segment_id_b, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
let meta = inventory.new_segment_meta(segment_id_b, 0u32);
let segment = Segment::new_volatile(meta, schema.clone());
let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = inventory.new_segment_meta(segment_id_merged, 0u32);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
let segment_merged = Segment::new_volatile(segment_meta_merged, schema.clone());
let segment_entry = SegmentEntry::new(segment_merged, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]);

View File

@@ -7,11 +7,10 @@ use crate::core::SegmentMeta;
use crate::core::SerializableSegment;
use crate::core::META_FILEPATH;
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::indexer::delete_queue::DeleteCursor;
use crate::indexer::index_writer::advance_deletes;
use crate::indexer::merge_operation::MergeOperationInventory;
use crate::indexer::merger::IndexMerger;
use crate::indexer::segment_manager::SegmentsStatus;
use crate::indexer::segment_manager::{SegmentsStatus, SegmentRegisters};
use crate::indexer::stamper::Stamper;
use crate::indexer::SegmentEntry;
use crate::indexer::SegmentSerializer;
@@ -117,8 +116,7 @@ fn merge(
// First we apply all of the delet to the merged segment, up to the target opstamp.
for segment_entry in &mut segment_entries {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
advance_deletes( segment_entry, target_opstamp)?;
}
let delete_cursor = segment_entries[0].delete_cursor().clone();
@@ -137,10 +135,9 @@ fn merge(
let store_wrt = merged_segment.open_write(SegmentComponent::STORE)?;
merger.write_storable_fields(store_wrt)?;
let num_docs = merger.write(segment_serializer)?;
let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs);
let max_doc = merger.write(segment_serializer)?;
Ok(SegmentEntry::new(segment_meta, delete_cursor, None))
Ok(SegmentEntry::new(merged_segment.with_max_doc(max_doc), delete_cursor, None))
}
pub(crate) struct InnerSegmentUpdater {
@@ -164,12 +161,11 @@ pub(crate) struct InnerSegmentUpdater {
impl SegmentUpdater {
pub fn create(
segment_registers: Arc<RwLock<SegmentRegisters>>,
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
stamper: Stamper
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let segment_manager = SegmentManager::new(segment_registers);
let pool = ThreadPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
@@ -264,8 +260,7 @@ impl SegmentUpdater {
fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result<Vec<SegmentEntry>> {
let mut segment_entries = self.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
let segment = self.index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
advance_deletes(segment_entry, target_opstamp)?;
}
Ok(segment_entries)
}
@@ -480,10 +475,7 @@ impl SegmentUpdater {
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_metas().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
if let Err(e) = advance_deletes(
segment,
&mut after_merge_segment_entry,
committed_opstamp,
) {

View File

@@ -100,6 +100,8 @@ impl<W: io::Write> StoreWriter<W> {
///
/// Compress the last unfinished block if any,
/// and serializes the skip list index on disc.
///
/// The returned writer is not flushed.
pub fn close(mut self) -> io::Result<W> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
@@ -108,7 +110,6 @@ impl<W: io::Write> StoreWriter<W> {
self.offset_index_writer.write(&mut self.writer)?;
header_offset.serialize(&mut self.writer)?;
self.doc.serialize(&mut self.writer)?;
self.writer.flush()?;
let (wrt, _) = self.writer.finish()?;
Ok(wrt)