diff --git a/src/core/fastfield.rs b/src/core/fastfield.rs index 735008027..eb3281ecb 100644 --- a/src/core/fastfield.rs +++ b/src/core/fastfield.rs @@ -177,6 +177,7 @@ pub struct U32FastFieldReader { _data: ReadOnlySource, data_ptr: *const u64, min_val: u32, + max_val: u32, num_bits: u8, mask: u32, num_in_pack: u32, @@ -184,6 +185,15 @@ pub struct U32FastFieldReader { } impl U32FastFieldReader { + + pub fn min_val(&self,) -> u32 { + self.min_val + } + + pub fn max_val(&self,) -> u32 { + self.max_val + } + pub fn open(data: ReadOnlySource) -> io::Result { let min_val; let amplitude; @@ -200,6 +210,7 @@ impl U32FastFieldReader { _data: data, data_ptr: ptr as *const u64, min_val: min_val, + max_val: min_val + amplitude, num_bits: num_bits, mask: mask, num_in_pack: num_in_pack, diff --git a/src/core/merger.rs b/src/core/merger.rs index ac208cd64..bdf5147a0 100644 --- a/src/core/merger.rs +++ b/src/core/merger.rs @@ -5,102 +5,148 @@ use core::schema::DocId; use core::index::SerializableSegment; use core::codec::SegmentSerializer; use core::postings::PostingsSerializer; +use core::postings::TermInfo; use std::collections::BinaryHeap; -use core::serialize::BinarySerializable; use core::fstmap::FstMapIter; -use std::cmp::Ordering; -use std::cmp::Ord; +use core::schema::Term; +use core::schema::Schema; +use core::fastfield::FastFieldSerializer; +use core::schema::U32Field; +use std::cmp::min; +use std::cmp::max; -pub struct StreamUnion<'a, V: 'static + BinarySerializable + Ord + Clone> { - streams: Vec>, - heap: BinaryHeap<(&'a [u8], usize, V)>, - //heap: BinaryHeap<(&'a [u8], usize)>, - // heap: BinaryHeap, +struct PostingsMerger<'a> { + doc_ids: Vec, + doc_offsets: Vec, + heap: BinaryHeap<(Vec, usize, TermInfo)>, + term_streams: Vec>, + readers: &'a Vec, } -impl<'a, V: 'static + Ord + BinarySerializable + Clone> StreamUnion<'a, V> { +impl<'a> PostingsMerger<'a> { + fn new(readers: &'a Vec) -> PostingsMerger<'a> { + let doc_offsets: Vec = readers + .iter() + .map(|reader| reader.max_doc()) + .collect(); + let term_streams = readers + .iter() + .map(|reader| reader.term_infos().stream()) + .collect(); + let mut postings_merger = PostingsMerger { + heap: BinaryHeap::new(), + term_streams: term_streams, + doc_ids: Vec::new(), + doc_offsets: doc_offsets, + readers: readers, + }; + for segment_ord in 0..readers.len() { + postings_merger.push_next_segment_el(segment_ord); + } + postings_merger + } - pub fn open(mut streams: Vec>) -> StreamUnion<'a, V> { - let mut heap = BinaryHeap::new(); + fn push_next_segment_el(&mut self, segment_ord: usize) { + match self.term_streams[segment_ord].next() { + Some((term, val)) => { + let it = (Vec::from(term), segment_ord, val.clone()); + self.heap.push(it); + } + None => {} + } + } + + fn append_segment(&mut self, segment_ord: usize, term_info: TermInfo) { { - let streams_it = streams.iter_mut(); - loop { - match streams_it { - - Some(fst_map_it) => { - } - None => { - break; - } - } + let offset = self.doc_offsets[segment_ord]; + let reader = &self.readers[segment_ord]; + for doc_id in reader.read_postings(term_info.postings_offset) { + self.doc_ids.push(offset + doc_id); } } - let (k, v) = streams.iter_mut().next().unwrap().next().unwrap(); - // for (i, stream) in streams.iter_mut().enumerate() { - // match stream.next() { - // Some(kv) => { - // let (key, val): (&'a [u8], V) = kv; - // //heap.push((key.clone(), i.clone(), val.clone())); - // // let c: &'a [u8] = key; - // // heap.push((key.clone(), i.clone())); - // //heap.push(i.clone()); - // // heap.push(i.clone()); - // }, - // None => {}, - // } - // } - StreamUnion { - streams: streams, - heap: heap, + self.push_next_segment_el(segment_ord); + } + + fn next(&mut self,) -> Option<(Vec, &Vec)> { + // TODO remove the Vec allocations + match self.heap.pop() { + Some((term, segment_ord, term_info)) => { + self.doc_ids.clear(); + self.append_segment(segment_ord, term_info); + loop { + match self.heap.peek() { + Some(&(ref next_term, _, _)) if next_term == &term => {}, + _ => { break; } + } + let (_, segment_ord, next_term_info) = self.heap.pop().unwrap(); + self.append_segment(segment_ord, next_term_info); + } + Some((term, &self.doc_ids)) + }, + None => None } } - - pub fn next(&mut self) -> Option<(&[u8], V)> { - // match self.heap.pop() { - // Some((k, i, v)) => { - // let mut vals = Vec::new(); - // match self.streams[i].next() { - // Some((k_next,v_next)) => { - // self.heap.push((k_next, i, v_next)); - // }, - // None => {} - // } - // - // }, - // None => None, - // } - None - } } - - struct IndexMerger { + schema: Schema, readers: Vec, - offsets: Vec, } + impl IndexMerger { - pub fn open(segments: &Vec) -> io::Result { + pub fn open(schema: Schema, segments: &Vec) -> io::Result { let mut readers = Vec::new(); - let mut offsets = Vec::new(); for segment in segments.iter() { let reader = try!(SegmentReader::open(segment.clone())); - offsets.push(reader.max_doc()); readers.push(reader); } Ok(IndexMerger { + schema: schema, readers: readers, - offsets: offsets, }) } - fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> io::Result<()> { - for reader in self.readers.iter() { - let term_infos = reader.term_infos(); - let term_stream = term_infos.stream(); + fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> io::Result<()> { + for field in self.schema + .get_u32_fields() + .iter() + .enumerate() + .filter(|&(_, field_entry)| field_entry.option.is_fast()) + .map(|(field_id, _)| U32Field(field_id as u8)) { + let mut u32_readers = Vec::new(); + let mut min_val = u32::min_value(); + let mut max_val = 0; + for reader in self.readers.iter() { + let u32_reader = try!(reader.get_fast_field_reader(&field)); + min_val = min(min_val, u32_reader.min_val()); + max_val = max(max_val, u32_reader.max_val()); + u32_readers.push((reader.max_doc(), u32_reader)); + } + fast_field_serializer.new_u32_fast_field(field, min_val, max_val); + for (max_doc, u32_reader) in u32_readers { + for doc_id in 0..max_doc { + let val = u32_reader.get(doc_id); + try!(fast_field_serializer.add_val(val)); + } + } + try!(fast_field_serializer.close_field()); + } + Ok(()) + } + + fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> io::Result<()> { + let mut postings_merger = PostingsMerger::new(&self.readers); + loop { + match postings_merger.next() { + Some((term, doc_ids)) => { + try!(postings_serializer.new_term(&Term::from(&term), doc_ids.len() as DocId)); + try!(postings_serializer.write_docs(doc_ids)); + } + None => { break; } + } } Ok(()) } @@ -109,6 +155,7 @@ impl IndexMerger { impl SerializableSegment for IndexMerger { fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> { try!(self.write_postings(serializer.get_postings_serializer())); + try!(self.write_fast_fields(serializer.get_fast_field_serializer())); Ok(()) } } diff --git a/src/core/postings.rs b/src/core/postings.rs index 7a46a4bbb..7401d5076 100644 --- a/src/core/postings.rs +++ b/src/core/postings.rs @@ -11,7 +11,7 @@ use core::serialize::BinarySerializable; use std::io::{Read, Write}; use std::io; -#[derive(Debug)] +#[derive(Debug,Ord,PartialOrd,Eq,PartialEq,Clone)] pub struct TermInfo { pub doc_freq: u32, pub postings_offset: u32, diff --git a/src/core/reader.rs b/src/core/reader.rs index d60b0e4e7..2fd7640d5 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -30,7 +30,7 @@ impl fmt::Debug for SegmentReader { pub struct SegmentPostings { doc_id: usize, - doc_ids: Vec, + doc_ids: Vec, } impl SegmentPostings { @@ -109,6 +109,7 @@ impl SegmentReader { self.segment_info.max_doc } + /// Open a new segment for reading. pub fn open(segment: Segment) -> io::Result { let segment_info_reader = try!(segment.open_read(SegmentComponent::INFO)); @@ -148,7 +149,7 @@ impl SegmentReader { self.fast_fields_reader.get_field(u32_field) } - fn read_postings(&self, offset: u32) -> SegmentPostings { + pub fn read_postings(&self, offset: u32) -> SegmentPostings { let postings_data = &self.postings_data.as_slice()[(offset as usize)..]; SegmentPostings::from_data(&postings_data) } diff --git a/src/core/writer.rs b/src/core/writer.rs index e4b0cda45..8a0ea341c 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -11,14 +11,12 @@ use core::postings::PostingsWriter; use core::fastfield::U32FastFieldsWriter; use std::clone::Clone; use std::sync::mpsc; -use std::sync::mpsc::channel; use std::thread; use std::sync::Mutex; use std::sync::mpsc::SyncSender; use std::sync::mpsc::Receiver; use std::thread::JoinHandle; use std::sync::Arc; -use std::rc::Rc; pub struct IndexWriter { // segment_writers: Vec, @@ -36,8 +34,7 @@ impl IndexWriter { let schema = index.schema(); let (queue_input, queue_output): (SyncSender, Receiver) = mpsc::sync_channel(10_000); let queue_output_sendable = Arc::new(Mutex::new(queue_output)); - let threads = (0..num_threads).map(|thread_id| { - + let threads = (0..num_threads).map(|_| { let queue_output_clone = queue_output_sendable.clone(); let mut index_clone = index.clone(); let schema_clone = schema.clone(); @@ -49,8 +46,6 @@ impl IndexWriter { let mut docs_remaining = true; while docs_remaining { let segment = index_clone.new_segment(); - let segment_clone = segment.clone(); - let mut doc; { match queue_output_clone.lock().unwrap().recv() { @@ -61,8 +56,7 @@ impl IndexWriter { let mut segment_writer = SegmentWriter::for_segment(segment.clone(), &schema_clone).unwrap(); segment_writer.add_document(&*doc, &schema_clone).unwrap(); - - for i in 0..(225_000 - 1) { + for _ in 0..(225_000 - 1) { { let queue = queue_output_clone.lock().unwrap(); match queue.recv() { @@ -96,11 +90,12 @@ impl IndexWriter { }) } - pub fn wait(self,) { + pub fn wait(self,) -> thread::Result<()> { drop(self.queue_input); for thread in self.threads { - thread.join(); + try!(thread.join()); } + Ok(()) } pub fn add_document(&mut self, doc: Document) -> io::Result<()> {