issue/43 simplification. removed the notion of delete cursor.

This commit is contained in:
Paul Masurel
2017-02-09 21:49:25 +09:00
parent 72afbb28c7
commit d007cf3435
12 changed files with 210 additions and 279 deletions

View File

@@ -231,7 +231,7 @@ impl Index {
Ok(load_metas(self.directory())?
.segments
.iter()
.map(|segment_meta| segment_meta.segment_id)
.map(|segment_meta| segment_meta.id())
.collect())
}

View File

@@ -52,7 +52,7 @@ impl Segment {
/// Returns the segment's id.
pub fn id(&self,) -> SegmentId {
self.meta.segment_id
self.meta.id()
}
/// Returns the relative path of a component of our segment.

View File

@@ -9,7 +9,7 @@ struct DeleteMeta {
#[derive(Clone, Debug, RustcDecodable,RustcEncodable)]
pub struct SegmentMeta {
pub segment_id: SegmentId,
segment_id: SegmentId,
num_docs: u32,
deletes: Option<DeleteMeta>,
}

View File

@@ -237,6 +237,10 @@ impl SegmentReader {
/// Returns the posting list associated with a term.
///
/// If the term is not found, return None.
/// Even when `Some` is returned, the posting object may contain
/// no documents, because of deletes.
pub fn read_postings_all_info(&self, term: &Term) -> Option<SegmentPostings> {
let field_entry = self.schema.get_field_entry(term.field());
let segment_posting_option = match *field_entry.field_type() {

View File

@@ -111,7 +111,7 @@ mod tests {
}
}
fn test_delete(directory: &mut Directory) {
fn test_directory_delete(directory: &mut Directory) {
assert!(directory.open_read(*TEST_PATH).is_err());
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
write_file.write_all(&[1, 2, 3, 4]).unwrap();
@@ -131,7 +131,7 @@ mod tests {
test_seek(directory);
test_rewrite_forbidden(directory);
test_write_create_the_file(directory);
test_delete(directory);
test_directory_delete(directory);
}
}

View File

@@ -1,175 +1,26 @@
use schema::Term;
use std::sync::{Arc, RwLock};
use super::operation::DeleteOperation;
const BLOCK_SIZE: usize = 128;
/// A `DeleteQueue` is implemented as an unrolled linked list.
/// `Block` is one node of this unrolled linked list.
struct Block {
// Delete operations held by this block, in the order they were pushed.
operations: Vec<DeleteOperation>,
// Handle to the following block, or `None` if no successor has been linked yet.
next: Option<SharedBlock>,
}
impl Default for Block {
    /// Builds an empty block with room pre-allocated for `BLOCK_SIZE`
    /// operations and no successor.
    fn default() -> Block {
        let operations = Vec::with_capacity(BLOCK_SIZE);
        Block {
            operations: operations,
            next: None,
        }
    }
}
/// A `SharedBlock` wraps a `Block` in an `Arc<RwLock<_>>`.
///
/// Cloning a `SharedBlock` clones the `Arc` handle, not the
/// underlying block: all clones refer to the same `Block`.
// TODO remove clone
#[derive(Clone)]
struct SharedBlock(Arc<RwLock<Block>>);
impl SharedBlock {

    // Appends a new element to the queue, starting from this block.
    //
    // If this block still has room, the operation is stored here and
    // `None` is returned. Otherwise a fresh block receives the operation,
    // is linked as this block's successor, and is returned as the new head.
    fn enqueue(&self, delete_operation: DeleteOperation) -> Option<SharedBlock> {
        let mut block = self.0.write().expect("Panicked while enqueueing in the delete queue.");
        if block.operations.len() < BLOCK_SIZE {
            block.operations.push(delete_operation);
            return None;
        }
        // The block is full: roll over to a new one.
        let new_head = SharedBlock::default();
        new_head.enqueue(delete_operation);
        block.next = Some(new_head.clone());
        Some(new_head)
    }

    // Returns a handle to the block following this one, if any.
    fn next_block(&self) -> Option<SharedBlock> {
        let block = self.0.read().unwrap();
        block.next.clone()
    }

    // Creates a cursor positioned just after the operations
    // currently stored in this block.
    fn cursor(&self,) -> DeleteQueueCursor {
        let end_of_block = self.0
            .read()
            .expect("Panicked while reading a block in the delete queue.")
            .operations
            .len();
        DeleteQueueCursor {
            block: self.clone(),
            pos: end_of_block,
        }
    }
}
impl Default for SharedBlock {
fn default() -> SharedBlock {
SharedBlock(Arc::default())
}
}
impl Default for DeleteQueue {
    /// Creates a queue whose writing head is a new, empty block.
    fn default() -> DeleteQueue {
        let head = SharedBlock::default();
        DeleteQueue {
            writing_head: head,
        }
    }
}
/// A read position in the delete queue: a handle to a block
/// plus an index into that block's operations.
#[derive(Clone)]
pub struct DeleteQueueCursor {
// Block the cursor currently points into.
block: SharedBlock,
// Index, within `block`, of the next operation to read.
pos: usize,
}
impl DeleteQueueCursor {

    /// Skips to the first delete operation whose opstamp
    /// is greater than or equal to `opstamp`.
    ///
    /// Returns `true` if such an operation was found; the cursor is
    /// then positioned on it, so `peek()` returns that operation.
    ///
    /// Returns `false` if the cursor reaches the current end of the
    /// `DeleteQueue` before meeting such an element.
    pub fn skip_to(&mut self, opstamp: u64) -> bool {
        // TODO optimize
        while let Some(delete_operation) = self.peek() {
            if delete_operation.opstamp >= opstamp {
                return true;
            }
            // `peek` returned an operation at `self.pos`:
            // stepping over it is just advancing the position.
            self.pos += 1;
        }
        false
    }

    /// Returns a copy of the operation the cursor points to, without
    /// consuming it, or `None` if no operation is available yet.
    ///
    /// When the current block has been read up to `BLOCK_SIZE`, the
    /// cursor moves to the following block if one has been linked.
    pub fn peek(&mut self) -> Option<DeleteOperation> {
        if self.pos >= BLOCK_SIZE {
            match self.block.next_block() {
                Some(next_block) => {
                    self.block = next_block;
                    self.pos = 0;
                }
                None => {
                    // There is no next block yet.
                    //
                    // `self.pos` must NOT be reset here: the cursor stays
                    // at the end of the exhausted block. Resetting it would
                    // make the next call return operations of this block
                    // that have already been consumed.
                    return None;
                }
            }
        }
        let readable_block = self.block.0
            .read()
            .unwrap();
        if self.pos >= readable_block.operations.len() {
            None
        }
        else {
            Some(readable_block.operations[self.pos].clone())
        }
    }
}
impl Iterator for DeleteQueueCursor {

    type Item = DeleteOperation;

    /// Returns the delete operation under the cursor and advances
    /// past it, or `None` if no operation is currently available.
    ///
    /// This iterator may return `None` once, and return
    /// `Some(...)` later, once new operations have been pushed.
    fn next(&mut self) -> Option<DeleteOperation> {
        match self.peek() {
            Some(delete_operation) => {
                // Only move forward when something was actually read.
                self.pos += 1;
                Some(delete_operation)
            }
            None => None,
        }
    }
}
// ----------------------------------------
pub struct DeleteQueue {
writing_head: SharedBlock,
delete_operations: Vec<DeleteOperation>,
}
impl DeleteQueue {
pub fn cursor(&self) -> DeleteQueueCursor {
self.writing_head.cursor()
pub fn new() -> DeleteQueue {
DeleteQueue {
delete_operations: vec!(),
}
}
pub fn push_op(&mut self, delete_operation: DeleteOperation) {
if let Some(new_head) = self.writing_head.enqueue(delete_operation) {
self.writing_head = new_head;
}
self.delete_operations.push(delete_operation);
}
pub fn push(&mut self, opstamp: u64, term: Term) {
let delete_operation = DeleteOperation {
opstamp: opstamp,
term: term,
};
self.push_op(delete_operation);
pub fn operations(&self,) -> impl Iterator<Item=DeleteOperation> {
// TODO fix iterator
self.delete_operations.clone().into_iter()
}
}
@@ -183,7 +34,7 @@ mod tests {
#[test]
fn test_deletequeue() {
let mut delete_queue = DeleteQueue::default();
let mut delete_queue = DeleteQueue::new();
let make_op = |i: usize| {
let field = Field(1u8);
@@ -196,36 +47,38 @@ mod tests {
delete_queue.push_op(make_op(1));
delete_queue.push_op(make_op(2));
let mut delete_cursor_3 = delete_queue.cursor();
let mut delete_cursor_3_b = delete_cursor_3.clone();
// TODO unit tests
// let mut delete_cursor_3 = delete_queue.cursor();
// let mut delete_cursor_3_b = delete_cursor_3.clone();
assert!(delete_cursor_3.next().is_none());
assert!(delete_cursor_3.peek().is_none());
// assert!(delete_cursor_3.next().is_none());
// assert!(delete_cursor_3.peek().is_none());
delete_queue.push_op(make_op(3));
delete_queue.push_op(make_op(4));
// delete_queue.push_op(make_op(3));
// delete_queue.push_op(make_op(4));
assert_eq!(delete_cursor_3_b.peek(), Some(make_op(3)));
let mut delete_cursor_3_c = delete_cursor_3_b.clone();
// assert_eq!(delete_cursor_3_b.peek(), Some(make_op(3)));
// let mut delete_cursor_3_c = delete_cursor_3_b.clone();
assert_eq!(delete_cursor_3_b.next(), Some(make_op(3)));
let mut delete_cursor_4 = delete_cursor_3_b.clone();
// assert_eq!(delete_cursor_3_b.next(), Some(make_op(3)));
// let mut delete_cursor_4 = delete_cursor_3_b.clone();
assert_eq!(delete_cursor_3_b.peek(), Some(make_op(4)));
assert_eq!(delete_cursor_3_b.next(), Some(make_op(4)));
// assert_eq!(delete_cursor_3_b.peek(), Some(make_op(4)));
// assert_eq!(delete_cursor_3_b.next(), Some(make_op(4)));
assert_eq!(delete_cursor_3_c.next(), Some(make_op(3)));
// assert_eq!(delete_cursor_3_c.next(), Some(make_op(3)));
assert!(delete_cursor_3_b.next().is_none());
assert_eq!(delete_cursor_3_c.next(), Some(make_op(4)));
assert!(delete_cursor_3_c.next().is_none());
// assert!(delete_cursor_3_b.next().is_none());
// assert_eq!(delete_cursor_3_c.next(), Some(make_op(4)));
// assert!(delete_cursor_3_c.next().is_none());
assert_eq!(delete_cursor_3.peek(), Some(make_op(3)));
assert_eq!(delete_cursor_3.next(), Some(make_op(3)));
assert!(delete_cursor_3_b.next().is_none());
// assert_eq!(delete_cursor_3.peek(), Some(make_op(3)));
// assert_eq!(delete_cursor_3.next(), Some(make_op(3)));
// assert!(delete_cursor_3_b.next().is_none());
assert_eq!(delete_cursor_4.next(), Some(make_op(4)));
assert!(delete_cursor_4.next().is_none());
// assert_eq!(delete_cursor_4.next(), Some(make_op(4)));
// assert!(delete_cursor_4.next().is_none());
}

View File

@@ -3,7 +3,10 @@ use schema::Document;
use super::operation::AddOperation;
use core::Index;
use core::Segment;
use core::SegmentMeta;
use std::sync::Arc;
use core::SegmentId;
use indexer::operation::DeleteOperation;
use schema::Term;
use indexer::SegmentEntry;
use std::thread::JoinHandle;
@@ -18,15 +21,15 @@ use core::SegmentComponent;
use super::directory_lock::DirectoryLock;
use futures::Future;
use std::clone::Clone;
use indexer::delete_queue::DeleteQueue;
use std::io;
use std::thread;
use futures::Canceled;
use std::mem;
use datastruct::stacker::Heap;
use core::SegmentReader;
use std::mem::swap;
use std::mem::swap;
use chan;
use super::delete_queue::{DeleteQueue, DeleteQueueCursor};
use super::segment_updater::SegmentUpdater;
use Result;
use Error;
@@ -88,11 +91,19 @@ impl !Send for IndexWriter {}
impl !Sync for IndexWriter {}
// TODO move doc to opstamp mapping to its own file
#[derive(Clone)]
pub enum DocToOpstampMapping {
WithMap(Vec<u64>),
WithMap(Arc<Vec<u64>>),
None
}
impl From<Vec<u64>> for DocToOpstampMapping {
    /// Wraps a vector of opstamps into a `WithMap` mapping,
    /// placing the vector behind an `Arc`.
    fn from(opstamps: Vec<u64>) -> DocToOpstampMapping {
        let shared_opstamps = Arc::new(opstamps);
        DocToOpstampMapping::WithMap(shared_opstamps)
    }
}
impl DocToOpstampMapping {
fn compute_doc_limit(&self, opstamp: u64) -> DocId {
match *self {
@@ -112,13 +123,14 @@ impl DocToOpstampMapping {
/// work on SegmentMeta
pub fn advance_deletes(
segment: &mut Segment,
delete_cursor: &mut DeleteQueueCursor,
doc_opstamps: DocToOpstampMapping) -> Result<SegmentEntry> {
delete_queue: &DeleteQueue,
doc_opstamps: &DocToOpstampMapping) -> Result<SegmentEntry> {
let segment_reader = SegmentReader::open(segment.clone())?;
let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
let mut last_opstamp_opt: Option<u64> = None;
while let Some(delete_op) = delete_cursor.next() {
for delete_op in delete_queue.operations() {
// A delete operation should only affect
// document that were inserted after it.
//
@@ -147,9 +159,7 @@ pub fn advance_deletes(
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
Ok(SegmentEntry::new(segment.meta().clone(), delete_cursor.clone()))
Ok(SegmentEntry::new(segment.meta().clone()))
}
fn index_documents(heap: &mut Heap,
@@ -157,8 +167,7 @@ fn index_documents(heap: &mut Heap,
schema: &Schema,
generation: usize,
document_iterator: &mut Iterator<Item=AddOperation>,
segment_updater: &mut SegmentUpdater,
delete_cursor: &mut DeleteQueueCursor)
segment_updater: &mut SegmentUpdater)
-> Result<bool> {
heap.clear();
let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment.clone(), &schema));
@@ -182,7 +191,10 @@ fn index_documents(heap: &mut Heap,
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
let segment_entry = advance_deletes(&mut segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))?;
// let segment_entry = advance_deletes(&mut segment, delete_queue, delete_position, )?;
let mut segment_entry = SegmentEntry::new(SegmentMeta::new(segment.id()));
segment_entry.set_doc_to_opstamp(DocToOpstampMapping::from(doc_opstamps));
segment_updater
.add_segment(generation, segment_entry)
@@ -230,8 +242,6 @@ impl IndexWriter {
// TODO fix this. the cursor might be too advanced
// at this point.
let delete_cursor = self.delete_queue.cursor();
let generation = self.generation;
let join_handle: JoinHandle<Result<()>> =
@@ -239,7 +249,6 @@ impl IndexWriter {
.name(format!("indexing thread {} for gen {}", self.worker_id, generation))
.spawn(move || {
let mut delete_cursor_clone = delete_cursor.clone();
loop {
let mut document_iterator = document_receiver_clone.clone()
.into_iter()
@@ -259,8 +268,7 @@ impl IndexWriter {
&schema,
generation,
&mut document_iterator,
&mut segment_updater,
&mut delete_cursor_clone)?;
&mut segment_updater)?;
}
else {
// No more documents.
@@ -308,9 +316,9 @@ impl IndexWriter {
chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::default();
let delete_queue = DeleteQueue::new();
let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor())?;
let segment_updater = SegmentUpdater::new(index.clone())?;
let mut index_writer = IndexWriter {
@@ -429,6 +437,8 @@ impl IndexWriter {
Error::ErrorInThread("Error while waiting for rollback.".to_string())
)?;
self.delete_queue = DeleteQueue::new();
// reset the opstamp
self.uncommitted_opstamp = self.committed_opstamp;
Ok(self.committed_opstamp)
@@ -478,9 +488,10 @@ impl IndexWriter {
// committed segments.
self.committed_opstamp = self.stamp();
let new_delete_queue = DeleteQueue::default();
let new_delete_queue = DeleteQueue::new();
let future = self.segment_updater.commit(self.committed_opstamp, new_delete_queue.cursor());
// TODO remove clone
let future = self.segment_updater.commit(self.delete_queue.clone(), self.committed_opstamp);
// wait for the segment update thread to have processed the info
// TODO remove unwrap
@@ -493,7 +504,11 @@ impl IndexWriter {
pub fn delete_term(&mut self, term: Term) {
let opstamp = self.stamp();
self.delete_queue.push(opstamp, term);
let delete_operation = DeleteOperation {
opstamp: opstamp,
term: term,
};
self.delete_queue.push_op(delete_operation);
}
fn stamp(&mut self) -> u64 {

View File

@@ -79,7 +79,7 @@ impl MergePolicy for LogMergePolicy {
.filter(|level| level.len() >= self.min_merge_size)
.map(|ind_vec| {
MergeCandidate(ind_vec.iter()
.map(|&ind| segments[ind].segment_id)
.map(|&ind| segments[ind].id())
.collect())
})
.collect()

View File

@@ -3,7 +3,6 @@ use std::sync::RwLock;
use core::SegmentMeta;
use core::SegmentId;
use indexer::SegmentEntry;
use indexer::delete_queue::DeleteQueueCursor;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use std::fmt::{self, Debug, Formatter};
@@ -52,11 +51,11 @@ pub fn get_segments(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec
impl SegmentManager {
pub fn from_segments(segment_metas: Vec<SegmentMeta>, delete_cursor: DeleteQueueCursor) -> SegmentManager {
pub fn from_segments(segment_metas: Vec<SegmentMeta>) -> SegmentManager {
SegmentManager {
registers: RwLock::new( SegmentRegisters {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas, delete_cursor),
committed: SegmentRegister::new(segment_metas),
}),
}
}
@@ -131,7 +130,7 @@ impl SegmentManager {
pub fn end_merge(&self, merged_segment_metas: &[SegmentMeta], merged_segment_entry: SegmentEntry) {
let mut registers_lock = self.write();
let merged_segment_ids: Vec<SegmentId> = merged_segment_metas.iter().map(|meta| meta.segment_id).collect();
let merged_segment_ids: Vec<SegmentId> = merged_segment_metas.iter().map(|meta| meta.id()).collect();
if registers_lock.uncommitted.contains_all(&merged_segment_ids) {
for segment_id in &merged_segment_ids {
registers_lock.uncommitted.remove_segment(segment_id);

View File

@@ -1,9 +1,10 @@
use core::SegmentId;
use std::collections::HashMap;
use core::SegmentMeta;
use indexer::index_writer::DocToOpstampMapping;
use std::fmt;
use std::fmt::{Debug, Formatter};
use indexer::delete_queue::DeleteQueueCursor;
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum SegmentState {
@@ -24,17 +25,29 @@ impl SegmentState {
pub struct SegmentEntry {
meta: SegmentMeta,
state: SegmentState,
delete_cursor: DeleteQueueCursor,
doc_to_opstamp: DocToOpstampMapping,
}
impl SegmentEntry {
pub fn segment_id(&self) -> SegmentId {
self.meta.segment_id
pub fn new(segment_meta: SegmentMeta) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
state: SegmentState::Ready,
doc_to_opstamp: DocToOpstampMapping::None,
}
}
pub fn delete_cursor(&mut self) -> &mut DeleteQueueCursor {
&mut self.delete_cursor
pub fn doc_to_opstamp(&self) -> &DocToOpstampMapping {
&self.doc_to_opstamp
}
pub fn set_doc_to_opstamp(&mut self, doc_to_opstamp: DocToOpstampMapping) {
self.doc_to_opstamp = doc_to_opstamp;
}
pub fn segment_id(&self) -> SegmentId {
self.meta.id()
}
pub fn meta(&self) -> &SegmentMeta {
@@ -48,15 +61,6 @@ impl SegmentEntry {
fn is_ready(&self,) -> bool {
self.state == SegmentState::Ready
}
pub fn new(segment_meta: SegmentMeta,
delete_cursor: DeleteQueueCursor) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
state: SegmentState::Ready,
delete_cursor: delete_cursor,
}
}
}
impl Debug for SegmentEntry {
@@ -117,14 +121,14 @@ impl SegmentRegister {
.values()
.map(|segment_entry| segment_entry.meta.clone())
.collect();
segment_ids.sort_by_key(|meta| meta.segment_id);
segment_ids.sort_by_key(|meta| meta.id());
segment_ids
}
pub fn segment_ids(&self,) -> Vec<SegmentId> {
self.segment_metas()
.into_iter()
.map(|segment_meta| segment_meta.segment_id)
.map(|segment_meta| segment_meta.id())
.collect()
}
@@ -141,7 +145,7 @@ impl SegmentRegister {
}
pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) {
let segment_id = segment_entry.meta.segment_id;
let segment_id = segment_entry.meta.id();
self.segment_states.insert(segment_id, segment_entry);
}
@@ -156,13 +160,13 @@ impl SegmentRegister {
.start_merge();
}
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: DeleteQueueCursor) -> SegmentRegister {
pub fn new(segment_metas: Vec<SegmentMeta>) -> SegmentRegister {
SegmentRegister {
segment_states: segment_metas
.into_iter()
.map(|segment_meta| {
let segment_id = segment_meta.segment_id;
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone());
let segment_id = segment_meta.id();
let segment_entry = SegmentEntry::new(segment_meta );
(segment_id, segment_entry)
})
.collect(),
@@ -180,15 +184,12 @@ impl Default for SegmentRegister {
#[cfg(test)]
mod tests {
use core::SegmentId;
use core::SegmentMeta;
use indexer::delete_queue::DeleteQueue;
use super::*;
#[test]
fn test_segment_register() {
let delete_queue = DeleteQueue::default();
let mut segment_register = SegmentRegister::default();
let segment_id_a = SegmentId::generate_random();
let segment_id_b = SegmentId::generate_random();
@@ -196,14 +197,14 @@ mod tests {
{
let segment_meta = SegmentMeta::new(segment_id_a);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
let segment_entry = SegmentEntry::new(segment_meta);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state, SegmentState::Ready);
assert_eq!(segment_register.segment_ids(), vec!(segment_id_a));
{
let segment_meta = SegmentMeta::new(segment_id_b);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
let segment_entry = SegmentEntry::new(segment_meta);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state, SegmentState::Ready);
@@ -215,7 +216,7 @@ mod tests {
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = SegmentMeta::new(segment_id_merged);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor());
let segment_entry = SegmentEntry::new(segment_meta_merged);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_ids(), vec!(segment_id_merged));

View File

@@ -22,13 +22,13 @@ use std::borrow::BorrowMut;
use indexer::SegmentSerializer;
use indexer::SegmentEntry;
use schema::Schema;
use indexer::index_writer::{advance_deletes, DocToOpstampMapping};
use indexer::index_writer::advance_deletes;
use directory::Directory;
use std::thread::JoinHandle;
use std::sync::Arc;
use std::collections::HashMap;
use rustc_serialize::json;
use indexer::delete_queue::{DeleteQueueCursor, DeleteQueue};
use indexer::delete_queue::DeleteQueue;
use Result;
use futures_cpupool::CpuPool;
use core::IndexMeta;
@@ -37,13 +37,6 @@ use std::io::Write;
use super::segment_manager::{SegmentManager, get_segments};
/// Assembles an `IndexMeta` from the segment metas, the schema and the opstamp.
fn create_metas(metas: Vec<SegmentMeta>, schema: Schema, opstamp: u64) -> IndexMeta {
    let index_meta = IndexMeta {
        segments: metas,
        schema: schema,
        opstamp: opstamp,
    };
    index_meta
}
/// Save the index meta file.
@@ -78,7 +71,11 @@ pub fn save_metas(segment_metas: Vec<SegmentMeta>,
opstamp: u64,
directory: &mut Directory)
-> Result<()> {
let metas = create_metas(segment_metas, schema, opstamp);
let metas = IndexMeta {
segments: segment_metas,
schema: schema,
opstamp: opstamp,
};
let mut w = Vec::new();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas)));
Ok(directory
@@ -109,13 +106,10 @@ struct InnerSegmentUpdater {
impl SegmentUpdater {
pub fn new(
index: Index,
delete_cursor: DeleteQueueCursor)
-> Result<SegmentUpdater>
pub fn new(index: Index) -> Result<SegmentUpdater>
{
let segments = index.segments()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let segment_manager = SegmentManager::from_segments(segments);
Ok(
SegmentUpdater(Arc::new(InnerSegmentUpdater {
pool: CpuPool::new(1),
@@ -149,7 +143,7 @@ impl SegmentUpdater {
})
}
pub fn new_generation(&mut self, generation: usize) -> impl Future<Item=(), Error=&'static str> {
pub fn rollback(&mut self, generation: usize) -> impl Future<Item=(), Error=&'static str> {
self.0.generation.store(generation, Ordering::Release);
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.rollback();
@@ -169,26 +163,24 @@ impl SegmentUpdater {
}
}
fn purge_deletes(&self) -> Result<Vec<SegmentMeta>> {
let segment_entries = self.0.segment_manager.segment_entries();
segment_entries
fn purge_deletes(&self, delete_queue: &DeleteQueue) -> Result<Vec<SegmentMeta>> {
self.0.segment_manager
.segment_entries()
.into_iter()
.map(|mut segment_entry| {
.map(|segment_entry| {
let mut segment = self.0.index.segment(segment_entry.meta().clone());
advance_deletes(&mut segment, segment_entry.delete_cursor(), DocToOpstampMapping::None)
advance_deletes(&mut segment, delete_queue, segment_entry.doc_to_opstamp())
.map(|entry| entry.meta().clone())
})
.collect()
}
pub fn commit(&self, opstamp: u64, new_delete_queue: DeleteQueueCursor) -> impl Future<Item=(), Error=&'static str> {
pub fn commit(&self, delete_queue: DeleteQueue, opstamp: u64) -> impl Future<Item=(), Error=&'static str> {
self.run_async(move |segment_updater| {
let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes");
let segment_entries = segment_metas.into_iter()
.map(|segment_meta|
SegmentEntry::new(segment_meta, new_delete_queue.clone())
)
let segment_metas = segment_updater.purge_deletes(&delete_queue).expect("Failed purge deletes");
let segment_entries = segment_metas
.into_iter()
.map(SegmentEntry::new)
.collect::<Vec<_>>();
segment_updater.0.segment_manager.commit(segment_entries);
let mut directory = segment_updater.0.index.directory().box_clone();
@@ -241,8 +233,6 @@ impl SegmentUpdater {
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed");
let mut merged_segment = index.new_segment();
// ... we just serialize this index merger in our new segment
@@ -252,10 +242,7 @@ impl SegmentUpdater {
let mut segment_meta = SegmentMeta::new(merged_segment.id());
segment_meta.set_num_docs(num_docs);
// TODO fix delete cursor
let delete_queue = DeleteQueue::default();
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
let segment_entry = SegmentEntry::new(segment_meta);
segment_updater_clone
.end_merge(segment_metas.clone(), segment_entry.clone())
.wait()
@@ -297,7 +284,7 @@ impl SegmentUpdater {
segment_updater.0.index.opstamp(),
directory.borrow_mut()).expect("Could not save metas.");
for segment_meta in merged_segment_metas {
segment_updater.0.index.delete_segment(segment_meta.segment_id);
segment_updater.0.index.delete_segment(segment_meta.id());
}
})

View File

@@ -352,6 +352,78 @@ mod tests {
assert!(!postings.advance());
}
}
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{ // 0
let doc = doc!(text_field=>"a b");
index_writer.add_document(doc).unwrap();
}
{ // 1
index_writer.delete_term(Term::from_field_text(text_field, "c"));
}
index_writer.rollback().unwrap();
}
{
index.load_searchers().unwrap();
let searcher = index.searcher();
let reader = searcher.segment_reader(0);
assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none());
{
let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 5);
assert!(!postings.advance());
}
{
let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "b")).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 3);
assert!(postings.advance());
assert_eq!(postings.doc(), 4);
assert!(!postings.advance());
}
}
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
let doc = doc!(text_field=>"a b");
index_writer.add_document(doc).unwrap();
}
{
index_writer.delete_term(Term::from_field_text(text_field, "c"));
}
index_writer.rollback().unwrap();
{
index_writer.delete_term(Term::from_field_text(text_field, "a"));
}
index_writer.commit().unwrap();
}
{
index.load_searchers().unwrap();
let searcher = index.searcher();
let reader = searcher.segment_reader(0);
assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none());
{
let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap();
assert!(!postings.advance());
}
{
let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "b")).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 3);
assert!(postings.advance());
assert_eq!(postings.doc(), 4);
assert!(!postings.advance());
}
{
let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "c")).unwrap();
assert!(postings.advance());
assert_eq!(postings.doc(), 4);
assert!(!postings.advance());
}
}
}