issue/43 test passing

This commit is contained in:
Paul Masurel
2017-02-08 17:45:14 +09:00
parent 2fc3a505bc
commit 72afbb28c7
10 changed files with 72 additions and 99 deletions

View File

@@ -21,7 +21,7 @@ use core::IndexMeta;
use core::META_FILEPATH;
use super::segment::create_segment;
use indexer::segment_updater::save_new_metas;
use directory::error::{FileError, OpenWriteError};
use directory::error::FileError;
const NUM_SEARCHERS: usize = 12;
@@ -185,7 +185,7 @@ impl Index {
pub fn searchable_segments(&self) -> Result<Vec<Segment>> {
let metas = load_metas(self.directory())?;
Ok(metas
.committed_segments
.segments
.into_iter()
.map(|segment_meta| self.segment(segment_meta))
.collect())
@@ -221,15 +221,15 @@ impl Index {
}
/// Reads the meta.json and returns the list of
/// committed segments.
pub fn committed_segments(&self) -> Result<Vec<SegmentMeta>> {
Ok(load_metas(self.directory())?.committed_segments)
/// segments in the last commit.
pub fn segments(&self) -> Result<Vec<SegmentMeta>> {
Ok(load_metas(self.directory())?.segments)
}
/// Returns the list of segment ids that are searchable.
pub fn searchable_segment_ids(&self) -> Result<Vec<SegmentId>> {
Ok(load_metas(self.directory())?
.committed_segments
.segments
.iter()
.map(|segment_meta| segment_meta.segment_id)
.collect())

View File

@@ -11,8 +11,7 @@ use core::SegmentMeta;
///
#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
pub struct IndexMeta {
pub committed_segments: Vec<SegmentMeta>,
pub uncommitted_segments: Vec<SegmentMeta>,
pub segments: Vec<SegmentMeta>,
pub schema: Schema,
pub opstamp: u64,
}
@@ -20,8 +19,7 @@ pub struct IndexMeta {
impl IndexMeta {
pub fn with_schema(schema: Schema) -> IndexMeta {
IndexMeta {
committed_segments: Vec::new(),
uncommitted_segments: Vec::new(),
segments: vec!(),
schema: schema,
opstamp: 0u64,
}

View File

@@ -1,11 +1,8 @@
use uuid::Uuid;
use std::fmt;
use rustc_serialize::{Encoder, Decoder, Encodable, Decodable};
use core::SegmentComponent;
use std::path::PathBuf;
use std::cmp::{Ordering, Ord};
#[cfg(test)]
use std::sync::atomic;

View File

@@ -3,13 +3,11 @@ use core::Segment;
use core::SegmentId;
use core::SegmentComponent;
use schema::Term;
use bit_set::BitSet;
use common::HasLen;
use fastfield::delete::DeleteBitSet;
use store::StoreReader;
use schema::Document;
use directory::ReadOnlySource;
use directory::error::FileError;
use DocId;
use std::io;
use std::str;

View File

@@ -19,7 +19,6 @@ use super::directory_lock::DirectoryLock;
use futures::Future;
use std::clone::Clone;
use std::io;
use fastfield::delete;
use std::thread;
use futures::Canceled;
use std::mem;
@@ -27,7 +26,6 @@ use datastruct::stacker::Heap;
use core::SegmentReader;
use std::mem::swap;
use chan;
use core::SegmentMeta;
use super::delete_queue::{DeleteQueue, DeleteQueueCursor};
use super::segment_updater::SegmentUpdater;
use Result;
@@ -110,18 +108,17 @@ impl DocToOpstampMapping {
}
/// TODO
/// work on SegmentMeta
pub fn advance_deletes(
segment: &Segment,
segment: &mut Segment,
delete_cursor: &mut DeleteQueueCursor,
doc_opstamps: DocToOpstampMapping) -> Result<Option<(u64, BitSet)>> {
doc_opstamps: DocToOpstampMapping) -> Result<SegmentEntry> {
let segment_reader = SegmentReader::open(segment.clone())?;
let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
let mut last_opstamp_opt: Option<u64> = None;
for delete_op in delete_cursor {
while let Some(delete_op) = delete_cursor.next() {
// A delete operation should only affect
// document that were inserted after it.
//
@@ -145,11 +142,14 @@ pub fn advance_deletes(
delete_bitset.insert(doc as usize);
}
}
Ok(Some((last_opstamp, delete_bitset)))
}
else {
Ok(None)
let num_deleted_docs = delete_bitset.len();
segment.meta_mut().set_deletes(num_deleted_docs as u32, last_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
Ok(SegmentEntry::new(segment.meta().clone(), delete_cursor.clone()))
}
fn index_documents(heap: &mut Heap,
@@ -161,7 +161,6 @@ fn index_documents(heap: &mut Heap,
delete_cursor: &mut DeleteQueueCursor)
-> Result<bool> {
heap.clear();
let segment_id = segment.id();
let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment.clone(), &schema));
for doc in document_iterator {
try!(segment_writer.add_document(&doc, &schema));
@@ -180,19 +179,10 @@ fn index_documents(heap: &mut Heap,
segment
.meta_mut()
.set_num_docs(num_docs);
let last_opstamp = segment_writer.last_opstamp();
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
if let Some((last_opstamp_after_deletes, deleted_docset)) = advance_deletes(&segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))? {
let num_deleted_docs = deleted_docset.len();
segment.meta_mut().set_deletes(num_deleted_docs as u32, last_opstamp_after_deletes);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&deleted_docset, &mut delete_file)?;
}
let segment_entry = SegmentEntry::new(segment.meta().clone(), delete_cursor.clone());
let segment_entry = advance_deletes(&mut segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))?;
segment_updater
.add_segment(generation, segment_entry)
@@ -262,9 +252,15 @@ impl IndexWriter {
// this is a valid guarantee as the
// peeked document now belongs to
// our local iterator.
let opstamp: u64;
if let Some(operation) = document_iterator.peek() {
opstamp = operation.opstamp;
if document_iterator.peek().is_some() {
let segment = index.new_segment();
index_documents(&mut heap,
segment,
&schema,
generation,
&mut document_iterator,
&mut segment_updater,
&mut delete_cursor_clone)?;
}
else {
// No more documents.
@@ -273,14 +269,7 @@ impl IndexWriter {
return Ok(())
}
let segment = index.new_segment();
let valid_generation = index_documents(&mut heap,
segment,
&schema,
generation,
&mut document_iterator,
&mut segment_updater,
&mut delete_cursor_clone)?;
}
})?;
self.worker_id += 1;
@@ -402,7 +391,9 @@ impl IndexWriter {
// by updating the generation in the segment updater,
// pending add segment commands will be dismissed.
self.generation += 1;
let rollback_future = self.segment_updater.new_generation(self.generation);
// TODO requires a new delete queue...
let rollback_future = self.segment_updater.rollback(self.generation);
// we cannot drop segment ready receiver yet
// as it would block the workers.
@@ -487,12 +478,15 @@ impl IndexWriter {
// committed segments.
self.committed_opstamp = self.stamp();
let future = self.segment_updater.commit(self.committed_opstamp);
let new_delete_queue = DeleteQueue::default();
let future = self.segment_updater.commit(self.committed_opstamp, new_delete_queue.cursor());
// wait for the segment update thread to have processed the info
// TODO remove unwrap
future.wait().unwrap();
self.delete_queue = new_delete_queue;
Ok(self.committed_opstamp)
}

View File

@@ -101,13 +101,13 @@ impl SegmentManager {
segment_ids
}
pub fn commit(&self) {
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
let segment_entries = registers_lock.uncommitted.segment_entries();
registers_lock.committed.clear();
registers_lock.uncommitted.clear();
for segment_entry in segment_entries {
registers_lock.committed.add_segment_entry(segment_entry);
}
registers_lock.uncommitted.clear();
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) {
@@ -148,9 +148,9 @@ impl SegmentManager {
}
}
pub fn segment_metas(&self,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
pub fn committed_segment_metas(&self,) -> Vec<SegmentMeta> {
let registers_lock = self.read();
(registers_lock.committed.segment_metas(), registers_lock.uncommitted.segment_metas())
registers_lock.committed.segment_metas()
}
}

View File

@@ -105,7 +105,7 @@ impl SegmentRegister {
.collect()
}
pub fn segment_entries(&self,) -> Vec<SegmentEntry>{
pub fn segment_entries(&self,) -> Vec<SegmentEntry> {
self.segment_states
.values()
.cloned()

View File

@@ -10,11 +10,9 @@ use std::mem;
use std::sync::atomic::Ordering;
use std::ops::DerefMut;
use futures::{Future, future};
use fastfield::delete::write_delete_bitset;
use futures::oneshot;
use futures::Canceled;
use std::thread;
use core::SegmentComponent;
use std::sync::atomic::AtomicUsize;
use std::sync::RwLock;
use core::SerializableSegment;
@@ -39,11 +37,9 @@ use std::io::Write;
use super::segment_manager::{SegmentManager, get_segments};
fn create_metas(segment_manager: &SegmentManager, schema: Schema, opstamp: u64) -> IndexMeta {
let (committed_segments, uncommitted_segments) = segment_manager.segment_metas();
fn create_metas(metas: Vec<SegmentMeta>, schema: Schema, opstamp: u64) -> IndexMeta {
IndexMeta {
committed_segments: committed_segments,
uncommitted_segments: uncommitted_segments,
segments: metas,
schema: schema,
opstamp: opstamp,
}
@@ -63,8 +59,7 @@ pub fn save_new_metas(schema: Schema,
opstamp: u64,
directory: &mut Directory)
-> Result<()> {
let segment_manager = SegmentManager::default();
save_metas(&segment_manager, schema, opstamp, directory)
save_metas(vec!(), schema, opstamp, directory)
}
@@ -78,12 +73,12 @@ pub fn save_new_metas(schema: Schema,
/// and flushed.
///
/// This method is not part of tantivy's public API
pub fn save_metas(segment_manager: &SegmentManager,
pub fn save_metas(segment_metas: Vec<SegmentMeta>,
schema: Schema,
opstamp: u64,
directory: &mut Directory)
-> Result<()> {
let metas = create_metas(segment_manager, schema, opstamp);
let metas = create_metas(segment_metas, schema, opstamp);
let mut w = Vec::new();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas)));
Ok(directory
@@ -119,8 +114,8 @@ impl SegmentUpdater {
delete_cursor: DeleteQueueCursor)
-> Result<SegmentUpdater>
{
let committed_segments = index.committed_segments()?;
let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor);
let segments = index.segments()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
Ok(
SegmentUpdater(Arc::new(InnerSegmentUpdater {
pool: CpuPool::new(1),
@@ -174,33 +169,31 @@ impl SegmentUpdater {
}
}
fn purge_deletes(&self, target_opstamp: u64) -> Result<()> {
let uncommitted = self.0.segment_manager.segment_entries();
for mut segment_entry in uncommitted {
let mut segment = self.0.index.segment(segment_entry.meta().clone());
if let Some((_, deleted_docset)) = advance_deletes(
&segment,
segment_entry.delete_cursor(),
DocToOpstampMapping::None).unwrap()
{
let num_deleted_docs = deleted_docset.len();
// TODO previous mask?
// TODO save the resulting segment_entry
segment.meta_mut().set_deletes(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&deleted_docset, &mut delete_file)?;
}
}
Ok(())
fn purge_deletes(&self) -> Result<Vec<SegmentMeta>> {
let segment_entries = self.0.segment_manager.segment_entries();
segment_entries
.into_iter()
.map(|mut segment_entry| {
let mut segment = self.0.index.segment(segment_entry.meta().clone());
advance_deletes(&mut segment, segment_entry.delete_cursor(), DocToOpstampMapping::None)
.map(|entry| entry.meta().clone())
})
.collect()
}
pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=&'static str> {
pub fn commit(&self, opstamp: u64, new_delete_queue: DeleteQueueCursor) -> impl Future<Item=(), Error=&'static str> {
self.run_async(move |segment_updater| {
segment_updater.purge_deletes(opstamp).expect("Failed purge deletes");
segment_updater.0.segment_manager.commit();
let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes");
let segment_entries = segment_metas.into_iter()
.map(|segment_meta|
SegmentEntry::new(segment_meta, new_delete_queue.clone())
)
.collect::<Vec<_>>();
segment_updater.0.segment_manager.commit(segment_entries);
let mut directory = segment_updater.0.index.directory().box_clone();
save_metas(
&segment_updater.0.segment_manager,
segment_updater.0.segment_manager.committed_segment_metas(),
segment_updater.0.index.schema(),
opstamp,
directory.borrow_mut()).expect("Could not save metas.");
@@ -297,8 +290,9 @@ impl SegmentUpdater {
self.run_async(move |segment_updater| {
segment_updater.0.segment_manager.end_merge(&merged_segment_metas, resulting_segment_entry);
let mut directory = segment_updater.0.index.directory().box_clone();
let segment_metas = segment_updater.0.segment_manager.committed_segment_metas();
save_metas(
&segment_updater.0.segment_manager,
segment_metas,
segment_updater.0.index.schema(),
segment_updater.0.index.opstamp(),
directory.borrow_mut()).expect("Could not save metas.");

View File

@@ -148,12 +148,7 @@ impl<'a> SegmentWriter<'a> {
// })
// .collect::<Vec<_>>()
// }
pub fn last_opstamp(&self) -> u64 {
*(self.doc_opstamps
.last()
.expect("Last doc opstamp called on an empty segment writer"))
}
/// Indexes a new document
///

View File

@@ -329,7 +329,6 @@ mod tests {
schema_builder.add_u32_field("count", count_options);
let schema = schema_builder.build();
let schema_json: String = format!("{}", json::as_pretty_json(&schema));
println!("{}", schema_json);
let expected = r#"[
{
"name": "title",
@@ -456,7 +455,6 @@ mod tests {
"author": "fulmicoton",
"count": -5
}"#);
println!("{:?}", json_err);
match json_err {
Err(DocParsingError::ValueError(_, ValueParsingError::TypeError(_))) => {
assert!(true);
@@ -472,7 +470,6 @@ mod tests {
"author": "fulmicoton",
"count": 5000000000
}"#);
println!("{:?}", json_err);
match json_err {
Err(DocParsingError::ValueError(_, ValueParsingError::OverflowError(_))) => {
assert!(true);