issue/43 Merging ok for postings / fastfields.

This commit is contained in:
Paul Masurel
2017-02-19 17:43:05 +09:00
parent e3d2fca844
commit 7315000fd4
13 changed files with 262 additions and 136 deletions

View File

@@ -266,6 +266,10 @@ impl SegmentReader {
self.segment_id
}
/// Returns a reference to the `DeleteBitSet` held by this segment reader.
pub fn delete_bitset(&self) -> &DeleteBitSet {
&self.delete_bitset
}
/// Returns true if `doc` is reported as deleted by this reader's delete bitset.
pub fn is_deleted(&self, doc: DocId) -> bool {
self.delete_bitset.is_deleted(doc)
}

View File

@@ -30,7 +30,21 @@ impl<W: Write, V: BinarySerializable> FstMapBuilder<W, V> {
})
}
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{
/// Horribly unsafe, nobody should ever do that... except me :)
pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
try!(self.fst_builder
.insert(key, self.data.len() as u64)
.map_err(convert_fst_error));
Ok(())
}
/// Serializes `value` into the builder's data buffer.
///
/// This is the second half of the split insertion API: `insert_key`
/// records the address of the value, and this method writes the value
/// itself. Nothing checks that the two calls are correctly paired, so
/// this is fragile and meant for internal use only.
///
/// The number of bytes written by the serialization is discarded;
/// any io error is returned as is.
pub fn insert_value(&mut self, value: &V) -> io::Result<()> {
    value
        .serialize(&mut self.data)
        .map(|_num_bytes| ())
}
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
try!(self.fst_builder
.insert(key, self.data.len() as u64)
.map_err(convert_fst_error));

View File

