diff --git a/Cargo.toml b/Cargo.toml index a68030cc4..837e63965 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ tempdir = "0.3.4" bincode = "0.4.0" serde = "0.6.11" libc = "0.2.6" +argparse = "*" num_cpus = "0.2" lz4 = "1.13.131" time = "0.1.34" @@ -30,3 +31,7 @@ rand = "0.3.13" [build-dependencies] gcc = "0.3.24" + +[[bin]] +name = "tantivy-merge" +path = "src/cli/merge.rs" diff --git a/src/core/mod.rs b/src/core/mod.rs index a98de2c85..cc1be2737 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -15,6 +15,7 @@ pub mod index; pub mod fastfield; pub mod fastdivide; pub mod merger; +pub mod timer; use std::error; use std::io; diff --git a/src/core/reader.rs b/src/core/reader.rs index e514fe8c7..a9d7a15a5 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -7,20 +7,23 @@ use core::directory::ReadOnlySource; use std::io::Cursor; use core::schema::DocId; use core::index::SegmentComponent; -use core::postings::Postings; use core::simdcompression::Decoder; use std::io; +use std::iter; use std::str; use core::postings::TermInfo; use core::fstmap::FstMap; use std::fmt; use rustc_serialize::json; use core::index::SegmentInfo; +use core::timer::OpenTimer; use core::schema::U32Field; use core::convert_to_ioerror; use core::serialize::BinarySerializable; use core::fastfield::U32FastFieldsReader; use core::fastfield::U32FastFieldReader; +use core::simdcompression; +use std::mem; impl fmt::Debug for SegmentReader { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -28,68 +31,95 @@ impl fmt::Debug for SegmentReader { } } -pub struct SegmentPostings { - doc_id: usize, - doc_ids: Vec, +#[inline(never)] +pub fn intersection(mut postings: Vec) -> SegmentPostings { + let min_len = postings + .iter() + .map(|v| v.len()) + .min() + .unwrap(); + let mut buffer: Vec = postings.pop().unwrap().0; + let mut output: Vec = Vec::with_capacity(min_len); + unsafe { output.set_len(min_len); } + let mut pair = (output, buffer); + for posting in postings.iter() { + pair = (pair.1, pair.0); + let output_len = simdcompression::intersection(posting.0.as_slice(), pair.0.as_slice(), pair.1.as_mut_slice()); + unsafe { pair.1.set_len(output_len); } + } + SegmentPostings(pair.1) +} + +pub struct SegmentPostings(Vec); + +impl IntoIterator for SegmentPostings { + type Item = DocId; + type IntoIter = ::std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } } impl SegmentPostings { pub fn empty()-> SegmentPostings { - SegmentPostings { - doc_id: 0, - doc_ids: Vec::new(), - } + SegmentPostings(Vec::new()) + } + + pub fn len(&self,) -> usize { + self.0.len() } pub fn from_data(doc_freq: DocId, data: &[u8]) -> SegmentPostings { let mut cursor = Cursor::new(data); - let data: Vec = Vec::deserialize(&mut cursor).unwrap(); - let mut doc_ids: Vec = (0u32..doc_freq).collect(); - let decoder = Decoder::new(); - let num_doc_ids = decoder.decode_sorted(&data, &mut doc_ids); - doc_ids.truncate(num_doc_ids); - SegmentPostings { - doc_ids: doc_ids, - doc_id: 0, + let num_u32s = u32::deserialize(&mut cursor).unwrap(); + let data_u32: &[u32] = unsafe { mem::transmute(data) }; + let mut doc_ids: Vec = Vec::with_capacity(doc_freq as usize); + unsafe { doc_ids.set_len(doc_freq as usize); } + { + let decoder = Decoder::new(); + let num_doc_ids = decoder.decode_sorted(&data_u32[1..(num_u32s+1) as usize], &mut doc_ids); + SegmentPostings(doc_ids) } } } +// +// impl Postings for SegmentPostings { +// fn skip_next(&mut self, target: DocId) -> Option { +// loop { +// match Iterator::next(self) { +// Some(val) if val >= target => { +// return Some(val); +// }, +// None => { +// return None; +// }, +// _ => {} +// } +// } +// } +// } -impl Postings for SegmentPostings { - fn skip_next(&mut self, target: DocId) -> Option { - loop { - match Iterator::next(self) { - Some(val) if val >= target => { - return Some(val); - }, - None => { - return None; - }, - _ => {} - } - } - } -} +// +// impl Iterator for SegmentPostings { +// +// type Item = DocId; +// +// fn next(&mut self,) -> Option { +// if self.doc_id < self.doc_ids.len() { +// let res = Some(self.doc_ids[self.doc_id]); +// self.doc_id += 1; +// return res; +// } +// else { +// None +// } +// } +// } -impl Iterator for SegmentPostings { - - type Item = DocId; - - fn next(&mut self,) -> Option { - if self.doc_id < self.doc_ids.len() { - let res = Some(self.doc_ids[self.doc_id]); - self.doc_id += 1; - return res; - } - else { - None - } - } -} - pub struct SegmentReader { segment_info: SegmentInfo, segment_id: SegmentId, @@ -165,22 +195,40 @@ impl SegmentReader { /// Returns the list of doc ids containing all of the /// given terms. - pub fn search(&self, terms: &Vec) -> IntersectionPostings { - let mut segment_postings: Vec = Vec::new(); - for term in terms.iter() { - match self.get_term(term) { + pub fn search<'a>(&self, terms: &Vec, mut timer: OpenTimer<'a>) -> SegmentPostings { + if terms.len() == 1 { + match self.get_term(&terms[0]) { Some(term_info) => { - let segment_posting = self.read_postings(&term_info); - segment_postings.push(segment_posting); + self.read_postings(&term_info) } None => { - segment_postings.clear(); - segment_postings.push(SegmentPostings::empty()); - break; + SegmentPostings::empty() } } } - IntersectionPostings::from_postings(segment_postings) + else { + let mut segment_postings: Vec = Vec::new(); + { + let mut decode_timer = timer.open("decode_all"); + for term in terms.iter() { + match self.get_term(term) { + Some(term_info) => { + let decode_one_timer = decode_timer.open("decode_one"); + let segment_posting = self.read_postings(&term_info); + segment_postings.push(segment_posting); + } + None => { + // currently this is a strict intersection. + return SegmentPostings::empty(); + } + } + } + } + { + let mut intersection_time = timer.open("intersection"); + intersection(segment_postings) + } + } } } diff --git a/src/core/searcher.rs b/src/core/searcher.rs index a07b81ad1..9be6e10f8 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -5,6 +5,7 @@ use core::schema::DocId; use core::schema::Document; use core::collector::Collector; use std::io; +use core::timer::TimerTree; use core::schema::Term; #[derive(Debug)] @@ -48,15 +49,26 @@ impl Searcher { Ok(searcher) } - pub fn search(&self, terms: &Vec, collector: &mut Collector) -> io::Result<()> { - for (segment_ord, segment) in self.segments.iter().enumerate() { - try!(collector.set_segment(segment_ord as SegmentLocalId, &segment)); - let postings = segment.search(terms); - for doc_id in postings { - collector.collect(doc_id); + pub fn search(&self, terms: &Vec, collector: &mut C) -> io::Result { + let mut timer_tree = TimerTree::new(); + { + let mut search_timer = timer_tree.open("search"); + for (segment_ord, segment) in self.segments.iter().enumerate() { + let mut segment_search_timer = search_timer.open("segment_search"); + { + let set_segment_timer = segment_search_timer.open("set_segment"); + try!(collector.set_segment(segment_ord as SegmentLocalId, &segment)); + } + let postings = segment.search(terms, segment_search_timer.open("get_postings")); + { + let collection_timer = segment_search_timer.open("collection"); + for doc_id in postings { + collector.collect(doc_id); + } + } } } - Ok(()) + Ok(timer_tree) } } diff --git a/src/core/simdcompression.rs b/src/core/simdcompression.rs index 4606c1eba..4434c071c 100644 --- a/src/core/simdcompression.rs +++ b/src/core/simdcompression.rs @@ -1,15 +1,21 @@ use libc::size_t; use std::ptr; -// use std::cmp::min; -// use std::iter; extern { // fn encode_unsorted_native(data: *mut u32, num_els: size_t, output: *mut u32, output_capacity: size_t) -> size_t; // fn decode_unsorted_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32, output_capacity: size_t) -> size_t; - // fn intersection_native(left_data: *const u32, left_size: size_t, right_data: *const u32, right_size: size_t, output: *mut u32) -> size_t; + fn intersection_native(left_data: *const u32, left_size: size_t, right_data: *const u32, right_size: size_t, output: *mut u32) -> size_t; fn encode_sorted_native(data: *mut u32, num_els: size_t, output: *mut u32, output_capacity: size_t) -> size_t; fn decode_sorted_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32, output_capacity: size_t) -> size_t; +} +pub fn intersection(left: &[u32], right: &[u32], output: &mut [u32]) -> usize { + unsafe { + intersection_native( + left.as_ptr(), left.len(), + right.as_ptr(), right.len(), + output.as_mut_ptr()) + } } pub struct Encoder {