diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index 020db8715..2795d87e3 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -266,6 +266,10 @@ impl SegmentReader {
         self.segment_id
     }
 
+    pub fn delete_bitset(&self) -> &DeleteBitSet {
+        &self.delete_bitset
+    }
+
     pub fn is_deleted(&self, doc: DocId) -> bool {
         self.delete_bitset.is_deleted(doc)
     }
diff --git a/src/datastruct/fstmap.rs b/src/datastruct/fstmap.rs
index 1d4a420ed..7bd5b23fa 100644
--- a/src/datastruct/fstmap.rs
+++ b/src/datastruct/fstmap.rs
@@ -30,7 +30,21 @@ impl FstMapBuilder {
         })
     }
 
-    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{
+    /// Horribly unsafe, nobody should ever do that... except me :)
+    pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
+        try!(self.fst_builder
+            .insert(key, self.data.len() as u64)
+            .map_err(convert_fst_error));
+        Ok(())
+    }
+
+    /// Horribly unsafe, nobody should ever do that... except me :)
+    pub fn insert_value(&mut self, value: &V) -> io::Result<()> {
+        try!(value.serialize(&mut self.data));
+        Ok(())
+    }
+
+    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
         try!(self.fst_builder
             .insert(key, self.data.len() as u64)
             .map_err(convert_fst_error));
diff --git a/src/fastfield/delete.rs b/src/fastfield/delete.rs
index 03f3b13b4..a899af963 100644
--- a/src/fastfield/delete.rs
+++ b/src/fastfield/delete.rs
@@ -56,6 +56,10 @@ impl DeleteBitSet {
         }
     }
 
+    pub fn has_deletes(&self) -> bool {
+        self.len() > 0
+    }
+
     pub fn is_deleted(&self, doc: DocId) -> bool {
         if self.len == 0 {
             false
@@ -72,7 +76,6 @@ impl DeleteBitSet {
 
 
 impl HasLen for DeleteBitSet {
-
     fn len(&self) -> usize {
         self.len
     }
diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs
index 8543f7104..498bff51f 100644
--- a/src/indexer/delete_queue.rs
+++ b/src/indexer/delete_queue.rs
@@ -32,6 +32,8 @@ impl InnerDeleteQueue {
 }
 
 
+
+// TODO Rename to DeleteQueueSnapshot
 #[derive(Default, Clone)]
 pub struct ReadOnlyDeletes(Vec>>);
 
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index b05d4f324..e2a569552 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -9,6 +9,7 @@ use core::SegmentReader;
 use datastruct::stacker::Heap;
 use Error;
 use fastfield::delete::write_delete_bitset;
+use indexer::delete_queue::ReadOnlyDeletes;
 use futures::Canceled;
 use futures::Future;
 use indexer::delete_queue::DeleteQueue;
@@ -84,18 +85,22 @@ pub struct IndexWriter {
 
 impl !Send for IndexWriter {}
 
 impl !Sync for IndexWriter {}
 
-/// TODO
-/// work on SegmentMeta
+
+// TODO put delete bitset in segment entry
+// rather than DocToOpstamp.
+
+// TODO skip delete operation before the
+// last delete opstamp
+
 pub fn advance_deletes(
     segment: &mut Segment,
-    delete_queue: &DeleteQueue,
-    doc_opstamps: &DocToOpstampMapping) -> Result<SegmentEntry> {
+    delete_operations: &ReadOnlyDeletes,
+    doc_opstamps: &DocToOpstampMapping) -> Result<SegmentMeta> {
     let segment_reader = SegmentReader::open(segment.clone())?;
     let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
     let mut last_opstamp_opt: Option<u64> = None;
-    let delete_operations = delete_queue.snapshot();
     for delete_op in delete_operations.iter() {
         // A delete operation should only affect
         // documents that were inserted after it.
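The `insert_key` / `insert_value` split introduced in `fstmap.rs` lets a caller register a key in the FST before the corresponding value is fully known; `PostingsSerializer::new_term` registers the key and `close_term` only serializes the `TermInfo` once `doc_freq` has been counted (see the serializer changes further down). A minimal standalone sketch of that pattern — a toy builder written for illustration, not the real `FstMapBuilder`; its names and offset layout are assumptions:

```rust
// Toy illustration of the key-first / value-later pattern (not tantivy code).
struct ToyMapBuilder {
    keys: Vec<(Vec<u8>, u64)>, // (key, byte offset of its future value in `data`)
    data: Vec<u8>,             // serialized values, addressed by those offsets
}

impl ToyMapBuilder {
    fn new() -> ToyMapBuilder {
        ToyMapBuilder { keys: Vec::new(), data: Vec::new() }
    }

    // Analogue of `insert_key`: record the key against the *current* end of `data`.
    fn insert_key(&mut self, key: &[u8]) {
        self.keys.push((key.to_vec(), self.data.len() as u64));
    }

    // Analogue of `insert_value`: append the serialized value bytes.
    // It must be called exactly once per `insert_key`, before the next key,
    // otherwise the recorded offsets point at the wrong bytes.
    fn insert_value(&mut self, value_bytes: &[u8]) {
        self.data.extend_from_slice(value_bytes);
    }

    // The original `insert` is just the two steps back to back.
    fn insert(&mut self, key: &[u8], value_bytes: &[u8]) {
        self.insert_key(key);
        self.insert_value(value_bytes);
    }
}

fn main() {
    let mut builder = ToyMapBuilder::new();
    // Register the key first...
    builder.insert_key(b"abc");
    // ...accumulate information (e.g. a doc_freq counter)...
    let doc_freq: u32 = 3;
    // ...and only write the value once it is final.
    builder.insert_value(&doc_freq.to_le_bytes());
    builder.insert(b"abd", &1u32.to_le_bytes());
    assert_eq!(builder.keys.len(), 2);
}
```

That pairing discipline is the point of the tongue-in-cheek "horribly unsafe" doc comments: nothing in the type system enforces that every `insert_key` is matched by exactly one `insert_value` before the next key.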
@@ -125,7 +130,7 @@ pub fn advance_deletes(
         let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
         write_delete_bitset(&delete_bitset, &mut delete_file)?;
     }
-    Ok(SegmentEntry::new(segment.meta().clone()))
+    Ok(segment.meta().clone())
 }
 
 fn index_documents(heap: &mut Heap,
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 15bc174f5..d0be34fda 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -5,18 +5,19 @@ use DocId;
 use core::SerializableSegment;
 use indexer::SegmentSerializer;
 use postings::PostingsSerializer;
+use fastfield::U32FastFieldReader;
+use itertools::Itertools;
 use postings::Postings;
 use postings::DocSet;
 use core::TermIterator;
+use fastfield::delete::DeleteBitSet;
 use schema::{Schema, Field};
 use fastfield::FastFieldSerializer;
 use store::StoreWriter;
-use postings::ChainedPostings;
-use postings::HasLen;
-use postings::OffsetPostings;
 use core::SegmentInfo;
 use std::cmp::{min, max};
 use std::iter;
+use std::io;
 
 pub struct IndexMerger {
     schema: Schema,
@@ -47,12 +48,40 @@ impl DeltaPositionComputer {
         }
     }
 
+
+fn compute_min_max_val(u32_reader: &U32FastFieldReader, max_doc: DocId, delete_bitset: &DeleteBitSet) -> Option<(u32, u32)> {
+    if max_doc == 0 {
+        None
+    }
+    else if !delete_bitset.has_deletes() {
+        // no deleted documents,
+        // we can use the previous min_val, max_val.
+        Some((u32_reader.min_val(), u32_reader.max_val()))
+    }
+    else {
+        // some deleted documents,
+        // we need to recompute the max / min
+        (0..max_doc)
+            .filter(|doc_id| !delete_bitset.is_deleted(*doc_id))
+            .minmax()
+            .into_option()
+    }
+}
+
+fn extract_fieldnorm_reader(segment_reader: &SegmentReader, field: Field) -> io::Result<U32FastFieldReader> {
+    segment_reader.get_fieldnorms_reader(field)
+}
+
+fn extract_fast_field_reader(segment_reader: &SegmentReader, field: Field) -> io::Result<U32FastFieldReader> {
+    segment_reader.get_fast_field_reader(field)
+}
+
 impl IndexMerger {
     pub fn open(schema: Schema, segments: &[Segment]) -> Result<IndexMerger> {
-        let mut readers = Vec::new();
+        let mut readers = vec!();
         let mut max_doc = 0;
         for segment in segments {
-            let reader = try!(SegmentReader::open(segment.clone()));
+            let reader = SegmentReader::open(segment.clone())?;
             max_doc += reader.num_docs();
             readers.push(reader);
         }
@@ -63,74 +92,104 @@ impl IndexMerger {
         })
     }
 
-
-    fn write_fieldnorms(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
-        // TODO make sure that works even if the field is never here.
-        for field in self.schema
+    fn write_fieldnorms(&self,
+                        fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
+        let fieldnorm_fastfields: Vec<Field> = self.schema
             .fields()
             .iter()
             .enumerate()
             .filter(|&(_, field_entry)| field_entry.is_indexed())
-            .map(|(field_id, _)| Field(field_id as u8)) {
-            let mut u32_readers = Vec::new();
-            let mut min_val = u32::min_value();
-            let mut max_val = 0;
-            for reader in &self.readers {
-                let u32_reader = try!(reader.get_fieldnorms_reader(field));
-                min_val = min(min_val, u32_reader.min_val());
-                max_val = max(max_val, u32_reader.max_val());
-                u32_readers.push((reader.max_doc(), u32_reader));
-            }
-            try!(fast_field_serializer.new_u32_fast_field(field, min_val, max_val));
-            for (max_doc, u32_reader) in u32_readers {
-                for doc_id in 0..max_doc {
-                    let val = u32_reader.get(doc_id);
-                    try!(fast_field_serializer.add_val(val));
-                }
-            }
-            try!(fast_field_serializer.close_field());
-        }
-        Ok(())
+            .map(|(field_id, _)| Field(field_id as u8))
+            .collect();
+        self.generic_write_fast_field(fieldnorm_fastfields, &extract_fieldnorm_reader, fast_field_serializer)
     }
 
     fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
-        for field in self.schema
+        let fast_fields: Vec<Field> = self.schema
             .fields()
             .iter()
             .enumerate()
             .filter(|&(_, field_entry)| field_entry.is_u32_fast())
-            .map(|(field_id, _)| Field(field_id as u8)) {
-            let mut u32_readers = Vec::new();
-            let mut min_val = u32::min_value();
-            let mut max_val = 0;
+            .map(|(field_id, _)| Field(field_id as u8))
+            .collect();
+        self.generic_write_fast_field(fast_fields, &extract_fast_field_reader, fast_field_serializer)
+    }
+
+
+    // used both to merge field norms and regular u32 fast fields.
+    fn generic_write_fast_field(&self,
+                                fields: Vec<Field>,
+                                field_reader_extractor: &Fn(&SegmentReader, Field) -> io::Result<U32FastFieldReader>,
+                                fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
+
+        for field in fields {
+
+            let mut u32_readers = vec!();
+            let mut min_val = u32::max_value();
+            let mut max_val = u32::min_value();
+
             for reader in &self.readers {
-                let u32_reader = try!(reader.get_fast_field_reader(field));
-                min_val = min(min_val, u32_reader.min_val());
-                max_val = max(max_val, u32_reader.max_val());
-                u32_readers.push((reader.max_doc(), u32_reader));
-            }
-            try!(fast_field_serializer.new_u32_fast_field(field, min_val, max_val));
-            for (max_doc, u32_reader) in u32_readers {
-                for doc_id in 0..max_doc {
-                    let val = u32_reader.get(doc_id);
-                    try!(fast_field_serializer.add_val(val));
+                let u32_reader = field_reader_extractor(reader, field)?;
+                if let Some((seg_min_val, seg_max_val)) = compute_min_max_val(&u32_reader, reader.max_doc(), reader.delete_bitset()) {
+                    // the segment has some non-deleted documents
+                    min_val = min(min_val, seg_min_val);
+                    max_val = max(max_val, seg_max_val);
+                    u32_readers.push((reader.max_doc(), u32_reader, reader.delete_bitset()));
+                }
             }
+
+            if u32_readers.is_empty() {
+                // we have actually zero documents.
+                min_val = 0;
+                max_val = 0;
+            }
+
+            assert!(min_val <= max_val);
+
+            // TODO test deleting all documents off the index.
+
+            try!(fast_field_serializer.new_u32_fast_field(field, min_val, max_val));
+            for (max_doc, u32_reader, delete_bitset) in u32_readers {
+                for doc_id in 0..max_doc {
+                    if !delete_bitset.is_deleted(doc_id) {
+                        let val = u32_reader.get(doc_id);
+                        try!(fast_field_serializer.add_val(val));
+                    }
+                }
+            }
+
             try!(fast_field_serializer.close_field());
         }
         Ok(())
     }
 
-    fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> Result<()> {
+    fn write_postings(&self,
+
+                      postings_serializer: &mut PostingsSerializer) -> Result<()> {
+
         let mut merged_terms = TermIterator::from(&self.readers[..]);
         let mut delta_position_computer = DeltaPositionComputer::new();
-        let mut offsets: Vec<DocId> = Vec::new();
+
+
         let mut max_doc = 0;
-        for reader in &self.readers {
-            offsets.push(max_doc);
-            max_doc += reader.max_doc();
-        }
+
+        // map from segment doc ids to the resulting merged segment doc id.
+        let mut merged_doc_id_map: Vec<Vec<Option<DocId>>> = Vec::with_capacity(self.readers.len());
+
+        for reader in &self.readers {
+            let mut segment_local_map = Vec::with_capacity(reader.max_doc() as usize);
+            for doc_id in 0..reader.max_doc() {
+                if reader.is_deleted(doc_id) {
+                    segment_local_map.push(None);
+                }
+                else {
+                    segment_local_map.push(Some(max_doc));
+                    max_doc += 1u32;
+                }
+            }
+            merged_doc_id_map.push(segment_local_map);
+        }
+
         while merged_terms.advance() {
             // Create the total list of doc ids
             // by stacking the doc ids from the different segment.
@@ -142,34 +201,51 @@ impl IndexMerger {
             // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, seg0.max_doc + seg1.max_doc + seg2.max_doc]
             // ...
             let term = merged_terms.term();
-            let mut merged_postings =
-                ChainedPostings::from(
-                    merged_terms
-                        .segment_ords()
-                        .iter()
-                        .cloned()
-                        .flat_map(|segment_ord| {
-                            let offset = offsets[segment_ord];
-                            self.readers[segment_ord]
-                                .read_postings_all_info(&term)
-                                .map(|segment_postings| OffsetPostings::new(segment_postings, offset))
-                        })
-                        .collect::<Vec<_>>()
-                );
+            let mut term_written = false;
+            let segment_postings = merged_terms
+                .segment_ords()
+                .iter()
+                .cloned()
+                .flat_map(|segment_ord| {
+                    self.readers[segment_ord]
+                        .read_postings_all_info(&term)
+                        .map(|segment_postings| (segment_ord, segment_postings))
+                })
+                .collect::<Vec<_>>();
 
-            // We can now serialize this postings, by pushing each document to the
-            // postings serializer.
-            try!(postings_serializer.new_term(&term, merged_postings.len() as DocId));
-            while merged_postings.advance() {
-                let delta_positions: &[u32] =
-                    delta_position_computer.compute_delta_positions(merged_postings.positions());
-                try!(postings_serializer.write_doc(merged_postings.doc(),
-                                                   merged_postings.term_freq(),
-                                                   delta_positions));
+            // We can remove the term if all documents which
+            // contained it have been deleted.
+            if segment_postings.len() > 0 {
+
+                // We can now serialize this postings, by pushing each document to the
+                // postings serializer.
+
+                for (segment_ord, mut segment_postings) in segment_postings {
+                    let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
+                    while segment_postings.advance() {
+                        if let Some(remapped_doc_id) = old_to_new_doc_id[segment_postings.doc() as usize] {
+                            if !term_written {
+                                // we make sure to only write the term iff
+                                // there is at least one document.
+                                postings_serializer.new_term(&term)?;
+                                term_written = true;
+                            }
+                            let delta_positions: &[u32] =
+                                delta_position_computer.compute_delta_positions(segment_postings.positions());
+                            try!(postings_serializer.write_doc(
+                                remapped_doc_id,
+                                segment_postings.term_freq(),
+                                delta_positions));
+                        }
+                    }
+                }
+
+                if term_written {
+                    try!(postings_serializer.close_term());
+                }
             }
-            try!(postings_serializer.close_term());
+
         }
-
         Ok(())
     }
 
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index 478e851da..b2a71fa02 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -14,7 +14,10 @@ mod segment_entry;
 mod doc_opstamp_mapping;
 pub mod operation;
 
-pub use self::segment_entry::SegmentEntry;
+
+// TODO avoid exposing SegmentState / SegmentEntry if it does not have to be public API
+
+pub use self::segment_entry::{SegmentEntry, SegmentState};
 pub use self::segment_serializer::SegmentSerializer;
 pub use self::segment_writer::SegmentWriter;
 pub use self::index_writer::IndexWriter;
diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs
index 87b93be4c..5f2216342 100644
--- a/src/indexer/segment_register.rs
+++ b/src/indexer/segment_register.rs
@@ -120,6 +120,7 @@ impl Default for SegmentRegister {
 
 #[cfg(test)]
 mod tests {
+    use indexer::SegmentState;
     use core::SegmentId;
     use core::SegmentMeta;
     use super::*;
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index d72efd313..b9c4e47d2 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -1,44 +1,42 @@
 #![allow(for_kv_map)]
 use core::Index;
-use Error;
-use core::Segment;
-use indexer::{MergePolicy, DefaultMergePolicy};
-use core::SegmentId;
-use core::SegmentMeta;
-use std::mem;
-use std::sync::atomic::Ordering;
-use std::ops::DerefMut;
-use futures::{Future, future};
-use futures::oneshot;
-use futures::Canceled;
-use std::thread;
-use std::sync::atomic::AtomicUsize;
-use std::sync::RwLock;
-use core::SerializableSegment;
-use indexer::MergeCandidate;
-use indexer::merger::IndexMerger;
-use std::borrow::BorrowMut;
-use indexer::SegmentSerializer;
-use indexer::SegmentEntry;
-use schema::Schema;
-use indexer::index_writer::advance_deletes;
-use directory::Directory;
-use std::thread::JoinHandle;
-use std::sync::Arc;
-use std::collections::HashMap;
-use rustc_serialize::json;
-use indexer::delete_queue::DeleteQueue;
-use Result;
-use futures_cpupool::CpuPool;
 use core::IndexMeta;
 use core::META_FILEPATH;
+use core::Segment;
+use core::SegmentId;
+use core::SegmentMeta;
+use core::SerializableSegment;
+use directory::Directory;
+use Error;
+use futures_cpupool::CpuPool;
+use futures::{Future, future};
+use futures::Canceled;
+use futures::oneshot;
+use indexer::{MergePolicy, DefaultMergePolicy};
+use indexer::delete_queue::DeleteQueue;
+use indexer::index_writer::advance_deletes;
+use indexer::MergeCandidate;
+use indexer::merger::IndexMerger;
+use indexer::SegmentEntry;
+use indexer::SegmentSerializer;
+use Result;
+use rustc_serialize::json;
+use schema::Schema;
+use std::borrow::BorrowMut;
+use std::collections::HashMap;
 use std::io::Write;
+use std::mem;
+use std::ops::DerefMut;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::sync::RwLock;
+use std::thread;
+use std::thread::JoinHandle;
 use super::segment_manager::{SegmentManager, get_segments};
 
-
-
 /// Save the index meta file.
 /// This operation is atomic :
 /// Either
@@ -171,8 +169,7 @@ impl SegmentUpdater {
             .into_iter()
             .map(|segment_entry| {
                 let mut segment = self.0.index.segment(segment_entry.meta().clone());
-                advance_deletes(&mut segment, &self.0.delete_queue, segment_entry.doc_to_opstamp())
-                    .map(|entry| entry.meta().clone())
+                advance_deletes(&mut segment, &self.0.delete_queue.snapshot(), segment_entry.doc_to_opstamp())
             })
             .collect()
     }
@@ -206,27 +203,37 @@ impl SegmentUpdater {
         let merging_thread_id = self.get_merging_thread_id();
         let (merging_future_send, merging_future_recv) = oneshot();
+        let delete_operations = self.0.delete_queue.snapshot();
+
         if segment_ids.is_empty() {
             return merging_future_recv;
         }
 
         let merging_join_handle = thread::spawn(move || {
-            // first we need to apply deletes to our segment.
             info!("Start merge: {:?}", segment_ids_vec);
             let ref index = segment_updater_clone.0.index;
             let schema = index.schema();
-            let segment_metas: Vec<SegmentMeta> = segment_ids_vec
-                .iter()
-                .map(|segment_id|
-                    segment_updater_clone.0.segment_manager
-                        .segment_entry(segment_id)
-                        .map(|segment_entry| segment_entry.meta().clone())
-                        .ok_or(Error::InvalidArgument(format!("Segment({:?}) does not exist anymore", segment_id)))
-                )
-                .collect::<Result<_>>()?;
+
+            let mut segment_metas = vec!();
+            for segment_id in &segment_ids_vec {
+                if let Some(segment_entry) = segment_updater_clone.0
+                    .segment_manager
+                    .segment_entry(segment_id) {
+                    let mut segment = index.segment(segment_entry.meta().clone());
+                    let segment_meta = advance_deletes(
+                        &mut segment,
+                        &delete_operations,
+                        segment_entry.doc_to_opstamp())?;
+                    segment_metas.push(segment_meta);
+                }
+                else {
+                    error!("Error: had to abort merge as some of the segments are not managed anymore.");
+                    return Err(Error::InvalidArgument(format!("Segment {:?} requested for merge is not managed.", segment_id)));
+                }
+            }
 
             let segments: Vec<Segment> = segment_metas
                 .iter()
@@ -251,6 +258,7 @@ impl SegmentUpdater {
                 .end_merge(segment_metas.clone(), segment_entry.clone())
                 .wait()
                 .unwrap();
+            merging_future_send.complete(segment_entry.clone());
 
             segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
             Ok(segment_entry)
diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index f9898b9fc..5e9b28414 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -64,7 +64,7 @@ mod tests {
         let mut segment = index.new_segment();
         let mut posting_serializer = PostingsSerializer::open(&mut segment).unwrap();
         let term = Term::from_field_text(text_field, "abc");
-        posting_serializer.new_term(&term, 3).unwrap();
+        posting_serializer.new_term(&term).unwrap();
         for doc_id in 0u32..3u32 {
             let positions = vec!(1,2,3,2);
             posting_serializer.write_doc(doc_id, 2, &positions).unwrap();
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index a3c0194f1..667e67458 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -122,7 +122,7 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'
         for (term_bytes, (addr, recorder)) in term_offsets {
             // TODO remove copy
             term.set_content(term_bytes);
-            try!(serializer.new_term(&term, recorder.doc_freq()));
+            try!(serializer.new_term(&term));
             try!(recorder.serialize(addr, serializer, heap));
             try!(serializer.close_term());
         }
diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs
index afbfbe948..508df95b6 100644
--- a/src/postings/serializer.rs
+++ b/src/postings/serializer.rs
@@ -63,6 +63,7 @@ pub struct PostingsSerializer {
     schema: Schema,
     text_indexing_options: TextIndexingOptions,
     term_open: bool,
+    current_term_info: TermInfo,
 }
 
 impl PostingsSerializer {
@@ -88,6 +89,7 @@ impl PostingsSerializer {
             schema: schema,
             text_indexing_options: TextIndexingOptions::Unindexed,
             term_open: false,
+            current_term_info: TermInfo::default(),
         })
     }
 
@@ -121,7 +123,7 @@ impl PostingsSerializer {
     /// * term - the term. It needs to come after the previous term according
     /// to the lexicographical order.
    /// * doc_freq - return the number of document containing the term.
-    pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> {
+    pub fn new_term(&mut self, term: &Term) -> io::Result<()> {
         if self.term_open {
             panic!("Called new_term, while the previous term was not closed.");
         }
@@ -131,13 +133,12 @@ impl PostingsSerializer {
         self.last_doc_id_encoded = 0;
         self.term_freqs.clear();
         self.position_deltas.clear();
-        let term_info = TermInfo {
-            doc_freq: doc_freq,
+        self.current_term_info = TermInfo {
+            doc_freq: 0,
             postings_offset: self.written_bytes_postings as u32,
             positions_offset: self.written_bytes_positions as u32,
         };
-        self.terms_fst_builder
-            .insert(term.as_slice(), &term_info)
+        self.terms_fst_builder.insert_key(term.as_slice())
     }
 
     /// Finish the serialization for this term postings.
@@ -146,6 +147,9 @@ impl PostingsSerializer {
     /// using `VInt` encoding.
     pub fn close_term(&mut self) -> io::Result<()> {
         if self.term_open {
+
+            self.terms_fst_builder.insert_value(&self.current_term_info)?;
+
             if !self.doc_ids.is_empty() {
                 // we have doc ids waiting to be written
                 // this happens when the number of doc ids is
@@ -202,6 +206,7 @@ impl PostingsSerializer {
                      term_freq: u32,
                      position_deltas: &[u32]) -> io::Result<()> {
 
+        self.current_term_info.inc_doc_freq();
         self.doc_ids.push(doc_id);
         if self.text_indexing_options.is_termfreq_enabled() {
             self.term_freqs.push(term_freq as u32);
diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs
index ac7edf591..20bc0b1b8 100644
--- a/src/postings/term_info.rs
+++ b/src/postings/term_info.rs
@@ -12,7 +12,7 @@ use std::io;
 /// * `postings_offset` : an offset in the `.idx` file
 /// addressing the start of the posting list associated
 /// to this term.
-#[derive(Debug,Ord,PartialOrd,Eq,PartialEq,Clone)]
+#[derive(Debug,Default,Ord,PartialOrd,Eq,PartialEq,Clone)]
 pub struct TermInfo {
     /// Number of documents in the segment containing the term
     pub doc_freq: u32,
@@ -22,6 +22,11 @@ pub struct TermInfo {
     pub positions_offset: u32,
 }
 
+impl TermInfo {
+    pub fn inc_doc_freq(&mut self) {
+        self.doc_freq += 1;
+    }
+}
 
 impl BinarySerializable for TermInfo {
     fn serialize(&self, writer: &mut io::Write) -> io::Result<usize> {
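For reference, the doc id remapping that the reworked `write_postings` builds (`merged_doc_id_map`) can be sketched in isolation: every non-deleted document of every segment receives the next merged doc id, and deleted documents map to `None`, which is what lets the merger skip their postings and drop terms whose documents are all deleted. A standalone illustration — plain vectors of deleted-flags stand in for the segment readers, an assumption made purely for the example:

```rust
type DocId = u32;

// Sketch of the merged doc id mapping: one Vec<Option<DocId>> per segment,
// where None marks a deleted document and Some(new_id) its merged doc id.
fn build_doc_id_map(segments: &[Vec<bool>]) -> Vec<Vec<Option<DocId>>> {
    let mut next_doc: DocId = 0;
    let mut mapping = Vec::with_capacity(segments.len());
    for deleted_flags in segments {
        let mut segment_local_map = Vec::with_capacity(deleted_flags.len());
        for &is_deleted in deleted_flags {
            if is_deleted {
                // deleted docs get no slot in the merged segment
                segment_local_map.push(None);
            } else {
                segment_local_map.push(Some(next_doc));
                next_doc += 1;
            }
        }
        mapping.push(segment_local_map);
    }
    mapping
}

fn main() {
    // segment 0 has 3 docs (doc 1 deleted), segment 1 has 2 docs (none deleted)
    let segments = vec![vec![false, true, false], vec![false, false]];
    let mapping = build_doc_id_map(&segments);
    assert_eq!(mapping[0], vec![Some(0), None, Some(1)]);
    assert_eq!(mapping[1], vec![Some(2), Some(3)]);
}
```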