Bugfix related to deletes, rollback and the index opstamp.

This commit is contained in:
Paul Masurel
2017-02-27 01:42:25 +09:00
parent 8bcfdb8e80
commit 7a07144c68
9 changed files with 201 additions and 65 deletions

View File

@@ -39,7 +39,6 @@ pub struct Index {
directory: Box<Directory>,
schema: Schema,
searcher_pool: Arc<Pool<Searcher>>,
opstamp: u64,
}
@@ -117,12 +116,10 @@ impl Index {
/// Creates a new index given a directory and an `IndexMeta`.
fn create_from_metas(directory: Box<Directory>, metas: IndexMeta) -> Result<Index> {
let schema = metas.schema.clone();
let opstamp = metas.opstamp;
let index = Index {
directory: directory,
schema: schema,
searcher_pool: Arc::new(Pool::new()),
opstamp: opstamp,
};
try!(index.load_searchers());
Ok(index)
@@ -146,7 +143,7 @@ impl Index {
/// The opstamp is the number of documents that have been added
/// from the beginning of time, and until the moment of the last commit.
pub fn opstamp(&self) -> u64 {
self.opstamp
load_metas(self.directory()).unwrap().opstamp
}
/// Open a new index writer. Attempts to acquire a lockfile.
@@ -294,7 +291,6 @@ impl Clone for Index {
directory: self.directory.box_clone(),
schema: self.schema.clone(),
searcher_pool: self.searcher_pool.clone(),
opstamp: self.opstamp,
}
}
}

View File

@@ -79,7 +79,7 @@ impl Decodable for SegmentId {
impl fmt::Debug for SegmentId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SegmentId({:?})", self.uuid_string())
write!(f, "Seg({:?})", self.short_uuid_string())
}
}

61
src/functional_test.rs Normal file
View File

@@ -0,0 +1,61 @@
use std::collections::HashSet;
use rand::{thread_rng, Rng};
use schema::*;
use Index;
use Searcher;
use rand::distributions::{IndependentSample, Range};
/// Asserts that `searcher` exposes fewer than 20 segment readers and that
/// the number of documents it reports equals the number of expected values.
fn check_index_content(searcher: &Searcher, vals: &HashSet<u32>) {
    let segment_count = searcher.segment_readers().len();
    assert!(segment_count < 20);

    let expected_num_docs = vals.len();
    let actual_num_docs = searcher.num_docs() as usize;
    assert_eq!(actual_num_docs, expected_num_docs);
}
/// Randomized functional test mixing adds, deletes and commits.
///
/// Ids are drawn from `[0, 20)`. Drawing `0` triggers a commit followed by a
/// consistency check. Any other id is deleted if it is currently tracked
/// (committed or pending), and (re-)added otherwise. Two sets mirror the
/// expected state: `committed_docs` for what a searcher should see, and
/// `uncommitted_docs` for what was added since the last commit.
#[test]
fn test_indexing() {
    let mut schema_builder = SchemaBuilder::default();
    let id_field = schema_builder.add_u32_field("id", U32_INDEXED);
    let multiples_field = schema_builder.add_u32_field("multiples", U32_INDEXED);
    let schema = schema_builder.build();

    let index = Index::create_from_tempdir(schema).unwrap();

    let universe = Range::new(0u32, 20u32);
    let mut rng = thread_rng();

    let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap();

    let mut committed_docs: HashSet<u32> = HashSet::new();
    let mut uncommitted_docs: HashSet<u32> = HashSet::new();

    for _ in 0..200 {
        let random_val = universe.ind_sample(&mut rng);
        if random_val == 0 {
            // A failed commit must fail the test instead of being ignored.
            index_writer.commit().unwrap();
            committed_docs.extend(&uncommitted_docs);
            uncommitted_docs.clear();
            index.load_searchers().unwrap();
            let searcher = index.searcher();
            // check that everything is correct.
            check_index_content(&searcher, &committed_docs);
        } else if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
            // The id is currently tracked: issue a delete for it.
            let doc_id_term = Term::from_field_u32(id_field, random_val);
            index_writer.delete_term(doc_id_term);
        } else {
            // The id is not tracked: add a fresh document for it.
            uncommitted_docs.insert(random_val);
            let mut doc = Document::new();
            doc.add_u32(id_field, random_val);
            for i in 1u32..10u32 {
                doc.add_u32(multiples_field, random_val * i);
            }
            // Surface indexing errors rather than silently dropping them.
            index_writer.add_document(doc).unwrap();
        }
    }
}