@@ -56,6 +56,10 @@ impl DeleteBitSet {
}
}
/// Returns true iff the bitset has a non-zero length.
///
/// A bitset of length zero makes `is_deleted` answer `false`
/// for every document.
pub fn has_deletes(&self) -> bool {
    self.len() != 0
}
pub fn is_deleted(&self, doc: DocId) -> bool {
if self.len == 0 {
false
@@ -72,7 +76,6 @@ impl DeleteBitSet {
impl HasLen for DeleteBitSet {
fn len(&self) -> usize {
self.len
}

View File

@@ -32,6 +32,8 @@ impl InnerDeleteQueue {
}
// TODO Rename to DeleteQueueSnapshot
#[derive(Default, Clone)]
pub struct ReadOnlyDeletes(Vec<Arc<Vec<DeleteOperation>>>);

View File

@@ -9,6 +9,7 @@ use core::SegmentReader;
use datastruct::stacker::Heap;
use Error;
use fastfield::delete::write_delete_bitset;
use indexer::delete_queue::ReadOnlyDeletes;
use futures::Canceled;
use futures::Future;
use indexer::delete_queue::DeleteQueue;
@@ -84,18 +85,22 @@ pub struct IndexWriter {
impl !Send for IndexWriter {}
impl !Sync for IndexWriter {}
/// TODO
/// work on SegmentMeta
// TODO put delete bitset in segment entry
// rather than DocToOpstamp.
// TODO skip delete operation before the
// last delete opstamp
pub fn advance_deletes(
segment: &mut Segment,
delete_queue: &DeleteQueue,
doc_opstamps: &DocToOpstampMapping) -> Result<SegmentEntry> {
delete_operations: &ReadOnlyDeletes,
doc_opstamps: &DocToOpstampMapping) -> Result<SegmentMeta> {
let segment_reader = SegmentReader::open(segment.clone())?;
let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
let mut last_opstamp_opt: Option<u64> = None;
let delete_operations = delete_queue.snapshot();
for delete_op in delete_operations.iter() {
// A delete operation should only affect
// document that were inserted after it.
@@ -125,7 +130,7 @@ pub fn advance_deletes(
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
Ok(SegmentEntry::new(segment.meta().clone()))
Ok(segment.meta().clone())
}
fn index_documents(heap: &mut Heap,

View File

@@ -5,18 +5,19 @@ use DocId;
use core::SerializableSegment;
use indexer::SegmentSerializer;
use postings::PostingsSerializer;
use fastfield::U32FastFieldReader;
use itertools::Itertools;
use postings::Postings;
use postings::DocSet;
use core::TermIterator;
use fastfield::delete::DeleteBitSet;
use schema::{Schema, Field};
use fastfield::FastFieldSerializer;
use store::StoreWriter;
use postings::ChainedPostings;
use postings::HasLen;
use postings::OffsetPostings;
use core::SegmentInfo;
use std::cmp::{min, max};
use std::iter;
use std::io;
pub struct IndexMerger {
schema: Schema,
@@ -47,12 +48,40 @@ impl DeltaPositionComputer {
}
}
fn compute_min_max_val(u32_reader: &U32FastFieldReader, max_doc: DocId, delete_bitset: &DeleteBitSet) -> Option<(u32, u32)> {
if max_doc == 0 {
None
}
else if !delete_bitset.has_deletes() {
// no deleted documents,
// we can use the previous min_val, max_val.
Some((u32_reader.min_val(), u32_reader.max_val()))
}
else {
// some deleted documents,
// we need to recompute the max / min
(0..max_doc)
.filter(|doc_id| !delete_bitset.is_deleted(*doc_id))
.minmax()
.into_option()
}
}
/// Returns the fieldnorms reader of `segment_reader` for `field`.
///
/// Plain-function adapter around `SegmentReader::get_fieldnorms_reader`,
/// passed as the reader extractor to `generic_write_fast_field`.
fn extract_fieldnorm_reader(segment_reader: &SegmentReader, field: Field) -> io::Result<U32FastFieldReader> {
segment_reader.get_fieldnorms_reader(field)
}
/// Returns the u32 fast field reader of `segment_reader` for `field`.
///
/// Plain-function adapter around `SegmentReader::get_fast_field_reader`,
/// passed as the reader extractor to `generic_write_fast_field`.
fn extract_fast_field_reader(segment_reader: &SegmentReader, field: Field) -> io::Result<U32FastFieldReader> {
segment_reader.get_fast_field_reader(field)
}
impl IndexMerger {
pub fn open(schema: Schema, segments: &[Segment]) -> Result<IndexMerger> {
let mut readers = Vec::new();
let mut readers = vec!();
let mut max_doc = 0;
for segment in segments {
let reader = try!(SegmentReader::open(segment.clone()));
let reader = SegmentReader::open(segment.clone())?;
max_doc += reader.num_docs();
readers.push(reader);
}
@@ -63,74 +92,104 @@ impl IndexMerger {
})
}
fn write_fieldnorms(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
// TODO make sure that works even if the field is never here.
for field in self.schema
fn write_fieldnorms(&self,
fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
let fieldnorm_fastfields: Vec<Field> = self.schema
.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_indexed())
.map(|(field_id, _)| Field(field_id as u8)) {
let mut u32_readers = Vec::new();
let mut min_val = u32::min_value();
let mut max_val = 0;
for reader in &self.readers {
let u32_reader = try!(reader.get_fieldnorms_reader(field));
min_val = min(min_val, u32_reader.min_val());
max_val = max(max_val, u32_reader.max_val());
u32_readers.push((reader.max_doc(), u32_reader));
}
try!(fast_field_serializer.new_u32_fast_field(field, min_val, max_val));
for (max_doc, u32_reader) in u32_readers {
for doc_id in 0..max_doc {
let val = u32_reader.get(doc_id);
try!(fast_field_serializer.add_val(val));
}
}
try!(fast_field_serializer.close_field());
}
Ok(())
.map(|(field_id, _)| Field(field_id as u8))
.collect();
self.generic_write_fast_field(fieldnorm_fastfields, &extract_fieldnorm_reader, fast_field_serializer)
}
fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
for field in self.schema
let fast_fields: Vec<Field> = self.schema
.fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.is_u32_fast())
.map(|(field_id, _)| Field(field_id as u8)) {
let mut u32_readers = Vec::new();
let mut min_val = u32::min_value();
let mut max_val = 0;
.map(|(field_id, _)| Field(field_id as u8))
.collect();
self.generic_write_fast_field(fast_fields, &extract_fast_field_reader, fast_field_serializer)
}
// used both to merge field norms and regular u32 fast fields.
fn generic_write_fast_field(&self,
fields: Vec<Field>,
field_reader_extractor: &Fn(&SegmentReader, Field) -> io::Result<U32FastFieldReader>,
fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
for field in fields {
let mut u32_readers = vec!();
let mut min_val = u32::max_value();
let mut max_val = u32::min_value();
for reader in &self.readers {
let u32_reader = try!(reader.get_fast_field_reader(field));
min_val = min(min_val, u32_reader.min_val());
max_val = max(max_val, u32_reader.max_val());
u32_readers.push((reader.max_doc(), u32_reader));
}
try!(fast_field_serializer.new_u32_fast_field(field, min_val, max_val));
for (max_doc, u32_reader) in u32_readers {
for doc_id in 0..max_doc {
let val = u32_reader.get(doc_id);
try!(fast_field_serializer.add_val(val));
let u32_reader = field_reader_extractor(reader, field)?;
if let Some((seg_min_val, seg_max_val)) = compute_min_max_val(&u32_reader, reader.max_doc(), reader.delete_bitset()) {
// the segment has some non-deleted documents
min_val = min(min_val, seg_min_val);
max_val = max(max_val, seg_max_val);
u32_readers.push((reader.max_doc(), u32_reader, reader.delete_bitset()));
}
}
if u32_readers.is_empty() {
// we have actually zero documents.
min_val = 0;
max_val = 0;
}
assert!(min_val <= max_val);
// TODO test deleting all documents off the index.
try!(fast_field_serializer.new_u32_fast_field(field, min_val, max_val));
for (max_doc, u32_reader, delete_bitset) in u32_readers {
for doc_id in 0..max_doc {
if !delete_bitset.is_deleted(doc_id) {
let val = u32_reader.get(doc_id);
try!(fast_field_serializer.add_val(val));
}
}
}
try!(fast_field_serializer.close_field());
}
Ok(())
}
fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> Result<()> {
fn write_postings(&self,
postings_serializer: &mut PostingsSerializer) -> Result<()> {
let mut merged_terms = TermIterator::from(&self.readers[..]);
let mut delta_position_computer = DeltaPositionComputer::new();
let mut offsets: Vec<DocId> = Vec::new();
let mut max_doc = 0;
for reader in &self.readers {
offsets.push(max_doc);
max_doc += reader.max_doc();
}
// map from segment doc ids to the resulting merged segment doc id.
let mut merged_doc_id_map: Vec<Vec<Option<DocId>>> = Vec::with_capacity(self.readers.len());
for reader in &self.readers {
let mut segment_local_map = Vec::with_capacity(reader.max_doc() as usize);
for doc_id in 0..reader.max_doc() {
if reader.is_deleted(doc_id) {
segment_local_map.push(None);
}
else {
segment_local_map.push(Some(max_doc));
max_doc += 1u32;
}
}
merged_doc_id_map.push(segment_local_map);
}
while merged_terms.advance() {
// Create the total list of doc ids
// by stacking the doc ids from the different segment.
@@ -142,34 +201,51 @@ impl IndexMerger {
// - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, seg0.max_doc + seg1.max_doc + seg2.max_doc]
// ...
let term = merged_terms.term();
let mut merged_postings =
ChainedPostings::from(
merged_terms
.segment_ords()
.iter()
.cloned()
.flat_map(|segment_ord| {
let offset = offsets[segment_ord];
self.readers[segment_ord]
.read_postings_all_info(&term)
.map(|segment_postings| OffsetPostings::new(segment_postings, offset))
})
.collect::<Vec<_>>()
);
let mut term_written = false;
let segment_postings = merged_terms
.segment_ords()
.iter()
.cloned()
.flat_map(|segment_ord| {
self.readers[segment_ord]
.read_postings_all_info(&term)
.map(|segment_postings| (segment_ord, segment_postings))
})
.collect::<Vec<_>>();
// We can now serialize this postings, by pushing each document to the
// postings serializer.
try!(postings_serializer.new_term(&term, merged_postings.len() as DocId));
while merged_postings.advance() {
let delta_positions: &[u32] =
delta_position_computer.compute_delta_positions(merged_postings.positions());
try!(postings_serializer.write_doc(merged_postings.doc(),
merged_postings.term_freq(),
delta_positions));
// We can remove the term if all documents which
// contained it have been deleted.
if segment_postings.len() > 0 {
// We can now serialize this postings, by pushing each document to the
// postings serializer.
for (segment_ord, mut segment_postings) in segment_postings {
let old_to_new_doc_id = &merged_doc_id_map[segment_ord];
while segment_postings.advance() {
if let Some(remapped_doc_id) = old_to_new_doc_id[segment_postings.doc() as usize] {
if !term_written {
// we make sure to only write the term iff
// there is at least one document.
postings_serializer.new_term(&term)?;
term_written = true;
}
let delta_positions: &[u32] =
delta_position_computer.compute_delta_positions(segment_postings.positions());
try!(postings_serializer.write_doc(
remapped_doc_id,
segment_postings.term_freq(),
delta_positions));
}
}
}
if term_written {
try!(postings_serializer.close_term());
}
}
try!(postings_serializer.close_term());
}
Ok(())
}

View File

@@ -14,7 +14,10 @@ mod segment_entry;
mod doc_opstamp_mapping;
pub mod operation;
pub use self::segment_entry::SegmentEntry;
// TODO avoid exposing SegmentState / SegmentEntry if it does not have to be public API
pub use self::segment_entry::{SegmentEntry, SegmentState};
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;
pub use self::index_writer::IndexWriter;

View File

@@ -120,6 +120,7 @@ impl Default for SegmentRegister {
#[cfg(test)]
mod tests {
use indexer::SegmentState;
use core::SegmentId;
use core::SegmentMeta;
use super::*;

View File

@@ -1,44 +1,42 @@
#![allow(for_kv_map)]
use core::Index;
use Error;
use core::Segment;
use indexer::{MergePolicy, DefaultMergePolicy};
use core::SegmentId;
use core::SegmentMeta;
use std::mem;
use std::sync::atomic::Ordering;
use std::ops::DerefMut;
use futures::{Future, future};
use futures::oneshot;
use futures::Canceled;
use std::thread;
use std::sync::atomic::AtomicUsize;
use std::sync::RwLock;
use core::SerializableSegment;
use indexer::MergeCandidate;
use indexer::merger::IndexMerger;
use std::borrow::BorrowMut;
use indexer::SegmentSerializer;
use indexer::SegmentEntry;
use schema::Schema;
use indexer::index_writer::advance_deletes;
use directory::Directory;
use std::thread::JoinHandle;
use std::sync::Arc;
use std::collections::HashMap;
use rustc_serialize::json;
use indexer::delete_queue::DeleteQueue;
use Result;
use futures_cpupool::CpuPool;
use core::IndexMeta;
use core::META_FILEPATH;
use core::Segment;
use core::SegmentId;
use core::SegmentMeta;
use core::SerializableSegment;
use directory::Directory;
use Error;
use futures_cpupool::CpuPool;
use futures::{Future, future};
use futures::Canceled;
use futures::oneshot;
use indexer::{MergePolicy, DefaultMergePolicy};
use indexer::delete_queue::DeleteQueue;
use indexer::index_writer::advance_deletes;
use indexer::MergeCandidate;
use indexer::merger::IndexMerger;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use Result;
use rustc_serialize::json;
use schema::Schema;
use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::io::Write;
use std::mem;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::RwLock;
use std::thread;
use std::thread::JoinHandle;
use super::segment_manager::{SegmentManager, get_segments};
/// Save the index meta file.
/// This operation is atomic :
/// Either
@@ -171,8 +169,7 @@ impl SegmentUpdater {
.into_iter()
.map(|segment_entry| {
let mut segment = self.0.index.segment(segment_entry.meta().clone());
advance_deletes(&mut segment, &self.0.delete_queue, segment_entry.doc_to_opstamp())
.map(|entry| entry.meta().clone())
advance_deletes(&mut segment, &self.0.delete_queue.snapshot(), segment_entry.doc_to_opstamp())
})
.collect()
}
@@ -206,27 +203,37 @@ impl SegmentUpdater {
let merging_thread_id = self.get_merging_thread_id();
let (merging_future_send, merging_future_recv) = oneshot();
let delete_operations = self.0.delete_queue.snapshot();
if segment_ids.is_empty() {
return merging_future_recv;
}
let merging_join_handle = thread::spawn(move || {
// first we need to apply deletes to our segment.
info!("Start merge: {:?}", segment_ids_vec);
let ref index = segment_updater_clone.0.index;
let schema = index.schema();
let segment_metas: Vec<SegmentMeta> = segment_ids_vec
.iter()
.map(|segment_id|
segment_updater_clone.0.segment_manager
.segment_entry(segment_id)
.map(|segment_entry| segment_entry.meta().clone())
.ok_or(Error::InvalidArgument(format!("Segment({:?}) does not exist anymore", segment_id)))
)
.collect::<Result<_>>()?;
let mut segment_metas = vec!();
for segment_id in &segment_ids_vec {
if let Some(segment_entry) = segment_updater_clone.0
.segment_manager
.segment_entry(segment_id) {
let mut segment = index.segment(segment_entry.meta().clone());
let segment_meta = advance_deletes(
&mut segment,
&delete_operations,
segment_entry.doc_to_opstamp())?;
segment_metas.push(segment_meta);
}
else {
error!("Error, had to abort merge as some of the segment is not managed anymore.a");
return Err(Error::InvalidArgument(format!("Segment {:?} requested for merge is not managed.", segment_id)));
}
}
let segments: Vec<Segment> = segment_metas
.iter()
@@ -251,6 +258,7 @@ impl SegmentUpdater {
.end_merge(segment_metas.clone(), segment_entry.clone())
.wait()
.unwrap();
merging_future_send.complete(segment_entry.clone());
segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
Ok(segment_entry)

View File

@@ -64,7 +64,7 @@ mod tests {
let mut segment = index.new_segment();
let mut posting_serializer = PostingsSerializer::open(&mut segment).unwrap();
let term = Term::from_field_text(text_field, "abc");
posting_serializer.new_term(&term, 3).unwrap();
posting_serializer.new_term(&term).unwrap();
for doc_id in 0u32..3u32 {
let positions = vec!(1,2,3,2);
posting_serializer.write_doc(doc_id, 2, &positions).unwrap();

View File

@@ -122,7 +122,7 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'
for (term_bytes, (addr, recorder)) in term_offsets {
// TODO remove copy
term.set_content(term_bytes);
try!(serializer.new_term(&term, recorder.doc_freq()));
try!(serializer.new_term(&term));
try!(recorder.serialize(addr, serializer, heap));
try!(serializer.close_term());
}

View File

@@ -63,6 +63,7 @@ pub struct PostingsSerializer {
schema: Schema,
text_indexing_options: TextIndexingOptions,
term_open: bool,
current_term_info: TermInfo,
}
impl PostingsSerializer {
@@ -88,6 +89,7 @@ impl PostingsSerializer {
schema: schema,
text_indexing_options: TextIndexingOptions::Unindexed,
term_open: false,
current_term_info: TermInfo::default(),
})
}
@@ -121,7 +123,7 @@ impl PostingsSerializer {
/// * term - the term. It needs to come after the previous term according
/// to the lexicographical order.
/// * doc_freq - return the number of document containing the term.
pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> {
pub fn new_term(&mut self, term: &Term) -> io::Result<()> {
if self.term_open {
panic!("Called new_term, while the previous term was not closed.");
}
@@ -131,13 +133,12 @@ impl PostingsSerializer {
self.last_doc_id_encoded = 0;
self.term_freqs.clear();
self.position_deltas.clear();
let term_info = TermInfo {
doc_freq: doc_freq,
self.current_term_info = TermInfo {
doc_freq: 0,
postings_offset: self.written_bytes_postings as u32,
positions_offset: self.written_bytes_positions as u32,
};
self.terms_fst_builder
.insert(term.as_slice(), &term_info)
self.terms_fst_builder.insert_key(term.as_slice())
}
/// Finish the serialization for this term postings.
@@ -146,6 +147,9 @@ impl PostingsSerializer {
/// using `VInt` encoding.
pub fn close_term(&mut self) -> io::Result<()> {
if self.term_open {
self.terms_fst_builder.insert_value(&self.current_term_info)?;
if !self.doc_ids.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
@@ -202,6 +206,7 @@ impl PostingsSerializer {
term_freq: u32,
position_deltas: &[u32])
-> io::Result<()> {
self.current_term_info.inc_doc_freq();
self.doc_ids.push(doc_id);
if self.text_indexing_options.is_termfreq_enabled() {
self.term_freqs.push(term_freq as u32);

View File

@@ -12,7 +12,7 @@ use std::io;
/// * `postings_offset` : an offset in the `.idx` file
/// addressing the start of the posting list associated
/// to this term.
#[derive(Debug,Ord,PartialOrd,Eq,PartialEq,Clone)]
#[derive(Debug,Default,Ord,PartialOrd,Eq,PartialEq,Clone)]
pub struct TermInfo {
/// Number of documents in the segment containing the term
pub doc_freq: u32,
@@ -22,6 +22,11 @@ pub struct TermInfo {
pub positions_offset: u32,
}
impl TermInfo {
/// Increments `doc_freq` by one.
pub fn inc_doc_freq(&mut self) {
self.doc_freq += 1;
}
}
impl BinarySerializable for TermInfo {
fn serialize(&self, writer: &mut io::Write) -> io::Result<usize> {