View File

@@ -160,12 +160,25 @@ pub fn advance_deletes(
segment: &mut Segment,
delete_operations: &DeleteQueueSnapshot,
doc_opstamps: &DocToOpstampMapping) -> Result<SegmentMeta> {
let segment_reader = SegmentReader::open(segment.clone())?;
let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
let mut last_opstamp_opt: Option<u64> = None;
let previous_delete_opstamp_opt = segment.meta().delete_opstamp();
for delete_op in delete_operations.iter() {
// Skip delete operations that were already applied to this segment.
if let Some(previous_delete_opstamp) = previous_delete_opstamp_opt {
if delete_op.opstamp <= previous_delete_opstamp {
continue;
}
}
// A delete operation should only affect
// documents that were inserted before it.
//
@@ -179,11 +192,11 @@ pub fn advance_deletes(
delete_bitset.insert(deleted_doc as usize);
}
}
last_opstamp_opt = Some(delete_op.opstamp);
}
last_opstamp_opt = Some(delete_op.opstamp);
}
if let Some(last_opstamp) = last_opstamp_opt {
if let Some(last_opstamp) = last_opstamp_opt {
for doc in 0u32..segment_reader.max_doc() {
if segment_reader.is_deleted(doc) {
delete_bitset.insert(doc as usize);
@@ -194,6 +207,7 @@ pub fn advance_deletes(
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
Ok(segment.meta().clone())
}
@@ -365,6 +379,8 @@ impl IndexWriter {
/// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<u64> {
info!("Rolling back to opstamp {}", self.committed_opstamp);
// by updating the generation in the segment updater,
// pending add segment commands will be dismissed.
self.generation += 1;
@@ -428,6 +444,19 @@ impl IndexWriter {
///
pub fn commit(&mut self) -> Result<u64> {
// here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
//
// No document belonging to the next generation have been
// pushed too, because add_document can only happen
// on this thread.
// This will move uncommitted segments to the state of
// committed segments.
self.committed_opstamp = self.stamp();
info!("committing {}", self.committed_opstamp);
// this will drop the current document channel
// and recreate a new one.
self.recreate_document_channel();
@@ -444,17 +473,7 @@ impl IndexWriter {
try!(self.add_indexing_worker());
}
// here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
//
// No document belonging to the next generation have been
// pushed too, because add_document can only happen
// on this thread.
// This will move uncommitted segments to the state of
// committed segments.
self.committed_opstamp = self.stamp();
// wait for the segment update thread to have processed the info
self.segment_updater
@@ -473,13 +492,14 @@ impl IndexWriter {
///
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
pub fn delete_term(&mut self, term: Term) {
pub fn delete_term(&mut self, term: Term) -> u64 {
let opstamp = self.stamp();
let delete_operation = DeleteOperation {
opstamp: opstamp,
term: term,
};
self.delete_queue.push(delete_operation);
opstamp
}
fn stamp(&mut self) -> u64 {
@@ -498,6 +518,8 @@ impl IndexWriter {
///
/// Currently it represents the number of documents that
/// have been added since the creation of the index.
// TODO remove return without Result<>
pub fn add_document(&mut self, document: Document) -> io::Result<u64> {
let opstamp = self.stamp();
let add_operation = AddOperation {

View File

@@ -43,6 +43,10 @@ impl SegmentEntry {
self.state
}
/// Overwrites the state stored in this segment entry with `state`.
pub fn set_state(&mut self, state: SegmentState) {
self.state = state;
}
/// Replaces the document-to-opstamp mapping stored in this segment entry.
pub fn set_doc_to_opstamp(&mut self, doc_to_opstamp: DocToOpstampMapping) {
self.doc_to_opstamp = doc_to_opstamp;
}

View File

@@ -2,23 +2,17 @@ use super::segment_register::SegmentRegister;
use std::sync::RwLock;
use core::SegmentMeta;
use core::SegmentId;
use indexer::SegmentEntry;
use indexer::{SegmentEntry, SegmentState};
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use std::fmt::{self, Debug, Formatter};
#[derive(Default)]
struct SegmentRegisters {
uncommitted: SegmentRegister,
committed: SegmentRegister,
}
impl Default for SegmentRegisters {
fn default() -> SegmentRegisters {
SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::default()
}
}
}
/// The segment manager stores the list of segments
@@ -26,6 +20,7 @@ impl Default for SegmentRegisters {
///
/// It guarantees the atomicity of the
/// changes (merges especially)
#[derive(Default)]
pub struct SegmentManager {
registers: RwLock<SegmentRegisters>,
}
@@ -71,7 +66,12 @@ impl SegmentManager {
);
segment_entries
}
/// Returns the state of the segment entry found for `segment_id`,
/// or `None` when `segment_entry` finds no entry for that id.
pub fn segment_state(&self, segment_id: &SegmentId) -> Option<SegmentState> {
self.segment_entry(segment_id)
.map(|segment_entry| segment_entry.state())
}
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
let registers = self.read();
registers
@@ -100,11 +100,22 @@ impl SegmentManager {
segment_ids
}
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
pub fn commit(&self, segment_metas: Vec<SegmentMeta>) {
let committed_segment_entries = segment_metas
.into_iter()
.map(|segment_meta| {
let segment_id = segment_meta.id();
let mut segment_entry = SegmentEntry::new(segment_meta);
if let Some(state) = self.segment_state(&segment_id) {
segment_entry.set_state(state);
}
segment_entry
})
.collect::<Vec<_>>();
let mut registers_lock = self.write();
registers_lock.committed.clear();
registers_lock.uncommitted.clear();
for segment_entry in segment_entries {
for segment_entry in committed_segment_entries {
registers_lock.committed.add_segment_entry(segment_entry);
}
}
@@ -121,6 +132,9 @@ impl SegmentManager {
registers_lock.committed.start_merge(segment_id);
}
}
else {
error!("Merge operation sent for segments that are not all uncommited or commited.");
}
}
pub fn add_segment(&self, segment_entry: SegmentEntry) {
@@ -152,15 +166,3 @@ impl SegmentManager {
registers_lock.committed.segment_metas()
}
}
impl Default for SegmentManager {
fn default() -> SegmentManager {
SegmentManager {
registers: RwLock::new( SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::default(),
}),
}
}
}

View File

@@ -15,6 +15,7 @@ use indexer::segment_entry::SegmentEntry;
/// segments that are currently searchable,
/// and by the index merger to identify
/// merge candidates.
#[derive(Default)]
pub struct SegmentRegister {
// Maps each segment id to its segment entry.
segment_states: HashMap<SegmentId, SegmentEntry>,
}
@@ -110,13 +111,6 @@ impl SegmentRegister {
}
}
impl Default for SegmentRegister {
fn default() -> SegmentRegister {
SegmentRegister {
segment_states: HashMap::new(),
}
}
}
#[cfg(test)]
mod tests {

View File

@@ -105,8 +105,7 @@ struct InnerSegmentUpdater {
impl SegmentUpdater {
pub fn new(index: Index, delete_queue: DeleteQueue) -> Result<SegmentUpdater>
{
pub fn new(index: Index, delete_queue: DeleteQueue) -> Result<SegmentUpdater> {
let segments = index.segments()?;
let segment_manager = SegmentManager::from_segments(segments);
Ok(
@@ -177,11 +176,7 @@ impl SegmentUpdater {
pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=Error> {
self.run_async(move |segment_updater| {
let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes");
let segment_entries = segment_metas
.into_iter()
.map(SegmentEntry::new)
.collect::<Vec<_>>();
segment_updater.0.segment_manager.commit(segment_entries);
segment_updater.0.segment_manager.commit(segment_metas);
let mut directory = segment_updater.0.index.directory().box_clone();
save_metas(
segment_updater.0.segment_manager.committed_segment_metas(),
@@ -241,13 +236,15 @@ impl SegmentUpdater {
.map(|segment_meta| index.segment(segment_meta))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// An IndexMerger is like a "view" of our merged segments.
let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?;
let mut merged_segment = index.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed");
let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
let mut segment_meta = SegmentMeta::new(merged_segment.id());
segment_meta.set_num_docs(num_docs);
@@ -257,7 +254,6 @@ impl SegmentUpdater {
.end_merge(segment_metas.clone(), segment_entry.clone())
.wait()
.unwrap();
merging_future_send.complete(segment_entry.clone());
segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
Ok(segment_entry)

View File

@@ -55,6 +55,10 @@ extern crate libc;
#[cfg(test)] extern crate test;
#[cfg(test)] extern crate rand;
#[cfg(test)]
mod functional_test;
#[macro_use]
mod macros {
macro_rules! get(
@@ -185,8 +189,10 @@ mod tests {
use Index;
use core::SegmentReader;
use query::BooleanQuery;
use postings::SegmentPostingsOption;
use schema::*;
use DocSet;
use IndexWriter;
use Postings;
#[test]
@@ -290,7 +296,7 @@ mod tests {
#[test]
fn test_delete_postings() {
fn test_delete_postings1() {
let mut schema_builder = SchemaBuilder::default();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
@@ -392,10 +398,8 @@ mod tests {
{
index_writer.delete_term(Term::from_field_text(text_field, "c"));
}
index_writer.rollback().unwrap();
{
index_writer.delete_term(Term::from_field_text(text_field, "a"));
}
index_writer.rollback().unwrap();
index_writer.delete_term(Term::from_field_text(text_field, "a"));
index_writer.commit().unwrap();
}
{
@@ -425,6 +429,63 @@ mod tests {
}
/// Indexes a single document holding the u32 value `1`, commits, and checks
/// that the posting list for that term contains exactly one document (doc 0).
#[test]
fn test_indexed_u32() {
    let mut schema_builder = SchemaBuilder::default();
    let field = schema_builder.add_u32_field("text", U32_INDEXED);
    let schema = schema_builder.build();
    let index = Index::create_in_ram(schema);
    let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
    // Fail loudly if the document could not be queued for indexing.
    index_writer.add_document(
        doc!(field=>1)
    ).unwrap();
    index_writer.commit().unwrap();
    index.load_searchers().unwrap();
    let searcher = index.searcher();
    let term = Term::from_field_u32(field, 1u32);
    let mut postings = searcher.segment_reader(0).read_postings(&term, SegmentPostingsOption::NoFreq).unwrap();
    assert!(postings.advance());
    assert_eq!(postings.doc(), 0);
    assert!(!postings.advance());
}
/// Adds eight documents, deletes two of them by term within the same
/// (uncommitted) batch, commits, and checks that six documents remain.
#[test]
fn test_delete_postings2() {
    let mut schema_builder = SchemaBuilder::default();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let schema = schema_builder.build();
    let index = Index::create_in_ram(schema);

    // writing the segment
    let mut index_writer = index.writer_with_num_threads(2, 40_000_000).unwrap();

    let add_document = |index_writer: &mut IndexWriter, val: &'static str| {
        let doc = doc!(text_field=>val);
        // Surface indexing errors instead of silently discarding them.
        index_writer.add_document(doc).unwrap();
    };

    let remove_document = |index_writer: &mut IndexWriter, val: &'static str| {
        let delterm = Term::from_field_text(text_field, val);
        index_writer.delete_term(delterm);
    };

    add_document(&mut index_writer, "63");
    add_document(&mut index_writer, "70");
    add_document(&mut index_writer, "34");
    add_document(&mut index_writer, "1");
    add_document(&mut index_writer, "38");
    add_document(&mut index_writer, "33");
    add_document(&mut index_writer, "40");
    add_document(&mut index_writer, "17");
    remove_document(&mut index_writer, "38");
    remove_document(&mut index_writer, "34");

    index_writer.commit().unwrap();
    index.load_searchers().unwrap();
    let searcher = index.searcher();
    assert_eq!(searcher.num_docs(), 6);
}
#[test]
fn test_termfreq() {
let mut schema_builder = SchemaBuilder::default();