diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index 409b559ef..2464af8ca 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -1,113 +1,640 @@ -use std::cmp::Eq; -use std::collections::HashMap; -use std::hash::Hash; - +use std::mem; use collector::Collector; -use fastfield::FastFieldReader; +use fastfield::FacetReader; use schema::Field; +use std::cell::UnsafeCell; +use schema::Facet; +use std::collections::BTreeMap; +use std::collections::BinaryHeap; +use termdict::TermDictionary; +use termdict::TermStreamer; +use termdict::TermStreamerBuilder; +use std::collections::BTreeSet; +use termdict::TermMerger; +use postings::SkipResult; +use std::{u64, usize}; +use schema::FACET_SEP_BYTE; use DocId; use Result; use Score; use SegmentReader; use SegmentLocalId; +use std::cmp::Ordering; -/// Facet collector for i64/u64 fast field -pub struct FacetCollector -where - T: FastFieldReader, - T::ValueType: Eq + Hash, -{ - counters: HashMap, - field: Field, - ff_reader: Option, +struct Hit<'a> { + count: u64, + facet: &'a Facet, } -impl FacetCollector -where - T: FastFieldReader, - T::ValueType: Eq + Hash, -{ - /// Creates a new facet collector for aggregating a given field. 
/// Computes the depth of a facet from its encoded byte representation.
///
/// An empty byte string has depth `0`. Any other byte string has a depth
/// of one plus the number of `0u8` bytes it contains (each `0u8` byte
/// delimits one more path segment).
fn facet_depth(facet_bytes: &[u8]) -> usize {
    match facet_bytes.len() {
        0 => 0,
        _ => {
            let mut num_separators = 0;
            for &byte in facet_bytes {
                if byte == 0u8 {
                    num_separators += 1;
                }
            }
            num_separators + 1
        }
    }
}
+/// +/// +/// ```rust +/// #[macro_use] +/// extern crate tantivy; +/// use tantivy::schema::{Facet, SchemaBuilder, TEXT}; +/// use tantivy::{Index, Result}; +/// use tantivy::collector::FacetCollector; +/// use tantivy::query::AllQuery; +/// +/// # fn main() { example().unwrap(); } +/// fn example() -> Result<()> { +/// let mut schema_builder = SchemaBuilder::new(); +/// +/// // Facet have their own specific type. +/// // It is not a bad practise to put all of your +/// // facet information in the same field. +/// let facet = schema_builder.add_facet_field("facet"); +/// let title = schema_builder.add_text_field("title", TEXT); +/// let schema = schema_builder.build(); +/// let index = Index::create_in_ram(schema); +/// { +/// let mut index_writer = index.writer(3_000_000)?; +/// // a document can be associated to any number of facets +/// index_writer.add_document(doc!( +/// title => "The Name of the Wind", +/// facet => Facet::from("/lang/en"), +/// facet => Facet::from("/category/fiction/fantasy") +/// )); +/// index_writer.add_document(doc!( +/// title => "Dune", +/// facet => Facet::from("/lang/en"), +/// facet => Facet::from("/category/fiction/sci-fi") +/// )); +/// index_writer.add_document(doc!( +/// title => "La VĂ©nus d'Ille", +/// facet => Facet::from("/lang/fr"), +/// facet => Facet::from("/category/fiction/fantasy"), +/// facet => Facet::from("/category/fiction/horror") +/// )); +/// index_writer.add_document(doc!( +/// title => "The Diary of a Young Girl", +/// facet => Facet::from("/lang/en"), +/// facet => Facet::from("/category/biography") +/// )); +/// index_writer.commit().unwrap(); +/// } +/// +/// index.load_searchers()?; +/// let searcher = index.searcher(); +/// +/// { +/// let mut facet_collector = FacetCollector::for_field(facet); +/// facet_collector.add_facet("/lang"); +/// facet_collector.add_facet("/category"); +/// searcher.search(&AllQuery, &mut facet_collector).unwrap(); +/// +/// // this object contains count aggregate for all of 
the facets. +/// let counts = facet_collector.harvest(); +/// +/// // This lists all of the facet counts +/// let facets: Vec<(&Facet, u64)> = counts +/// .get("/category") +/// .collect(); +/// assert_eq!(facets, vec![ +/// (&Facet::from("/category/biography"), 1), +/// (&Facet::from("/category/fiction"), 3) +/// ]); +/// } +/// +/// { +/// let mut facet_collector = FacetCollector::for_field(facet); +/// facet_collector.add_facet("/category/fiction"); +/// searcher.search(&AllQuery, &mut facet_collector).unwrap(); +/// +/// // this object contains count aggregate for all of the facets. +/// let counts = facet_collector.harvest(); +/// +/// // This lists all of the facet counts +/// let facets: Vec<(&Facet, u64)> = counts +/// .get("/category/fiction") +/// .collect(); +/// assert_eq!(facets, vec![ +/// (&Facet::from("/category/fiction/fantasy"), 2), +/// (&Facet::from("/category/fiction/horror"), 1), +/// (&Facet::from("/category/fiction/sci-fi"), 1) +/// ]); +/// } +/// +/// { +/// let mut facet_collector = FacetCollector::for_field(facet); +/// facet_collector.add_facet("/category/fiction"); +/// searcher.search(&AllQuery, &mut facet_collector).unwrap(); +/// +/// // this object contains count aggregate for all of the facets. 
+/// let counts = facet_collector.harvest(); +/// +/// // This lists all of the facet counts +/// let facets: Vec<(&Facet, u64)> = counts.top_k("/category/fiction", 1); +/// assert_eq!(facets, vec![ +/// (&Facet::from("/category/fiction/fantasy"), 2) +/// ]); +/// } +/// +/// Ok(()) +/// } +/// ``` + +pub struct FacetCollector { + facet_ords: Vec, + field: Field, + ff_reader: Option>, + segment_counters: Vec, + + // facet_ord -> collapse facet_id + current_segment_collapse_mapping: Vec, + // collapse facet_id -> count + current_segment_counts: Vec, + // collapse facet_id -> facet_ord + current_collapse_facet_ords: Vec, + + collapse: BTreeSet>, +} + + +use std::iter::Peekable; + +fn skip<'a, I: Iterator>>(target: &[u8], collapse_it: &mut Peekable) -> SkipResult { + loop { + println!("collapse {:?}, target {:?}", target, collapse_it.peek()); + match collapse_it.peek() { + Some(facet_bytes) => { + match facet_bytes[..].cmp(&target) { + Ordering::Less => {} + Ordering::Greater => { + return SkipResult::OverStep; + } + Ordering::Equal => { + return SkipResult::Reached; + } + } + } + None => { + return SkipResult::End; + } + } + collapse_it.next(); + } +} + +impl FacetCollector { + + /// Create a facet collector to collect the facets + /// from a specific facet `Field`. + /// + /// This function does not check whether the field + /// is of the proper type. 
+ pub fn for_field(field: Field) -> FacetCollector { + let mut facet_collector = FacetCollector { + facet_ords: Vec::with_capacity(255), field: field, ff_reader: None, + segment_counters: Vec::new(), + collapse: BTreeSet::new(), + + current_segment_collapse_mapping: Vec::new(), + current_collapse_facet_ords: Vec::new(), + current_segment_counts: Vec::new(), + }; + facet_collector.add_facet(Facet::from("/")); + facet_collector + } + + pub fn add_facet(&mut self, facet_from: T) + where Facet: From { + let facet = Facet::from(facet_from); + let facet_bytes = facet.encoded_bytes(); + for pos in facet_bytes.iter() + .cloned() + .position(|b| b == FACET_SEP_BYTE) { + self.collapse.remove(&facet_bytes[..pos]); + } + self.collapse.insert(facet_bytes.to_owned()); + } + + fn finalize_segment(&mut self) { + if self.ff_reader.is_some() { + self.segment_counters.push( + SegmentFacetCounter { + facet_reader: unsafe { self.ff_reader.take().unwrap().into_inner() }, + facet_ords: mem::replace(&mut self.current_collapse_facet_ords, Vec::new()), + facet_counts: mem::replace(&mut self.current_segment_counts, Vec::new()), + } + ); + } + } + + fn set_collapse_mapping(&mut self, facet_reader: &FacetReader) { + self.current_segment_collapse_mapping.clear(); + self.current_collapse_facet_ords.clear(); + self.current_segment_counts.clear(); + let mut collapse_facet_it = self.collapse.iter().peekable(); + self.current_collapse_facet_ords.push(0); + let mut facet_streamer = facet_reader + .facet_dict() + .range() + .into_stream(); + if !facet_streamer.advance() { + return; + } + 'outer: loop { + // at the begining of this loop, facet_streamer + // is position on a term that has not been processed yet. + let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it); + match skip_result { + SkipResult::Reached => { + // we reach a facet we decided to collapse. 
+ let collapse_depth = facet_depth(facet_streamer.key()); + let mut collapsed_id = 0; + self.current_segment_collapse_mapping.push(0); + while facet_streamer.advance() { + let depth = facet_depth(facet_streamer.key()); + println!("depth {}", depth); + if depth <= collapse_depth { + continue 'outer; + } else if depth == collapse_depth + 1 { + collapsed_id = self.current_collapse_facet_ords.len(); + self.current_collapse_facet_ords.push(facet_streamer.term_ord()); + self.current_segment_collapse_mapping.push(collapsed_id); + } else { + self.current_segment_collapse_mapping.push(collapsed_id); + } + } + break; + } + SkipResult::End | SkipResult::OverStep => { + self.current_segment_collapse_mapping.push(0); + if !facet_streamer.advance() { + break; + } + } + } + } + } + + /// Returns the results of the collection. + /// + /// This method does not just return the counters, + /// it also translates the facet ordinals of the last segment. + pub fn harvest(mut self) -> FacetCounts { + self.finalize_segment(); + + let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters + .iter() + .map(|segment_counter| &segment_counter.facet_ords[..]) + .collect(); + let collapsed_facet_counts: Vec<&[u64]> = self.segment_counters + .iter() + .map(|segment_counter| &segment_counter.facet_counts[..]) + .collect(); + + + let facet_streams = self.segment_counters + .iter() + .map(|seg_counts| seg_counts + .facet_reader + .facet_dict() + .range() + .into_stream()) + .collect::>(); + + let mut facet_merger = TermMerger::new(facet_streams); + let mut facet_counts = BTreeMap::new(); + + while facet_merger.advance() { + let count = facet_merger + .current_kvs() + .iter() + .map(|it| { + let seg_ord = it.segment_ord; + let term_ord = it.streamer.term_ord(); + collapsed_facet_ords[seg_ord] + .binary_search(&term_ord) + .map(|collapsed_term_id| { + if collapsed_term_id == 0 { + 0 + } else { + collapsed_facet_counts[seg_ord][collapsed_term_id] + } + }) + .unwrap_or(0) + }) + .sum(); + 
println!("{:?} count {}", facet_merger.key(), count); + if count > 0u64 { + let bytes = facet_merger.key().to_owned(); + facet_counts.insert(Facet::from_encoded(bytes), count); + } + } + FacetCounts { + facet_counts: facet_counts } } } -impl Collector for FacetCollector -where - T: FastFieldReader, - T::ValueType: Eq + Hash, -{ + + + +impl Collector for FacetCollector { fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> { - self.ff_reader = Some(reader.get_fast_field_reader(self.field)?); + self.finalize_segment(); + let facet_reader = reader.facet_reader(self.field)?; + self.set_collapse_mapping(&facet_reader); + self.current_segment_counts.resize(self.current_collapse_facet_ords.len(), 0); + self.ff_reader = Some(UnsafeCell::new(facet_reader)); Ok(()) } fn collect(&mut self, doc: DocId, _: Score) { - let val = self.ff_reader - .as_ref() - .expect("collect() was called before set_segment. This should never happen.") - .get(doc); - *(self.counters.entry(val).or_insert(0)) += 1; + let facet_reader: &mut FacetReader = + unsafe { + &mut *self.ff_reader + .as_ref() + .expect("collect() was called before set_segment. This should never happen.") + .get() + }; + facet_reader.facet_ords(doc, &mut self.facet_ords); + let mut previous_collapsed_ord: usize = usize::MAX; + for &facet_ord in &self.facet_ords { + let collapsed_ord = self.current_segment_collapse_mapping[facet_ord as usize]; + self.current_segment_counts[collapsed_ord] += + if collapsed_ord == previous_collapsed_ord { + 0 + } else { + 1 + }; + previous_collapsed_ord = collapsed_ord; + } } } + + + +/// Intermediary result of the `FacetCollector` that stores +/// the facet counts for all the segments. 
+pub struct FacetCounts { + facet_counts: BTreeMap, +} + +impl FacetCounts { + + pub fn get<'a, T>(&'a self, facet_from: T) -> impl Iterator + where Facet: From { + let facet = Facet::from(facet_from); + let mut facet_after_bytes = facet.encoded_bytes().to_owned(); + facet_after_bytes.push(1u8); + let facet_after = Facet::from_encoded(facet_after_bytes); + self.facet_counts + .range(facet.clone()..facet_after) + .map(|(facet, count)| (facet, *count)) + } + + pub fn top_k(&self, facet: T, k: usize) -> Vec<(&Facet, u64)> + where Facet: From { + + let mut heap = BinaryHeap::with_capacity(k); + let mut it = self.get(facet); + + for (ref facet, count) in (&mut it).take(k) { + heap.push(Hit { + count: count, + facet: facet + }); + } + + let mut lowest_count: u64 = heap.peek() + .map(|hit| hit.count) + .unwrap_or(u64::MIN); + for (facet, count) in it { + if count > lowest_count { + lowest_count = count; + if let Some(mut head) = heap.peek_mut() { + *head = Hit { + count: count, + facet: facet + }; + } + } + } + heap.into_sorted_vec() + .into_iter() + .map(|hit| (hit.facet, hit.count)) + .collect::>() + } + +} + + + #[cfg(test)] mod tests { - - use collector::{chain, FacetCollector}; - use query::QueryParser; - use fastfield::{I64FastFieldReader, U64FastFieldReader}; - use schema::{self, FAST, STRING}; - use Index; + use test::Bencher; + use core::Index; + use schema::{SchemaBuilder, Document, Facet}; + use query::AllQuery; + use super::{FacetCollector, FacetCounts}; + use std::iter; + use rand::{thread_rng, Rng}; #[test] - // create 10 documents, set num field value to 0 or 1 for even/odd ones - // make sure we have facet counters correctly filled - fn test_facet_collector_results() { - let mut schema_builder = schema::SchemaBuilder::new(); - let num_field_i64 = schema_builder.add_i64_field("num_i64", FAST); - let num_field_u64 = schema_builder.add_u64_field("num_u64", FAST); - let text_field = schema_builder.add_text_field("text", STRING); + fn 
test_facet_collector_drilldown() { + let mut schema_builder = SchemaBuilder::new(); + let facet_field = schema_builder.add_facet_field("facet"); let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); - let index = Index::create_in_ram(schema.clone()); - - { - let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); - { - for i in 0u64..10u64 { - index_writer.add_document(doc!( - num_field_i64 => ((i as i64) % 3i64) as i64, - num_field_u64 => (i % 2u64) as u64, - text_field => "text" - )); - } - } - assert_eq!(index_writer.commit().unwrap(), 10u64); + let mut index_writer = index.writer(3_000_000).unwrap(); + let num_facets: usize = 3 * 4 * 5; + let facets: Vec = (0..num_facets) + .map(|mut n| { + let top = n % 3; + n /= 3; + let mid = n % 4; + n /= 4; + let leaf = n % 5; + Facet::from(&format!("/top{}/mid{}/leaf{}", top, mid, leaf)) + }) + .collect(); + for i in 0..num_facets * 10 { + let mut doc = Document::new(); + doc.add_facet(facet_field, facets[i % num_facets].clone()); + index_writer.add_document(doc); } + index_writer.commit().unwrap(); + index.load_searchers().unwrap(); let searcher = index.searcher(); - let mut ffvf_i64: FacetCollector = FacetCollector::new(num_field_i64); - let mut ffvf_u64: FacetCollector = FacetCollector::new(num_field_u64); + + let mut facet_collector = FacetCollector::for_field(facet_field); + facet_collector.add_facet(Facet::from("/top1")); + searcher.search(&AllQuery, &mut facet_collector).unwrap(); + + let counts: FacetCounts = facet_collector.harvest(); + { + let facets: Vec<(String, u64)> = counts + .get("/top1") + .map(|(facet, count)| (facet.to_string(), count)) + .collect(); + assert_eq!(facets, [ + ("/top1/mid0", 50), + ("/top1/mid1", 50), + ("/top1/mid2", 50), + ("/top1/mid3", 50), + ].iter() + .map(|&(facet_str, count)| { + (String::from(facet_str), count) + }) + .collect::>()); + } + } + + #[test] + fn test_facet_collector_topk() { + let mut schema_builder = 
SchemaBuilder::new(); + let facet_field = schema_builder.add_facet_field("facet"); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + + let mut docs: Vec = vec![ + ("a", 10), + ("b", 100), + ("c", 7), + ("d", 12), + ("e", 21) + ].into_iter() + .flat_map(|(c, count)| { + let facet = Facet::from(&format!("/facet_{}", c)); + let doc = doc!(facet_field => facet); + iter::repeat(doc).take(count) + }).collect(); + thread_rng().shuffle(&mut docs[..]); + + let mut index_writer = index.writer(3_000_000).unwrap(); + for doc in docs { + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + index.load_searchers().unwrap(); + + let searcher = index.searcher(); + + let mut facet_collector = FacetCollector::for_field(facet_field); + facet_collector.add_facet("/"); + + searcher.search(&AllQuery, &mut facet_collector).unwrap(); + + let counts: FacetCounts = facet_collector.harvest(); { - // perform the query - let mut facet_collectors = chain().push(&mut ffvf_i64).push(&mut ffvf_u64); - let query_parser = QueryParser::for_index(&index, vec![text_field]); - let query = query_parser.parse_query("text:text").unwrap(); - query.search(&searcher, &mut facet_collectors).unwrap(); + let facets: Vec<(&Facet, u64)> = counts.top_k("/", 3); + assert_eq!( + facets, + vec![ + (&Facet::from("/facet_b"), 100), + (&Facet::from("/facet_e"), 21), + (&Facet::from("/facet_d"), 12) + ]); } + } - assert_eq!(ffvf_u64.counters[&0], 5); - assert_eq!(ffvf_u64.counters[&1], 5); - assert_eq!(ffvf_i64.counters[&0], 4); - assert_eq!(ffvf_i64.counters[&1], 3); + #[bench] + fn bench_facet_collector(b: &mut Bencher) { + let mut schema_builder = SchemaBuilder::new(); + let facet_field = schema_builder.add_facet_field("facet"); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + + let mut docs = vec!(); + for val in 0..50 { + let facet = Facet::from(&format!("/facet_{}", val)); + for _ in 0..val*val { + 
docs.push(doc!(facet_field=>facet.clone())); + } + } + // 40425 docs + thread_rng().shuffle(&mut docs[..]); + + let mut index_writer = index.writer(3_000_000).unwrap(); + for doc in docs { + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + index.load_searchers().unwrap(); + + b.iter(|| { + let searcher = index.searcher(); + let mut facet_collector = FacetCollector::for_field(facet_field); + searcher.search(&AllQuery, &mut facet_collector).unwrap(); + }); } } + diff --git a/src/collector/int_facet_collector.rs b/src/collector/int_facet_collector.rs new file mode 100644 index 000000000..72cfd711d --- /dev/null +++ b/src/collector/int_facet_collector.rs @@ -0,0 +1,123 @@ +use std::cmp::Eq; +use std::collections::HashMap; +use std::hash::Hash; + +use collector::Collector; +use fastfield::FastFieldReader; +use schema::Field; + +use DocId; +use Result; +use Score; +use SegmentReader; +use SegmentLocalId; + + +/// Facet collector for i64/u64 fast field +pub struct IntFacetCollector +where + T: FastFieldReader, + T::ValueType: Eq + Hash, +{ + counters: HashMap, + field: Field, + ff_reader: Option, +} + + +impl IntFacetCollector +where + T: FastFieldReader, + T::ValueType: Eq + Hash, +{ + /// Creates a new facet collector for aggregating a given field. + pub fn new(field: Field) -> IntFacetCollector { + IntFacetCollector { + counters: HashMap::new(), + field: field, + ff_reader: None, + } + } +} + + +impl Collector for IntFacetCollector +where + T: FastFieldReader, + T::ValueType: Eq + Hash, +{ + fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> { + self.ff_reader = Some(reader.get_fast_field_reader(self.field)?); + Ok(()) + } + + fn collect(&mut self, doc: DocId, _: Score) { + let val = self.ff_reader + .as_ref() + .expect( + "collect() was called before set_segment. 
\ + This should never happen.", + ) + .get(doc); + *(self.counters.entry(val).or_insert(0)) += 1; + } +} + + + +#[cfg(test)] +mod tests { + + use collector::{chain, IntFacetCollector}; + use query::QueryParser; + use fastfield::{I64FastFieldReader, U64FastFieldReader}; + use schema::{self, FAST, STRING}; + use Index; + + #[test] + // create 10 documents, set num field value to 0 or 1 for even/odd ones + // make sure we have facet counters correctly filled + fn test_facet_collector_results() { + + let mut schema_builder = schema::SchemaBuilder::new(); + let num_field_i64 = schema_builder.add_i64_field("num_i64", FAST); + let num_field_u64 = schema_builder.add_u64_field("num_u64", FAST); + let text_field = schema_builder.add_text_field("text", STRING); + let schema = schema_builder.build(); + + let index = Index::create_in_ram(schema.clone()); + + { + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + { + for i in 0u64..10u64 { + index_writer.add_document(doc!( + num_field_i64 => ((i as i64) % 3i64) as i64, + num_field_u64 => (i % 2u64) as u64, + text_field => "text" + )); + } + } + assert_eq!(index_writer.commit().unwrap(), 10u64); + } + + index.load_searchers().unwrap(); + let searcher = index.searcher(); + let mut ffvf_i64: IntFacetCollector = IntFacetCollector::new(num_field_i64); + let mut ffvf_u64: IntFacetCollector = IntFacetCollector::new(num_field_u64); + + { + // perform the query + let mut facet_collectors = chain().push(&mut ffvf_i64).push(&mut ffvf_u64); + let mut query_parser = QueryParser::for_index(index, vec![text_field]); + let query = query_parser.parse_query("text:text").unwrap(); + query.search(&searcher, &mut facet_collectors).unwrap(); + } + + assert_eq!(ffvf_u64.counters[&0], 5); + assert_eq!(ffvf_u64.counters[&1], 5); + assert_eq!(ffvf_i64.counters[&0], 4); + assert_eq!(ffvf_i64.counters[&1], 3); + + } +} diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs index 30142f8f5..cb3b81e72 100644 --- 
a/src/common/bitpacker.rs +++ b/src/common/bitpacker.rs @@ -89,6 +89,7 @@ impl BitPacker { } } +#[derive(Clone)] pub struct BitUnpacker where Data: Deref, diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs index ecb58aac0..341dc051c 100644 --- a/src/common/composite_file.rs +++ b/src/common/composite_file.rs @@ -4,14 +4,47 @@ use std::collections::HashMap; use schema::Field; use common::VInt; use directory::WritePtr; -use std::io; +use std::io::{self, Read}; use directory::ReadOnlySource; use common::BinarySerializable; + +#[derive(Eq, PartialEq, Hash, Copy, Ord, PartialOrd, Clone, Debug)] +pub struct FileAddr { + field: Field, + idx: usize, +} + +impl FileAddr { + fn new(field: Field, idx: usize) -> FileAddr { + FileAddr { + field: field, + idx: idx + } + } +} + +impl BinarySerializable for FileAddr { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + self.field.serialize(writer)?; + VInt(self.idx as u64).serialize(writer)?; + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let field = Field::deserialize(reader)?; + let idx = VInt::deserialize(reader)?.0 as usize; + Ok(FileAddr { + field: field, + idx: idx + }) + } +} + /// A `CompositeWrite` is used to write a `CompositeFile`. pub struct CompositeWrite { write: CountingWriter, - offsets: HashMap, + offsets: HashMap, } impl CompositeWrite { @@ -26,12 +59,19 @@ impl CompositeWrite { /// Start writing a new field. pub fn for_field(&mut self, field: Field) -> &mut CountingWriter { + self.for_field_with_idx(field, 0) + } + + /// Start writing a new field. + pub fn for_field_with_idx(&mut self, field: Field, idx: usize) -> &mut CountingWriter { let offset = self.write.written_bytes(); - assert!(!self.offsets.contains_key(&field)); - self.offsets.insert(field, offset); + let file_addr = FileAddr::new(field, idx); + assert!(!self.offsets.contains_key(&file_addr)); + self.offsets.insert(file_addr, offset); &mut self.write } + /// Close the composite file. 
/// /// An index of the different field offsets @@ -42,16 +82,18 @@ impl CompositeWrite { let mut offset_fields: Vec<_> = self.offsets .iter() - .map(|(field, offset)| (offset, field)) + .map(|(file_addr, offset)| (*offset, *file_addr)) .collect(); offset_fields.sort(); let mut prev_offset = 0; - for (offset, field) in offset_fields { - VInt((offset - prev_offset) as u64).serialize(&mut self.write)?; - field.serialize(&mut self.write)?; - prev_offset = *offset; + for (offset, file_addr) in offset_fields { + VInt((offset - prev_offset) as u64).serialize( + &mut self.write, + )?; + file_addr.serialize(&mut self.write)?; + prev_offset = offset; } let footer_len = (self.write.written_bytes() - footer_offset) as u32; @@ -61,6 +103,7 @@ impl CompositeWrite { } } + /// A composite file is an abstraction to store a /// file partitioned by field. /// @@ -70,7 +113,7 @@ impl CompositeWrite { #[derive(Clone)] pub struct CompositeFile { data: ReadOnlySource, - offsets_index: HashMap, + offsets_index: HashMap, } impl CompositeFile { @@ -86,7 +129,7 @@ impl CompositeFile { let mut footer_buffer = footer_data.as_slice(); let num_fields = VInt::deserialize(&mut footer_buffer)?.0 as usize; - let mut fields = vec![]; + let mut file_addrs = vec![]; let mut offsets = vec![]; let mut field_index = HashMap::new(); @@ -94,16 +137,16 @@ impl CompositeFile { let mut offset = 0; for _ in 0..num_fields { offset += VInt::deserialize(&mut footer_buffer)?.0 as usize; - let field = Field::deserialize(&mut footer_buffer)?; + let file_addr = FileAddr::deserialize(&mut footer_buffer)?; offsets.push(offset); - fields.push(field); + file_addrs.push(file_addr); } offsets.push(footer_start); for i in 0..num_fields { - let field = fields[i]; + let file_addr = file_addrs[i]; let start_offset = offsets[i]; let end_offset = offsets[i + 1]; - field_index.insert(field, (start_offset, end_offset)); + field_index.insert(file_addr, (start_offset, end_offset)); } Ok(CompositeFile { @@ -124,18 +167,27 @@ impl 
CompositeFile { /// Returns the `ReadOnlySource` associated /// to a given `Field` and stored in a `CompositeFile`. pub fn open_read(&self, field: Field) -> Option { + self.open_read_with_idx(field, 0) + } + + /// Returns the `ReadOnlySource` associated + /// to a given `Field` and stored in a `CompositeFile`. + pub fn open_read_with_idx(&self, field: Field, idx: usize) -> Option { self.offsets_index - .get(&field) - .map(|&(from, to)| self.data.slice(from, to)) + .get(&FileAddr {field: field, idx: idx}) + .map(|&(from, to)| { + self.data.slice(from, to) + }) } } + #[cfg(test)] mod test { use std::io::Write; - use super::{CompositeFile, CompositeWrite}; - use directory::{Directory, RAMDirectory}; + use super::{CompositeWrite, CompositeFile}; + use directory::{RAMDirectory, Directory}; use schema::Field; use common::VInt; use common::BinarySerializable; @@ -179,6 +231,7 @@ mod test { assert_eq!(payload_4, 2u64); } } + } } diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 24755e473..c9ee67073 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -17,8 +17,14 @@ use common::CompositeFile; use std::fmt; use core::InvertedIndexReader; use schema::Field; +use schema::FieldType; +use error::ErrorKind; +use termdict::TermDictionaryImpl; +use fastfield::FacetReader; use fastfield::{FastFieldReader, U64FastFieldReader}; use schema::Schema; +use termdict::TermDictionary; +use fastfield::MultiValueIntFastFieldReader; /// Entry point to access all of the datastructures of the `Segment` /// @@ -98,6 +104,40 @@ impl SegmentReader { } } + /// Accessor to the `FacetReader` associated to a given `Field`. 
+ pub fn facet_reader(&self, field: Field) -> Result { + let field_entry = self.schema.get_field_entry(field); + if field_entry.field_type() != &FieldType::HierarchicalFacet { + return Err(ErrorKind::InvalidArgument(format!("The field {:?} is not a \ + hierarchical facet.", field_entry)).into()) + } + let term_ords_reader = self.multi_value_reader(field)?; + let termdict_source = self.termdict_composite + .open_read(field) + .ok_or_else(|| { + ErrorKind::InvalidArgument(format!("The field \"{}\" is a hierarchical \ + but this segment does not seem to have the field term \ + dictionary.", field_entry.name())) + })?; + let termdict = TermDictionaryImpl::from_source(termdict_source); + let facet_reader = FacetReader::new(term_ords_reader, termdict); + Ok(facet_reader) + } + + /// Accessor to the `MultiValueIntFastFieldReader` associated to a given `Field`. + pub fn multi_value_reader(&self, field: Field) -> Result { + let field_entry = self.schema.get_field_entry(field); + let idx_reader = self.fast_fields_composite + .open_read_with_idx(field, 0) + .ok_or_else(|| FastFieldNotAvailableError::new(field_entry)) + .map(U64FastFieldReader::open)?; + let vals_reader = self.fast_fields_composite + .open_read_with_idx(field, 1) + .ok_or_else(|| FastFieldNotAvailableError::new(field_entry)) + .map(U64FastFieldReader::open)?; + Ok(MultiValueIntFastFieldReader::open(idx_reader, vals_reader)) + } + /// Accessor to the segment's `Field norms`'s reader. /// /// Field norms are the length (in tokens) of the fields. 
diff --git a/src/datastruct/stacker/hashmap.rs b/src/datastruct/stacker/hashmap.rs index 475a706f0..04670fe8f 100644 --- a/src/datastruct/stacker/hashmap.rs +++ b/src/datastruct/stacker/hashmap.rs @@ -1,6 +1,7 @@ use std::iter; use std::mem; -use super::heap::{BytesRef, Heap, HeapAllocable}; +use postings::UnorderedTermId; +use super::heap::{Heap, HeapAllocable, BytesRef}; mod murmurhash2 { @@ -9,7 +10,7 @@ mod murmurhash2 { #[inline(always)] pub fn murmurhash2(key: &[u8]) -> u32 { let mut key_ptr: *const u32 = key.as_ptr() as *const u32; - let m: u32 = 0x5bd1_e995; + let m: u32 = 0x5bd1e995; let r = 24; let len = key.len() as u32; @@ -30,18 +31,18 @@ mod murmurhash2 { let key_ptr_u8: *const u8 = key_ptr as *const u8; match remaining { 3 => { - h ^= unsafe { u32::from(*key_ptr_u8.wrapping_offset(2)) } << 16; - h ^= unsafe { u32::from(*key_ptr_u8.wrapping_offset(1)) } << 8; - h ^= unsafe { u32::from(*key_ptr_u8) }; + h ^= unsafe { *key_ptr_u8.wrapping_offset(2) as u32 } << 16; + h ^= unsafe { *key_ptr_u8.wrapping_offset(1) as u32 } << 8; + h ^= unsafe { *key_ptr_u8 as u32 }; h = h.wrapping_mul(m); } 2 => { - h ^= unsafe { u32::from(*key_ptr_u8.wrapping_offset(1)) } << 8; - h ^= unsafe { u32::from(*key_ptr_u8) }; + h ^= unsafe { *key_ptr_u8.wrapping_offset(1) as u32 } << 8; + h ^= unsafe { *key_ptr_u8 as u32 }; h = h.wrapping_mul(m); } 1 => { - h ^= unsafe { u32::from(*key_ptr_u8) }; + h ^= unsafe { *key_ptr_u8 as u32 }; h = h.wrapping_mul(m); } _ => {} @@ -52,6 +53,9 @@ mod murmurhash2 { } } + + + /// Split the thread memory budget into /// - the heap size /// - the hash table "table" itself. 
@@ -59,10 +63,15 @@ mod murmurhash2 { /// Returns (the heap size in bytes, the hash table size in number of bits) pub(crate) fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) { let table_size_limit: usize = per_thread_memory_budget / 3; - let compute_table_size = |num_bits: usize| (1 << num_bits) * mem::size_of::(); + let compute_table_size = |num_bits: usize| { + let table_size: usize = (1 << num_bits) * mem::size_of::(); + table_size * mem::size_of::() + }; let table_num_bits: usize = (1..) .into_iter() - .take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit) + .take_while(|num_bits: &usize| { + compute_table_size(*num_bits) < table_size_limit + }) .last() .expect(&format!( "Per thread memory is too small: {}", @@ -73,6 +82,7 @@ pub(crate) fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) { (heap_size, table_num_bits) } + /// `KeyValue` is the item stored in the hash table. /// The key is actually a `BytesRef` object stored in an external heap. /// The `value_addr` also points to an address in the heap. @@ -81,6 +91,7 @@ pub(crate) fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) { /// For this reason, the (start, stop) information is actually redundant /// and can be simplified in the future #[derive(Copy, Clone, Default)] +#[repr(packed)] struct KeyValue { key_value_addr: BytesRef, hash: u32, @@ -92,6 +103,7 @@ impl KeyValue { } } + /// Customized `HashMap` with string keys /// /// This `HashMap` takes String as keys. Keys are @@ -101,13 +113,14 @@ impl KeyValue { /// the computation of the hash of the key twice, /// or copying the key as long as there is no insert. 
/// -pub struct HashMap<'a> { +pub struct TermHashMap<'a> { table: Box<[KeyValue]>, heap: &'a Heap, mask: usize, occupied: Vec, } + struct QuadraticProbing { hash: usize, i: usize, @@ -116,7 +129,11 @@ struct QuadraticProbing { impl QuadraticProbing { fn compute(hash: usize, mask: usize) -> QuadraticProbing { - QuadraticProbing { hash, i: 0, mask } + QuadraticProbing { + hash: hash, + i: 0, + mask: mask, + } } #[inline] @@ -126,13 +143,14 @@ impl QuadraticProbing { } } -impl<'a> HashMap<'a> { - pub fn new(num_bucket_power_of_2: usize, heap: &'a Heap) -> HashMap<'a> { + +impl<'a> TermHashMap<'a> { + pub fn new(num_bucket_power_of_2: usize, heap: &'a Heap) -> TermHashMap<'a> { let table_size = 1 << num_bucket_power_of_2; let table: Vec = iter::repeat(KeyValue::default()).take(table_size).collect(); - HashMap { + TermHashMap { table: table.into_boxed_slice(), - heap, + heap: heap, mask: table_size - 1, occupied: Vec::with_capacity(table_size / 2), } @@ -157,18 +175,23 @@ impl<'a> HashMap<'a> { self.occupied.push(bucket); self.table[bucket] = KeyValue { key_value_addr: key_bytes_ref, - hash, + hash: hash, }; } - pub fn iter<'b: 'a>(&'b self) -> impl Iterator + 'b { - self.occupied.iter().cloned().map(move |bucket: usize| { - let kv = self.table[bucket]; - self.get_key_value(kv.key_value_addr) - }) + pub fn iter<'b: 'a>(&'b self) -> impl Iterator + 'b { + self.occupied + .iter() + .cloned() + .map(move |bucket: usize| { + let kv = self.table[bucket]; + let (key, offset) = self.get_key_value(kv.key_value_addr); + (key, offset, bucket as UnorderedTermId) + }) } - pub fn get_or_create, V: HeapAllocable>(&mut self, key: S) -> &mut V { + + pub fn get_or_create, V: HeapAllocable>(&mut self, key: S) -> (UnorderedTermId, &mut V) { let key_bytes: &[u8] = key.as_ref(); let hash = murmurhash2::murmurhash2(key.as_ref()); let mut probe = self.probe(hash); @@ -180,17 +203,18 @@ impl<'a> HashMap<'a> { let (addr, val): (u32, &mut V) = self.heap.allocate_object(); assert_eq!(addr, 
key_bytes_ref.addr() + 2 + key_bytes.len() as u32); self.set_bucket(hash, key_bytes_ref, bucket); - return val; + return (bucket, val); } else if kv.hash == hash { let (stored_key, expull_addr): (&[u8], u32) = self.get_key_value(kv.key_value_addr); if stored_key == key_bytes { - return self.heap.get_mut_ref(expull_addr); + return (bucket, self.heap.get_mut_ref(expull_addr)); } } } } } + #[cfg(test)] mod tests { @@ -201,6 +225,7 @@ mod tests { use std::collections::HashSet; use super::split_memory; + struct TestValue { val: u32, _addr: u32, @@ -217,41 +242,42 @@ mod tests { #[test] fn test_hashmap_size() { - assert_eq!(split_memory(100_000), (67232, 12)); - assert_eq!(split_memory(1_000_000), (737856, 15)); - assert_eq!(split_memory(10_000_000), (7902848, 18)); + assert_eq!(split_memory(100_000), (67232, 9)); + assert_eq!(split_memory(1_000_000), (737856, 12)); + assert_eq!(split_memory(10_000_000), (7902848, 15)); } + #[test] fn test_hash_map() { let heap = Heap::with_capacity(2_000_000); - let mut hash_map: HashMap = HashMap::new(18, &heap); + let mut hash_map: TermHashMap = TermHashMap::new(18, &heap); { - let v: &mut TestValue = hash_map.get_or_create("abc"); + let v: &mut TestValue = hash_map.get_or_create("abc").1; assert_eq!(v.val, 0u32); v.val = 3u32; } { - let v: &mut TestValue = hash_map.get_or_create("abcd"); + let v: &mut TestValue = hash_map.get_or_create("abcd").1; assert_eq!(v.val, 0u32); v.val = 4u32; } { - let v: &mut TestValue = hash_map.get_or_create("abc"); + let v: &mut TestValue = hash_map.get_or_create("abc").1; assert_eq!(v.val, 3u32); } { - let v: &mut TestValue = hash_map.get_or_create("abcd"); + let v: &mut TestValue = hash_map.get_or_create("abcd").1; assert_eq!(v.val, 4u32); } let mut iter_values = hash_map.iter(); { - let (_, addr) = iter_values.next().unwrap(); + let (_, addr, _) = iter_values.next().unwrap(); let val: &TestValue = heap.get_ref(addr); assert_eq!(val.val, 3u32); } { - let (_, addr) = iter_values.next().unwrap(); + let 
(_, addr, _) = iter_values.next().unwrap(); let val: &TestValue = heap.get_ref(addr); assert_eq!(val.val, 4u32); } @@ -295,4 +321,5 @@ mod tests { }); } + } diff --git a/src/datastruct/stacker/mod.rs b/src/datastruct/stacker/mod.rs index 248b56b59..1d6cac450 100644 --- a/src/datastruct/stacker/mod.rs +++ b/src/datastruct/stacker/mod.rs @@ -4,7 +4,7 @@ mod expull; pub use self::heap::{Heap, HeapAllocable}; pub use self::expull::ExpUnrolledLinkedList; -pub use self::hashmap::HashMap; +pub use self::hashmap::TermHashMap; #[test] fn test_unrolled_linked_list() { @@ -16,15 +16,15 @@ fn test_unrolled_linked_list() { ks.push(2); ks.push(3); for k in (1..5).map(|k| k * 100) { - let mut hashmap: HashMap = HashMap::new(10, &heap); + let mut hashmap: TermHashMap = TermHashMap::new(10, &heap); for j in 0..k { for i in 0..500 { - let v: &mut ExpUnrolledLinkedList = hashmap.get_or_create(i.to_string()); + let v: &mut ExpUnrolledLinkedList = hashmap.get_or_create(i.to_string()).1; v.push(i * j, &heap); } } let mut map_addr: collections::HashMap, u32> = collections::HashMap::new(); - for (key, addr) in hashmap.iter() { + for (key, addr, _) in hashmap.iter() { map_addr.insert(Vec::from(key), addr); } @@ -39,5 +39,6 @@ fn test_unrolled_linked_list() { assert!(!it.next().is_some()); } } + } } diff --git a/src/directory/read_only_source.rs b/src/directory/read_only_source.rs index 698057152..10d9a85d9 100644 --- a/src/directory/read_only_source.rs +++ b/src/directory/read_only_source.rs @@ -4,7 +4,7 @@ use super::shared_vec_slice::SharedVecSlice; use common::HasLen; use std::slice; use std::io::{self, Read}; -use stable_deref_trait::StableDeref; +use stable_deref_trait::{StableDeref, CloneStableDeref}; /// Read object that represents files in tantivy. 
/// @@ -20,6 +20,7 @@ pub enum ReadOnlySource { } unsafe impl StableDeref for ReadOnlySource {} +unsafe impl CloneStableDeref for ReadOnlySource {} impl Deref for ReadOnlySource { type Target = [u8]; diff --git a/src/fastfield/facet_reader.rs b/src/fastfield/facet_reader.rs new file mode 100644 index 000000000..d7a9d8ce9 --- /dev/null +++ b/src/fastfield/facet_reader.rs @@ -0,0 +1,67 @@ +use super::MultiValueIntFastFieldReader; +use DocId; +use termdict::TermOrdinal; +use schema::Facet; +use termdict::{TermDictionary, TermDictionaryImpl}; + + +/// The facet reader makes it possible to access the list of +/// facets associated to a given document in a specific +/// segment. +/// +/// Rather than manipulating `Facet` object directly, the API +/// exposes those in the form of list of `Facet` ordinal. +/// +/// A segment ordinal can then be translated into a facet via +/// `.facet_from_ord(...)`. +/// +/// Facet ordinals are defined as their position in the sorted +/// list of facets. This ordinal is segment local and +/// only makes sense for a given segment. +pub struct FacetReader { + term_ords: MultiValueIntFastFieldReader, + term_dict: TermDictionaryImpl, +} + +impl FacetReader { + + /// Creates a new `FacetReader`. + /// + /// A facet reader just wraps : + /// - a `MultiValueIntFastFieldReader` that makes it possible to + /// access the list of facet ords for a given document. + /// - a `TermDictionaryImpl` that helps associating a facet to + /// an ordinal and vice versa. + pub fn new(term_ords: MultiValueIntFastFieldReader, + term_dict: TermDictionaryImpl) -> FacetReader { + FacetReader { + term_ords: term_ords, + term_dict: term_dict, + } + } + + /// Returns the size of the sets of facets in the segment. + /// This does not take in account the documents that may be marked + /// as deleted. + /// + /// `Facet` ordinals range from `0` to `num_facets() - 1`. 
+ pub fn num_facets(&self) -> usize { + self.term_dict.num_terms() + } + + /// Accessor for the facet term dictionary. + pub fn facet_dict(&self) -> &TermDictionaryImpl { + &self.term_dict + } + + /// Given a term ordinal returns the term associated to it. + pub fn facet_from_ord(&self, facet_ord: TermOrdinal, output: &mut Facet) { + let found_term = self.term_dict.ord_to_term(facet_ord as u64, output.inner_buffer_mut()); + assert!(found_term, "Term ordinal {} no found.", facet_ord); + } + + /// Return the list of facet ordinals associated to a document. + pub fn facet_ords(&mut self, doc: DocId, output: &mut Vec) { + self.term_ords.get_vals(doc, output); + } +} \ No newline at end of file diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 7912628fb..15439bad3 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -28,6 +28,8 @@ mod writer; mod serializer; mod error; mod delete; +mod facet_reader; +mod multivalued; pub use self::delete::write_delete_bitset; pub use self::delete::DeleteBitSet; @@ -36,6 +38,8 @@ pub use self::reader::{I64FastFieldReader, U64FastFieldReader}; pub use self::reader::FastFieldReader; pub use self::serializer::FastFieldSerializer; pub use self::error::{FastFieldNotAvailableError, Result}; +pub use self::facet_reader::FacetReader; +pub use self::multivalued::MultiValueIntFastFieldReader; #[cfg(test)] mod tests { @@ -46,6 +50,7 @@ mod tests { use schema::Document; use schema::{Schema, SchemaBuilder}; use schema::FAST; + use std::collections::HashMap; use test::Bencher; use test; use fastfield::FastFieldReader; @@ -90,12 +95,12 @@ mod tests { add_single_field_doc(&mut fast_field_writers, *FIELD, 13u64); add_single_field_doc(&mut fast_field_writers, *FIELD, 14u64); add_single_field_doc(&mut fast_field_writers, *FIELD, 2u64); - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } let source = 
directory.open_read(&path).unwrap(); { - assert_eq!(source.len(), 35 as usize); + assert_eq!(source.len(), 36 as usize); } { let composite_file = CompositeFile::open(&source).unwrap(); @@ -124,7 +129,7 @@ mod tests { add_single_field_doc(&mut fast_field_writers, *FIELD, 1_002u64); add_single_field_doc(&mut fast_field_writers, *FIELD, 1_501u64); add_single_field_doc(&mut fast_field_writers, *FIELD, 215u64); - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); @@ -159,7 +164,7 @@ mod tests { for _ in 0..10_000 { add_single_field_doc(&mut fast_field_writers, *FIELD, 100_000u64); } - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); @@ -194,7 +199,7 @@ mod tests { 5_000_000_000_000_000_000u64 + i, ); } - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); @@ -233,7 +238,7 @@ mod tests { doc.add_i64(i64_field, i); fast_field_writers.add_document(&doc); } - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); @@ -272,7 +277,7 @@ mod tests { let mut fast_field_writers = FastFieldsWriter::from_schema(&schema); let doc = Document::default(); fast_field_writers.add_document(&doc); - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } @@ -306,7 +311,7 @@ mod tests { for x in &permutation { add_single_field_doc(&mut fast_field_writers, *FIELD, 
*x); } - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); @@ -361,7 +366,7 @@ mod tests { for x in &permutation { add_single_field_doc(&mut fast_field_writers, *FIELD, *x); } - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); @@ -393,7 +398,7 @@ mod tests { for x in &permutation { add_single_field_doc(&mut fast_field_writers, *FIELD, *x); } - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } let source = directory.open_read(&path).unwrap(); diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs new file mode 100644 index 000000000..726682826 --- /dev/null +++ b/src/fastfield/multivalued/mod.rs @@ -0,0 +1,5 @@ +mod writer; +mod reader; + +pub use self::writer::MultiValueIntFastFieldWriter; +pub use self::reader::MultiValueIntFastFieldReader; \ No newline at end of file diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs new file mode 100644 index 000000000..afd4220d2 --- /dev/null +++ b/src/fastfield/multivalued/reader.rs @@ -0,0 +1,113 @@ +use DocId; +use fastfield::FastFieldReader; + +use fastfield::U64FastFieldReader; + +/// Reader for a multivalued `u64` fast field. +/// +/// The reader is implemented as two `u64` fast field. +/// +/// The `vals_reader` will access the concatenated list of all +/// values for all reader. +/// The `idx_reader` associated, for each document, the index of its first value. 
+/// +#[derive(Clone)] +pub struct MultiValueIntFastFieldReader { + idx_reader: U64FastFieldReader, + vals_reader: U64FastFieldReader, +} + +impl MultiValueIntFastFieldReader { + + pub(crate) fn open(idx_reader: U64FastFieldReader, vals_reader: U64FastFieldReader) -> MultiValueIntFastFieldReader { + MultiValueIntFastFieldReader { + idx_reader: idx_reader, + vals_reader: vals_reader, + } + } + + /// Returns the array of values associated to the given `doc`. + pub fn get_vals(&self, doc: DocId, vals: &mut Vec) { + let start = self.idx_reader.get(doc) as u32; + let stop = self.idx_reader.get(doc + 1) as u32; + vals.clear(); + for val_id in start..stop { + let val = self.vals_reader.get(val_id); + vals.push(val); + } + } +} + + +#[cfg(test)] +mod tests { + + use core::Index; + use schema::{Facet, Document, SchemaBuilder}; + + #[test] + fn test_multifastfield_reader() { + let mut schema_builder = SchemaBuilder::new(); + let facet_field = schema_builder.add_facet_field("facets"); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_with_num_threads(1, 30_000_000).expect("Failed to create index writer."); + { + let mut doc = Document::new(); + doc.add_facet(facet_field, "/category/cat2"); + doc.add_facet(facet_field, "/category/cat1"); + index_writer.add_document(doc); + } + { + let mut doc = Document::new(); + doc.add_facet(facet_field, "/category/cat2"); + index_writer.add_document(doc); + } + { + let mut doc = Document::new(); + doc.add_facet(facet_field, "/category/cat3"); + index_writer.add_document(doc); + } + index_writer.commit().expect("Commit failed"); + index.load_searchers().expect("Reloading searchers"); + let searcher = index.searcher(); + let segment_reader = searcher.segment_reader(0); + let mut facet_reader = segment_reader + .facet_reader(facet_field) + .unwrap(); + + let mut facet = Facet::root(); + { + facet_reader.facet_from_ord(0, &mut facet); + assert_eq!(facet, 
Facet::from("/category")); + } + { + facet_reader.facet_from_ord(1, &mut facet); + assert_eq!(facet, Facet::from("/category/cat1")); + } + { + facet_reader.facet_from_ord(2, &mut facet); + assert_eq!(facet, Facet::from("/category/cat2")); + } + { + facet_reader.facet_from_ord(3, &mut facet); + assert_eq!(facet, Facet::from("/category/cat3")); + } + + let mut vals = Vec::new(); + { + facet_reader.facet_ords(0, &mut vals); + assert_eq!(&vals[..], &[2, 1]); + } + { + facet_reader.facet_ords(1, &mut vals); + assert_eq!(&vals[..], &[2]); + } + { + facet_reader.facet_ords(2, &mut vals); + assert_eq!(&vals[..], &[3]); + } + + + } +} \ No newline at end of file diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs new file mode 100644 index 000000000..499b8bad4 --- /dev/null +++ b/src/fastfield/multivalued/writer.rs @@ -0,0 +1,62 @@ +use fastfield::FastFieldSerializer; +use std::collections::HashMap; +use postings::UnorderedTermId; +use schema::Field; +use std::io; + +pub struct MultiValueIntFastFieldWriter { + field: Field, + vals: Vec, + doc_index: Vec, +} + +impl MultiValueIntFastFieldWriter { + /// Creates a new `IntFastFieldWriter` + pub fn new(field: Field) -> Self { + MultiValueIntFastFieldWriter { + field: field, + vals: Vec::new(), + doc_index: Vec::new(), + } + } + + pub fn field(&self) -> Field { + self.field + } + + pub fn next_doc(&mut self) { + self.doc_index.push(self.vals.len() as u64); + } + + /// Records a new value. + /// + /// The n-th value being recorded is implicitely + /// associated to the document with the `DocId` n. + /// (Well, `n-1` actually because of 0-indexing) + pub fn add_val(&mut self, val: UnorderedTermId) { + self.vals.push(val); + } + + /// Push the fast fields value to the `FastFieldWriter`. 
+ pub fn serialize(&self, serializer: &mut FastFieldSerializer, mapping: &HashMap) -> io::Result<()> { + { + // writing the offset index + let mut doc_index_serializer = serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?; + for &offset in &self.doc_index { + doc_index_serializer.add_val(offset)?; + } + doc_index_serializer.add_val(self.vals.len() as u64)?; + doc_index_serializer.close_field()?; + } + { + // writing the values themselves. + let mut value_serializer = serializer.new_u64_fast_field_with_idx(self.field, 0u64, mapping.len() as u64, 1)?; + for val in &self.vals { + value_serializer.add_val(*mapping.get(val).expect("Missing term ordinal") as u64)?; + } + value_serializer.close_field()?; + } + Ok(()) + + } +} diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index 9739345b4..3e0486a1c 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -5,11 +5,12 @@ use DocId; use schema::SchemaBuilder; use std::path::Path; use schema::FAST; -use directory::{Directory, RAMDirectory, WritePtr}; +use directory::{WritePtr, RAMDirectory, Directory}; use fastfield::{FastFieldSerializer, FastFieldsWriter}; use schema::FieldType; use std::mem; use common::CompositeFile; +use std::collections::HashMap; use owning_ref::OwningRef; /// Trait for accessing a fastfield. @@ -50,6 +51,7 @@ pub trait FastFieldReader: Sized { } /// `FastFieldReader` for unsigned 64-bits integers. 
+#[derive(Clone)] pub struct U64FastFieldReader { bit_unpacker: BitUnpacker>, min_value: u64, @@ -86,6 +88,7 @@ impl FastFieldReader for U64FastFieldReader { fn is_enabled(field_type: &FieldType) -> bool { match *field_type { FieldType::U64(ref integer_options) => integer_options.is_fast(), + FieldType::HierarchicalFacet => { true }, _ => false, } } @@ -110,6 +113,7 @@ impl FastFieldReader for U64FastFieldReader { u64::deserialize(&mut cursor).expect("Failed to read the min_value of fast field."); amplitude = u64::deserialize(&mut cursor).expect("Failed to read the amplitude of fast field."); + } let max_value = min_value + amplitude; let num_bits = compute_num_bits(amplitude); @@ -123,6 +127,7 @@ impl FastFieldReader for U64FastFieldReader { } } + impl From> for U64FastFieldReader { fn from(vals: Vec) -> U64FastFieldReader { let mut schema_builder = SchemaBuilder::default(); @@ -131,21 +136,22 @@ impl From> for U64FastFieldReader { let path = Path::new("__dummy__"); let mut directory: RAMDirectory = RAMDirectory::create(); { - let write: WritePtr = directory - .open_write(path) - .expect("With a RAMDirectory, this should never fail."); - let mut serializer = FastFieldSerializer::from_write(write) - .expect("With a RAMDirectory, this should never fail."); + let write: WritePtr = directory.open_write(path).expect( + "With a RAMDirectory, this should never fail.", + ); + let mut serializer = FastFieldSerializer::from_write(write).expect( + "With a RAMDirectory, this should never fail.", + ); let mut fast_field_writers = FastFieldsWriter::from_schema(&schema); { - let fast_field_writer = fast_field_writers - .get_field_writer(field) - .expect("With a RAMDirectory, this should never fail."); + let fast_field_writer = fast_field_writers.get_field_writer(field).expect( + "With a RAMDirectory, this should never fail.", + ); for val in vals { fast_field_writer.add_val(val); } } - fast_field_writers.serialize(&mut serializer).unwrap(); + fast_field_writers.serialize(&mut 
serializer, HashMap::new()).unwrap(); serializer.close().unwrap(); } @@ -153,9 +159,9 @@ impl From> for U64FastFieldReader { let composite_file = CompositeFile::open(&source).expect("Failed to read the composite file"); - let field_source = composite_file - .open_read(field) - .expect("File component not found"); + let field_source = composite_file.open_read(field).expect( + "File component not found", + ); U64FastFieldReader::open(field_source) } } @@ -216,9 +222,7 @@ impl FastFieldReader for I64FastFieldReader { /// # Panics /// Panics if the data is corrupted. fn open(data: ReadOnlySource) -> I64FastFieldReader { - I64FastFieldReader { - underlying: U64FastFieldReader::open(data), - } + I64FastFieldReader { underlying: U64FastFieldReader::open(data) } } fn is_enabled(field_type: &FieldType) -> bool { diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs index 04453733f..afce4f053 100644 --- a/src/fastfield/serializer.rs +++ b/src/fastfield/serializer.rs @@ -35,7 +35,7 @@ impl FastFieldSerializer { pub fn from_write(write: WritePtr) -> io::Result { // just making room for the pointer to header. let composite_write = CompositeWrite::wrap(write); - Ok(FastFieldSerializer { composite_write }) + Ok(FastFieldSerializer { composite_write: composite_write }) } /// Start serializing a new u64 fast field @@ -45,10 +45,21 @@ impl FastFieldSerializer { min_value: u64, max_value: u64, ) -> io::Result>> { - let field_write = self.composite_write.for_field(field); + self.new_u64_fast_field_with_idx(field, min_value, max_value, 0) + } + + /// Start serializing a new u64 fast field + pub fn new_u64_fast_field_with_idx( + &mut self, + field: Field, + min_value: u64, + max_value: u64, + idx: usize) -> io::Result>> { + let field_write = self.composite_write.for_field_with_idx(field, idx); FastSingleFieldSerializer::open(field_write, min_value, max_value) } + /// Closes the serializer /// /// After this call the data must be persistently save on disk. 
@@ -75,9 +86,9 @@ impl<'a, W: Write> FastSingleFieldSerializer<'a, W> { let num_bits = compute_num_bits(amplitude); let bit_packer = BitPacker::new(num_bits as usize); Ok(FastSingleFieldSerializer { - write, - bit_packer, - min_value, + write: write, + bit_packer: bit_packer, + min_value: min_value, }) } diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 0df867c27..780ea9bf2 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -1,4 +1,4 @@ -use schema::{Document, Field, Schema}; +use schema::{Schema, Field, Document, Cardinality}; use fastfield::FastFieldSerializer; use std::io; use schema::Value; @@ -6,76 +6,118 @@ use DocId; use schema::FieldType; use common; use common::VInt; +use std::collections::HashMap; +use postings::UnorderedTermId; +use super::multivalued::MultiValueIntFastFieldWriter; use common::BinarySerializable; /// The fastfieldswriter regroup all of the fast field writers. pub struct FastFieldsWriter { - field_writers: Vec, + single_value_writers: Vec, + multi_values_writers: Vec, } impl FastFieldsWriter { /// Create all `FastFieldWriter` required by the schema. 
pub fn from_schema(schema: &Schema) -> FastFieldsWriter { - let field_writers: Vec = schema - .fields() - .iter() - .enumerate() - .flat_map(|(field_id, field_entry)| { - let field = Field(field_id as u32); - match *field_entry.field_type() { - FieldType::I64(ref int_options) => { - if int_options.is_fast() { + let mut single_value_writers = Vec::new(); + let mut multi_values_writers = Vec::new(); + + for (field_id, field_entry) in schema.fields().iter().enumerate() { + let field = Field(field_id as u32); + let default_value = + if let FieldType::I64(_) = *field_entry.field_type() { + common::i64_to_u64(0i64) + } else { + 0u64 + }; + match *field_entry.field_type() { + FieldType::I64(ref int_options) | FieldType::U64(ref int_options) => { + match int_options.get_fastfield_cardinality() { + Some(Cardinality::SingleValue) => { let mut fast_field_writer = IntFastFieldWriter::new(field); - fast_field_writer.set_val_if_missing(common::i64_to_u64(0i64)); - Some(fast_field_writer) - } else { - None + fast_field_writer.set_val_if_missing(default_value); + single_value_writers.push(fast_field_writer); } - } - FieldType::U64(ref int_options) => { - if int_options.is_fast() { - Some(IntFastFieldWriter::new(field)) - } else { - None + Some(Cardinality::MultiValues) => { + let fast_field_writer = MultiValueIntFastFieldWriter::new(field); + multi_values_writers.push(fast_field_writer); } + None => {} } - _ => None, } - }) - .collect(); - FastFieldsWriter { field_writers } + FieldType::HierarchicalFacet => { + let fast_field_writer = MultiValueIntFastFieldWriter::new(field); + multi_values_writers.push(fast_field_writer); + } + _ => {}, + } + } + FastFieldsWriter { + single_value_writers: single_value_writers, + multi_values_writers: multi_values_writers, + } } - /// Returns a `FastFieldsWriter` - /// with a `IntFastFieldWriter` for each + /// Returns a `FastFieldsWriter with a `u64` `IntFastFieldWriter` for each /// of the field given in argument. 
- pub fn new(fields: Vec) -> FastFieldsWriter { + pub(crate) fn new(fields: Vec) -> FastFieldsWriter { FastFieldsWriter { - field_writers: fields.into_iter().map(IntFastFieldWriter::new).collect(), + single_value_writers: fields.into_iter().map(IntFastFieldWriter::new).collect(), + multi_values_writers: vec!(), } } /// Get the `FastFieldWriter` associated to a field. pub fn get_field_writer(&mut self, field: Field) -> Option<&mut IntFastFieldWriter> { // TODO optimize - self.field_writers + self.single_value_writers .iter_mut() - .find(|field_writer| field_writer.field == field) + .find(|field_writer| { + field_writer.field() == field + }) + } + + /// Returns the fast field multi-value writer for the given field. + /// + /// Returns None if the field does not exist, or is not + /// configured as a multivalued fastfield in the schema. + pub(crate) fn get_multivalue_writer(&mut self, field: Field) -> Option<&mut MultiValueIntFastFieldWriter> { + // TODO optimize + // TODO expose for users + self.multi_values_writers + .iter_mut() + .find(|multivalue_writer| { + multivalue_writer.field() == field + }) } /// Indexes all of the fastfields of a new document. pub fn add_document(&mut self, doc: &Document) { - for field_writer in &mut self.field_writers { + for field_writer in &mut self.single_value_writers { field_writer.add_document(doc); } + for field_writer in &mut self.multi_values_writers { + field_writer.next_doc(); + } } /// Serializes all of the `FastFieldWriter`s by pushing them in /// order to the fast field serializer. 
- pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> { - for field_writer in &self.field_writers { + pub fn serialize(&self, + serializer: &mut FastFieldSerializer, + mapping: HashMap>) -> io::Result<()> { + for field_writer in &self.single_value_writers { field_writer.serialize(serializer)?; } + for field_writer in &self.multi_values_writers { + let field = field_writer.field(); + if let Some(mapping) = mapping.get(&field) { + field_writer.serialize(serializer, mapping)?; + } else { + panic!("Term ordinal mapping missing for {:?}", field); + } + } Ok(()) } @@ -84,7 +126,7 @@ impl FastFieldsWriter { /// /// The missing values will be filled with 0. pub fn fill_val_up_to(&mut self, doc: DocId) { - for field_writer in &mut self.field_writers { + for field_writer in &mut self.single_value_writers { field_writer.fill_val_up_to(doc); } } @@ -118,7 +160,7 @@ impl IntFastFieldWriter { /// Creates a new `IntFastFieldWriter` pub fn new(field: Field) -> IntFastFieldWriter { IntFastFieldWriter { - field, + field: field, vals: Vec::new(), val_count: 0, val_if_missing: 0u64, @@ -127,6 +169,11 @@ impl IntFastFieldWriter { } } + /// Returns the field that this writer is targetting. + pub fn field(&self) -> Field { + self.field + } + /// Sets the default value. /// /// This default value is recorded for documents if @@ -154,9 +201,9 @@ impl IntFastFieldWriter { /// associated to the document with the `DocId` n. /// (Well, `n-1` actually because of 0-indexing) pub fn add_val(&mut self, val: u64) { - VInt(val) - .serialize(&mut self.vals) - .expect("unable to serialize VInt to Vec"); + VInt(val).serialize(&mut self.vals).expect( + "unable to serialize VInt to Vec", + ); if val > self.val_max { self.val_max = val; @@ -168,6 +215,7 @@ impl IntFastFieldWriter { self.val_count += 1; } + /// Extract the value associated to the fast field for /// this document. /// @@ -180,11 +228,13 @@ impl IntFastFieldWriter { /// only the first one is taken in account. 
fn extract_val(&self, doc: &Document) -> u64 { match doc.get_first(self.field) { - Some(v) => match *v { - Value::U64(ref val) => *val, - Value::I64(ref val) => common::i64_to_u64(*val), - _ => panic!("Expected a u64field, got {:?} ", v), - }, + Some(v) => { + match *v { + Value::U64(ref val) => *val, + Value::I64(ref val) => common::i64_to_u64(*val), + _ => panic!("Expected a u64field, got {:?} ", v), + } + } None => self.val_if_missing, } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 35b9d4459..8e887f33c 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -260,7 +260,7 @@ fn index_documents( let segment_id = segment.id(); let mut segment_writer = SegmentWriter::for_segment(heap, table_size, segment.clone(), schema)?; for doc in document_iterator { - segment_writer.add_document(&doc, schema)?; + segment_writer.add_document(doc, schema)?; // There is two possible conditions to close the segment. // One is the memory arena dedicated to the segment is // getting full. 
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 55e0e0b88..183784d02 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -339,8 +339,7 @@ impl IndexMerger { for doc_id in 0..reader.max_doc() { if !reader.is_deleted(doc_id) { let doc = store_reader.get(doc_id)?; - let field_values: Vec<&FieldValue> = doc.field_values().iter().collect(); - store_writer.store(&field_values)?; + store_writer.store(&doc)?; } } } else { @@ -378,6 +377,7 @@ mod tests { use collector::tests::TestCollector; use query::BooleanQuery; use schema::IndexRecordOption; + use schema::Cardinality; use futures::Future; #[test] @@ -391,7 +391,7 @@ mod tests { ) .set_stored(); let text_field = schema_builder.add_text_field("text", text_fieldtype); - let score_fieldtype = schema::IntOptions::default().set_fast(); + let score_fieldtype = schema::IntOptions::default().set_fast(Cardinality::SingleValue); let score_field = schema_builder.add_u64_field("score", score_fieldtype); let index = Index::create_in_ram(schema_builder.build()); @@ -526,7 +526,7 @@ mod tests { ) .set_stored(); let text_field = schema_builder.add_text_field("text", text_fieldtype); - let score_fieldtype = schema::IntOptions::default().set_fast(); + let score_fieldtype = schema::IntOptions::default().set_fast(Cardinality::SingleValue); let score_field = schema_builder.add_u64_field("score", score_fieldtype); let index = Index::create_in_ram(schema_builder.build()); let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 34e4fa4be..94f5fa4ce 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -1,6 +1,7 @@ use Result; use DocId; use std::io; +use std::str; use schema::Schema; use schema::Term; use core::Segment; @@ -10,11 +11,14 @@ use schema::Field; use schema::FieldValue; use schema::FieldType; use indexer::segment_serializer::SegmentSerializer; +use 
std::collections::HashMap; use datastruct::stacker::Heap; use indexer::index_writer::MARGIN_IN_BYTES; use super::operation::AddOperation; use postings::MultiFieldPostingsWriter; use tokenizer::BoxedTokenizer; +use tokenizer::FacetTokenizer; +use tokenizer::{Tokenizer, TokenStream}; use schema::Value; /// A `SegmentWriter` is in charge of creating segment index from a @@ -125,18 +129,53 @@ impl<'a> SegmentWriter<'a> { /// As a user, you should rather use `IndexWriter`'s add_document. pub fn add_document( &mut self, - add_operation: &AddOperation, + add_operation: AddOperation, schema: &Schema, ) -> io::Result<()> { let doc_id = self.max_doc; - let doc = &add_operation.document; + let mut doc = add_operation.document; self.doc_opstamps.push(add_operation.opstamp); + + self.fast_field_writers.add_document(&doc); + for (field, field_values) in doc.get_sorted_field_values() { let field_options = schema.get_field_entry(field); if !field_options.is_indexed() { continue; } match *field_options.field_type() { + FieldType::HierarchicalFacet => { + let facets: Vec<&[u8]> = field_values.iter() + .flat_map(|field_value| { + match field_value.value() { + &Value::Facet(ref facet) => Some(facet.encoded_bytes()), + _ => { + panic!("Expected hierarchical facet"); + } + } + }) + .collect(); + let mut term = unsafe {Term::with_capacity(100)}; + term.set_field(field); + for facet_bytes in facets { + let mut unordered_term_id_opt = None; + let fake_str = unsafe { str::from_utf8_unchecked(facet_bytes) }; + FacetTokenizer + .token_stream(&fake_str) + .process(&mut |ref token| { + term.set_text(&token.text); + let unordered_term_id = self.multifield_postings.subscribe(doc_id, &term); + unordered_term_id_opt = Some(unordered_term_id); + }); + + if let Some(unordered_term_id) = unordered_term_id_opt { + self.fast_field_writers + .get_multivalue_writer(field) + .expect("multified writer for facet missing") + .add_val(unordered_term_id); + } + } + } FieldType::Str(_) => { let num_tokens = if 
let Some(ref mut tokenizer) = self.tokenizers[field.0 as usize] { @@ -147,9 +186,13 @@ impl<'a> SegmentWriter<'a> { _ => None, }) .collect(); - let mut token_stream = tokenizer.token_stream_texts(&texts[..]); - self.multifield_postings - .index_text(doc_id, field, &mut token_stream) + if texts.is_empty() { + 0 + } else { + let mut token_stream = tokenizer.token_stream_texts(&texts[..]); + self.multifield_postings + .index_text(doc_id, field, &mut token_stream) + } } else { 0 }; @@ -184,13 +227,15 @@ impl<'a> SegmentWriter<'a> { } } self.fieldnorms_writer.fill_val_up_to(doc_id); - self.fast_field_writers.add_document(doc); + doc.filter_fields(|field| { + schema.get_field_entry(field).is_stored() + }); let stored_fieldvalues: Vec<&FieldValue> = doc.field_values() .iter() .filter(|field_value| schema.get_field_entry(field_value.field()).is_stored()) .collect(); let doc_writer = self.segment_serializer.get_store_writer(); - doc_writer.store(&stored_fieldvalues)?; + doc_writer.store(&doc)?; self.max_doc += 1; Ok(()) } @@ -223,9 +268,9 @@ fn write( fieldnorms_writer: &FastFieldsWriter, mut serializer: SegmentSerializer, ) -> Result<()> { - multifield_postings.serialize(serializer.get_postings_serializer())?; - fast_field_writers.serialize(serializer.get_fast_field_serializer())?; - fieldnorms_writer.serialize(serializer.get_fieldnorms_serializer())?; + let term_ord_map = multifield_postings.serialize(serializer.get_postings_serializer())?; + fast_field_writers.serialize(serializer.get_fast_field_serializer(), term_ord_map)?; + fieldnorms_writer.serialize(serializer.get_fieldnorms_serializer(), HashMap::new())?; serializer.close()?; Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 46f537067..506c3241d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,6 +56,7 @@ extern crate stable_deref_trait; extern crate tempdir; extern crate time; extern crate uuid; +extern crate bincode; #[cfg(test)] extern crate env_logger; diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 
4055f66b9..972d38aae 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -32,6 +32,8 @@ pub use self::segment_postings::{BlockSegmentPostings, SegmentPostings}; pub use self::intersection::IntersectionDocSet; pub use common::HasLen; +pub(crate) type UnorderedTermId = usize; + #[cfg(test)] mod tests { @@ -149,7 +151,7 @@ mod tests { opstamp: 0u64, document: doc, }; - segment_writer.add_document(&op, &schema).unwrap(); + segment_writer.add_document(op, &schema).unwrap(); } { let mut doc = Document::default(); @@ -158,7 +160,7 @@ mod tests { opstamp: 1u64, document: doc, }; - segment_writer.add_document(&op, &schema).unwrap(); + segment_writer.add_document(op, &schema).unwrap(); } for i in 2..1000 { let mut doc = Document::default(); @@ -169,7 +171,7 @@ mod tests { opstamp: 2u64, document: doc, }; - segment_writer.add_document(&op, &schema).unwrap(); + segment_writer.add_document(op, &schema).unwrap(); } segment_writer.finalize().unwrap(); } diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 379432df5..b63e0d527 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -1,48 +1,58 @@ use DocId; use schema::Term; -use postings::{FieldSerializer, InvertedIndexSerializer}; +use postings::{InvertedIndexSerializer, FieldSerializer}; use std::io; +use std::collections::HashMap; use postings::Recorder; use Result; -use schema::{Field, Schema}; -use tokenizer::Token; +use schema::{Schema, Field}; use std::marker::PhantomData; use std::ops::DerefMut; -use datastruct::stacker::{HashMap, Heap}; -use postings::{NothingRecorder, TFAndPositionRecorder, TermFrequencyRecorder}; +use datastruct::stacker::{TermHashMap, Heap}; +use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; use schema::FieldEntry; use schema::FieldType; +use tokenizer::Token; use tokenizer::TokenStream; use schema::IndexRecordOption; +use postings::UnorderedTermId; + fn posting_from_field_entry<'a>( field_entry: 
&FieldEntry, heap: &'a Heap, ) -> Box { match *field_entry.field_type() { - FieldType::Str(ref text_options) => text_options + FieldType::Str(ref text_options) => { + text_options .get_indexing_options() - .map(|indexing_options| match indexing_options.index_option() { - IndexRecordOption::Basic => { - SpecializedPostingsWriter::::new_boxed(heap) - } - IndexRecordOption::WithFreqs => { - SpecializedPostingsWriter::::new_boxed(heap) - } - IndexRecordOption::WithFreqsAndPositions => { - SpecializedPostingsWriter::::new_boxed(heap) + .map(|indexing_options| { + match indexing_options.index_option() { + IndexRecordOption::Basic => { + SpecializedPostingsWriter::::new_boxed(heap) + } + IndexRecordOption::WithFreqs => { + SpecializedPostingsWriter::::new_boxed(heap) + } + IndexRecordOption::WithFreqsAndPositions => { + SpecializedPostingsWriter::::new_boxed(heap) + } } }) - .unwrap_or_else(|| SpecializedPostingsWriter::::new_boxed(heap)), - FieldType::U64(_) | FieldType::I64(_) => { - SpecializedPostingsWriter::::new_boxed(heap) + .unwrap_or_else(|| { + SpecializedPostingsWriter::::new_boxed(heap) + }) } + FieldType::U64(_) | + FieldType::I64(_) | + FieldType::HierarchicalFacet => SpecializedPostingsWriter::::new_boxed(heap), } } + pub struct MultiFieldPostingsWriter<'a> { heap: &'a Heap, - term_index: HashMap<'a>, + term_index: TermHashMap<'a>, per_field_postings_writers: Vec>, } @@ -50,7 +60,7 @@ impl<'a> MultiFieldPostingsWriter<'a> { /// Create a new `MultiFieldPostingsWriter` given /// a schema and a heap. 
pub fn new(schema: &Schema, table_bits: usize, heap: &'a Heap) -> MultiFieldPostingsWriter<'a> { - let term_index = HashMap::new(table_bits, heap); + let term_index = TermHashMap::new(table_bits, heap); let per_field_postings_writers: Vec<_> = schema .fields() .iter() @@ -58,9 +68,9 @@ impl<'a> MultiFieldPostingsWriter<'a> { .collect(); MultiFieldPostingsWriter { - heap, - term_index, - per_field_postings_writers, + heap: heap, + term_index: term_index, + per_field_postings_writers: per_field_postings_writers, } } @@ -69,26 +79,29 @@ impl<'a> MultiFieldPostingsWriter<'a> { postings_writer.index_text(&mut self.term_index, doc, field, token_stream, self.heap) } - pub fn subscribe(&mut self, doc: DocId, term: &Term) { + pub fn subscribe(&mut self, doc: DocId, term: &Term) -> UnorderedTermId { let postings_writer = self.per_field_postings_writers[term.field().0 as usize].deref_mut(); - postings_writer.suscribe(&mut self.term_index, doc, 0u32, term, self.heap) + postings_writer.subscribe(&mut self.term_index, doc, 0u32, term, self.heap) } /// Serialize the inverted index. /// It pushes all term, one field at a time, towards the /// postings serializer. 
#[allow(needless_range_loop)] - pub fn serialize(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> { - let mut term_offsets: Vec<(&[u8], u32)> = self.term_index.iter().collect(); - term_offsets.sort_by_key(|&(k, _v)| k); + pub fn serialize(&self, serializer: &mut InvertedIndexSerializer) -> Result>> { + let mut term_offsets: Vec<(&[u8], u32, UnorderedTermId)> = self.term_index.iter().collect(); + term_offsets.sort_by_key(|&(k, _, _)| k); let mut offsets: Vec<(Field, usize)> = vec![]; let term_offsets_it = term_offsets .iter() .cloned() - .map(|(key, _)| Term::wrap(key).field()) + .map(|(key, _, _)| Term::wrap(key).field()) .enumerate(); + + let mut unordered_term_mappings: HashMap> = HashMap::new(); + let mut prev_field = Field(u32::max_value()); for (offset, field) in term_offsets_it { if field != prev_field { @@ -97,9 +110,22 @@ impl<'a> MultiFieldPostingsWriter<'a> { } } offsets.push((Field(0), term_offsets.len())); + for i in 0..(offsets.len() - 1) { let (field, start) = offsets[i]; let (_, stop) = offsets[i + 1]; + + // populating the unordered term ord -> ordered term ord mapping + // for the field. + let mut mapping = HashMap::new(); + for (term_ord, term_unord_id) in term_offsets[start..stop] + .iter() + .map(|&(_,_,bucket)| bucket) + .enumerate() { + mapping.insert(term_unord_id, term_ord); + } + unordered_term_mappings.insert(field, mapping); + let postings_writer = &self.per_field_postings_writers[field.0 as usize]; let mut field_serializer = serializer.new_field(field)?; postings_writer.serialize( @@ -109,7 +135,7 @@ impl<'a> MultiFieldPostingsWriter<'a> { )?; field_serializer.close()?; } - Ok(()) + Ok(unordered_term_mappings) } /// Return true iff the term dictionary is saturated. @@ -118,6 +144,7 @@ impl<'a> MultiFieldPostingsWriter<'a> { } } + /// The `PostingsWriter` is in charge of receiving documenting /// and building a `Segment` in anonymous memory. 
/// @@ -130,40 +157,37 @@ pub trait PostingsWriter { /// * term - the term /// * heap - heap used to store the postings informations as well as the terms /// in the hashmap. - fn suscribe( + fn subscribe( &mut self, - term_index: &mut HashMap, + term_index: &mut TermHashMap, doc: DocId, pos: u32, term: &Term, heap: &Heap, - ); + ) -> UnorderedTermId; /// Serializes the postings on disk. /// The actual serialization format is handled by the `PostingsSerializer`. - fn serialize( - &self, - term_addrs: &[(&[u8], u32)], - serializer: &mut FieldSerializer, - heap: &Heap, - ) -> io::Result<()>; + fn serialize(&self, + term_addrs: &[(&[u8], u32, UnorderedTermId)], + serializer: &mut FieldSerializer, + heap: &Heap) + -> io::Result<()>; - /// Tokenize a text and suscribe all of its token. - fn index_text( - &mut self, - term_index: &mut HashMap, - doc_id: DocId, - field: Field, - token_stream: &mut TokenStream, - heap: &Heap, - ) -> u32 { + /// Tokenize a text and subscribe all of its token. + fn index_text<'a>(&mut self, + term_index: &mut TermHashMap, + doc_id: DocId, + field: Field, + token_stream: &mut TokenStream, + heap: &Heap) + -> u32 { let mut term = unsafe { Term::with_capacity(100) }; term.set_field(field); let mut sink = |token: &Token| { term.set_text(token.text.as_str()); - self.suscribe(term_index, doc_id, token.position as u32, &term, heap); + self.subscribe(term_index, doc_id, token.position as u32, &term, heap); }; - token_stream.process(&mut sink) } } @@ -179,7 +203,7 @@ impl<'a, Rec: Recorder + 'static> SpecializedPostingsWriter<'a, Rec> { /// constructor pub fn new(heap: &'a Heap) -> SpecializedPostingsWriter<'a, Rec> { SpecializedPostingsWriter { - heap, + heap: heap, _recorder_type: PhantomData, } } @@ -191,16 +215,17 @@ impl<'a, Rec: Recorder + 'static> SpecializedPostingsWriter<'a, Rec> { } impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'a, Rec> { - fn suscribe( + + fn subscribe( &mut self, - term_index: &mut HashMap, 
+ term_index: &mut TermHashMap, doc: DocId, position: u32, term: &Term, heap: &Heap, - ) { + ) -> UnorderedTermId { debug_assert!(term.as_slice().len() >= 4); - let recorder: &mut Rec = term_index.get_or_create(term); + let (term_ord, recorder): (usize, &mut Rec) = term_index.get_or_create(term); let current_doc = recorder.current_doc(); if current_doc != doc { if current_doc != u32::max_value() { @@ -209,17 +234,20 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<' recorder.new_doc(doc, heap); } recorder.record_position(position, heap); + term_ord } + + fn serialize( &self, - term_addrs: &[(&[u8], u32)], + term_addrs: &[(&[u8], u32, UnorderedTermId)], serializer: &mut FieldSerializer, heap: &Heap, ) -> io::Result<()> { - for &(term_bytes, addr) in term_addrs { + for &(term_bytes, addr, _) in term_addrs { let recorder: &mut Rec = self.heap.get_mut_ref(addr); - serializer.new_term(term_bytes)?; + serializer.new_term(&term_bytes[4..])?; recorder.serialize(addr, serializer, heap)?; serializer.close_term()?; } diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs index ca34711ef..654ba2ab2 100644 --- a/src/postings/term_info.rs +++ b/src/postings/term_info.rs @@ -23,6 +23,12 @@ pub struct TermInfo { pub positions_inner_offset: u8, } +impl TermInfo { + /// Size required to encode the `TermInfo`. + // TODO make this smaller when positions are unused for instance. 
+ pub(crate) const SIZE_IN_BYTES: usize = 4 + 8 + 8 + 1; +} + impl BinarySerializable for TermInfo { fn serialize(&self, writer: &mut W) -> io::Result<()> { self.doc_freq.serialize(writer)?; diff --git a/src/query/all_query.rs b/src/query/all_query.rs new file mode 100644 index 000000000..632693cef --- /dev/null +++ b/src/query/all_query.rs @@ -0,0 +1,74 @@ +use query::Query; +use query::Weight; +use query::Scorer; +use core::SegmentReader; +use Result; +use DocSet; +use Score; +use DocId; +use std::any::Any; +use core::Searcher; + + +/// Query that matches all of the documents. +/// +/// All of the document get the score 1f32. +#[derive(Debug)] +pub struct AllQuery; + +impl Query for AllQuery { + fn as_any(&self) -> &Any { + self + } + + fn weight(&self, _: &Searcher) -> Result> { + Ok(box AllWeight) + } +} + +/// Weight associated to the `AllQuery` query. +pub struct AllWeight; + +impl Weight for AllWeight { + fn scorer<'a>(&'a self, reader: &'a SegmentReader) -> Result> { + Ok(box AllScorer { + started: false, + doc: 0u32, + max_doc: reader.max_doc() + }) + } +} + + +/// Scorer associated to the `AllQuery` query. 
+pub struct AllScorer { + started: bool, + doc: DocId, + max_doc: DocId, +} + +impl DocSet for AllScorer { + fn advance(&mut self) -> bool { + if self.started { + self.doc += 1u32; + } + else { + self.started = true; + } + self.doc < self.max_doc + } + + fn doc(&self) -> DocId { + self.doc + } + + fn size_hint(&self) -> usize { + self.max_doc as usize + } +} + +impl Scorer for AllScorer { + fn score(&self) -> Score { + 1f32 + } +} \ No newline at end of file diff --git a/src/query/mod.rs b/src/query/mod.rs index af31dfe57..7177871b5 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -11,6 +11,7 @@ mod occur_filter; mod term_query; mod query_parser; mod phrase_query; +mod all_query; pub use self::boolean_query::BooleanQuery; pub use self::occur_filter::OccurFilter; @@ -23,3 +24,4 @@ pub use self::scorer::EmptyScorer; pub use self::scorer::Scorer; pub use self::term_query::TermQuery; pub use self::weight::Weight; +pub use self::all_query::{AllQuery, AllWeight, AllScorer}; \ No newline at end of file diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs index 57b741eb9..5c9394c0d 100644 --- a/src/query/query_parser/query_parser.rs +++ b/src/query/query_parser/query_parser.rs @@ -206,6 +206,10 @@ impl QueryParser { )) } } + FieldType::HierarchicalFacet => { + let term = Term::from_field_text(field, phrase); + Ok(Some(LogicalLiteral::Term(term))) + } } } diff --git a/src/schema/document.rs b/src/schema/document.rs index 025bd9084..972e47fe4 100644 --- a/src/schema/document.rs +++ b/src/schema/document.rs @@ -11,11 +11,19 @@ use itertools::Itertools; /// Documents are really just a list of couple `(field, value)`. /// In this list, one field may appear more than once. 
-#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct Document { field_values: Vec, } +impl From> for Document { + fn from(field_values: Vec) -> Self { + Document { + field_values + } + } +} + impl PartialEq for Document { fn eq(&self, other: &Document) -> bool { // super slow, but only here for tests @@ -27,6 +35,7 @@ impl PartialEq for Document { } } + impl Eq for Document {} impl Document { @@ -45,6 +54,21 @@ impl Document { self.field_values.is_empty() } + /// Retain only the field that are matching the + /// predicate given in argument. + pub fn filter_fieldsbool>(&mut self, predicate: P) { + self.field_values + .retain(|field_value| predicate(field_value.field())); + } + + /// Adding a facet to the document. + pub fn add_facet(&mut self, field: Field, path: F) + where Facet: From { + let facet = Facet::from(path); + let value = Value::Facet(facet); + self.add(FieldValue::new(field, value)); + } + /// Add a text field. pub fn add_text(&mut self, field: Field, text: &str) { let value = Value::Str(String::from(text)); @@ -104,13 +128,7 @@ impl Document { } } -impl From> for Document { - fn from(field_values: Vec) -> Document { - Document { - field_values: field_values, - } - } -} + #[cfg(test)] mod tests { diff --git a/src/schema/facet.rs b/src/schema/facet.rs new file mode 100644 index 000000000..a41df1b49 --- /dev/null +++ b/src/schema/facet.rs @@ -0,0 +1,211 @@ +use std::fmt::{self, Display, Debug, Formatter}; +use std::str; +use std::io::{self, Read, Write}; +use regex::Regex; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::borrow::Cow; +use common::BinarySerializable; + + +const SLASH_BYTE: u8 = '/' as u8; +const ESCAPE_BYTE: u8 = '\\' as u8; + +/// BYTE used as a level separation in the binary +/// representation of facets. +pub const FACET_SEP_BYTE: u8 = 0u8; + +/// A Facet represent a point in a given hierarchy. 
+/// +/// They are typically represented similarly to a filepath. +/// For instance, an e-commerce website could +/// have a `Facet` for `/electronics/tv_and_video/led_tv`. +/// +/// A document can be associated to any number of facets. +/// The hierarchy implicitely imply that a document +/// belonging to a facet also belongs to the ancestor of +/// its facet. In the example above, `/electronics/tv_and_video/` +/// and `/electronics`. +#[derive(Clone, Eq, Hash, PartialEq, Ord, PartialOrd)] +pub struct Facet(Vec); + +impl Facet { + + /// Returns a new instance of the "root facet" + /// Equivalent to `/`. + pub fn root() -> Facet { + Facet(vec![]) + } + + /// Returns true iff the facet is the root facet `/`. + pub fn is_root(&self) -> bool { + self.encoded_bytes().is_empty() + } + + /// Returns a binary representation of the facet. + /// + /// In this representation, `0u8` is used as a separator + /// and the string parts of the facet are unescaped. + /// (The first `/` is not encoded at all). + /// + /// This representation has the benefit of making it possible to + /// express "being a child of a given facet" as a range over + /// the term ordinals. + pub fn encoded_bytes(&self) -> &[u8] { + &self.0 + } + + /// Creates a `Facet` from its binary representation. + pub(crate) fn from_encoded(encoded_bytes: Vec) -> Facet { + Facet(encoded_bytes) + } + + /// Parse a text representation of a facet. + /// + /// It is conceptually, if one of the steps of this path + /// contains a `/` or a `\`, it should be escaped + /// using an anti-slash `/`. + pub fn from_text<'a, T>(path: &'a T) -> Facet + where T: ?Sized + AsRef { + From::from(path) + } + + /// Returns a `Facet` from an iterator over the different + /// steps of the facet path. + /// + /// The steps are expected to be unescaped. 
+ pub fn from_path(path: Path) -> Facet + where + Path: IntoIterator, + Path::Item: ToString { + let mut facet_bytes: Vec = Vec::with_capacity(100); + let mut step_it = path.into_iter(); + if let Some(step) = step_it.next() { + facet_bytes.extend_from_slice(step.to_string().as_bytes()); + } + for step in step_it { + facet_bytes.push(FACET_SEP_BYTE); + facet_bytes.extend_from_slice(step.to_string().as_bytes()); + } + Facet(facet_bytes) + } + + /// Accessor for the inner buffer of the `Facet`. + pub(crate) fn inner_buffer_mut(&mut self) -> &mut Vec { + &mut self.0 + } +} + + +impl<'a, T: ?Sized + AsRef> From<&'a T> for Facet { + + fn from(path_asref: &'a T) -> Facet { + #[derive(Copy, Clone)] + enum State { + Escaped, + Idle, + } + let path: &str = path_asref.as_ref(); + let mut facet_encoded = Vec::new(); + let mut state = State::Idle; + let path_bytes = path.as_bytes(); + for &c in &path_bytes[1..] { + match (state, c) { + (State::Idle, ESCAPE_BYTE) => { + state = State::Escaped + } + (State::Idle, SLASH_BYTE) => { + facet_encoded.push(FACET_SEP_BYTE); + } + (State::Escaped, any_char) => { + state = State::Idle; + facet_encoded.push(any_char); + } + (State::Idle, other_char) => { + facet_encoded.push(other_char); + } + } + } + Facet(facet_encoded) + } +} + +impl BinarySerializable for Facet { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + as BinarySerializable>::serialize(&self.0, writer) + } + + fn deserialize(reader: &mut R) -> io::Result { + let bytes = as BinarySerializable>::deserialize(reader)?; + Ok(Facet(bytes)) + } +} + +impl Display for Facet { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + for step in self.0.split(|&b| b == FACET_SEP_BYTE) { + write!(f, "/")?; + let step_str = unsafe { str::from_utf8_unchecked(step) }; + write!(f, "{}", escape_slashes(step_str))?; + } + Ok(()) + } +} + +fn escape_slashes(s: &str) -> Cow { + lazy_static! 
{ + static ref SLASH_PTN: Regex = Regex::new(r"[\\/]").unwrap(); + } + SLASH_PTN.replace_all(s, "\\/") +} + +impl Serialize for Facet { + fn serialize(&self, serializer: S) -> Result + where S: Serializer { + serializer.serialize_str(&self.to_string()) + } +} + +impl<'de> Deserialize<'de> for Facet { + fn deserialize(deserializer: D) -> Result where + D: Deserializer<'de> { + <&'de str as Deserialize<'de>>::deserialize(deserializer) + .map(Facet::from) + } +} + +impl Debug for Facet { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "Facet({})", self)?; + Ok(()) + } +} + + +#[cfg(test)] +mod tests { + + use super::Facet; + + #[test] + fn test_facet_display() { + { + let v = ["first", "second", "third"]; + let facet = Facet::from_path(v.iter()); + assert_eq!(format!("{}", facet), "/first/second/third"); + } + { + let v = ["first", "sec/ond", "third"]; + let facet = Facet::from_path(v.iter()); + assert_eq!(format!("{}", facet), "/first/sec\\/ond/third"); + } + } + + + #[test] + fn test_facet_debug() { + let v = ["first", "second", "third"]; + let facet = Facet::from_path(v.iter()); + assert_eq!(format!("{:?}", facet), "Facet(/first/second/third)"); + } + +} \ No newline at end of file diff --git a/src/schema/field_entry.rs b/src/schema/field_entry.rs index b4ba5a1aa..5d5d49273 100644 --- a/src/schema/field_entry.rs +++ b/src/schema/field_entry.rs @@ -48,6 +48,14 @@ impl FieldEntry { } } + /// Creates a field entry for a facet. 
+ pub fn new_facet(field_name: String) -> FieldEntry { + FieldEntry { + name: field_name, + field_type: FieldType::HierarchicalFacet, + } + } + /// Returns the name of the field pub fn name(&self) -> &str { &self.name @@ -63,6 +71,7 @@ impl FieldEntry { match self.field_type { FieldType::Str(ref options) => options.get_indexing_options().is_some(), FieldType::U64(ref options) | FieldType::I64(ref options) => options.is_indexed(), + FieldType::HierarchicalFacet => true } } @@ -79,6 +88,8 @@ impl FieldEntry { match self.field_type { FieldType::U64(ref options) | FieldType::I64(ref options) => options.is_stored(), FieldType::Str(ref options) => options.is_stored(), + FieldType::HierarchicalFacet => true, + // TODO make stored hierachical facet optional } } } @@ -104,6 +115,9 @@ impl Serialize for FieldEntry { s.serialize_field("type", "i64")?; s.serialize_field("options", options)?; } + FieldType::HierarchicalFacet => { + s.serialize_field("type", "hierarchical_facet")?; + } } s.end() @@ -154,6 +168,9 @@ impl<'de> Deserialize<'de> for FieldEntry { return Err(de::Error::duplicate_field("type")); } ty = Some(map.next_value()?); + if ty == Some("hierarchical_facet") { + field_type = Some(FieldType::HierarchicalFacet); + } } Field::Options => match ty { None => { diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs index b7c489c7f..3a39490b3 100644 --- a/src/schema/field_type.rs +++ b/src/schema/field_type.rs @@ -3,6 +3,7 @@ use schema::{IntOptions, TextOptions}; use serde_json::Value as JsonValue; use schema::Value; use schema::IndexRecordOption; +use schema::Facet; /// Possible error that may occur while parsing a field value /// At this point the JSON is known to be valid. @@ -18,7 +19,7 @@ pub enum ValueParsingError { /// A `FieldType` describes the type (text, u64) of a field as well as /// how it should be handled by tantivy. 
-#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum FieldType { /// String field type configuration Str(TextOptions), @@ -26,6 +27,8 @@ pub enum FieldType { U64(IntOptions), /// Signed 64-bits integers 64 field type configuration I64(IntOptions), + /// Hierachical Facet + HierarchicalFacet, } impl FieldType { @@ -36,6 +39,7 @@ impl FieldType { FieldType::U64(ref int_options) | FieldType::I64(ref int_options) => { int_options.is_indexed() } + FieldType::HierarchicalFacet => true } } @@ -55,6 +59,7 @@ impl FieldType { None } } + FieldType::HierarchicalFacet => Some(IndexRecordOption::Basic) } } @@ -70,6 +75,9 @@ impl FieldType { FieldType::U64(_) | FieldType::I64(_) => Err(ValueParsingError::TypeError( format!("Expected an integer, got {:?}", json), )), + FieldType::HierarchicalFacet => { + Ok(Value::Facet(Facet::from(field_text))) + } }, JsonValue::Number(ref field_val_num) => match *self { FieldType::I64(_) => { @@ -88,7 +96,7 @@ impl FieldType { Err(ValueParsingError::OverflowError(msg)) } } - FieldType::Str(_) => { + FieldType::Str(_) | FieldType::HierarchicalFacet => { let msg = format!("Expected a string, got {:?}", json); Err(ValueParsingError::TypeError(msg)) } diff --git a/src/schema/int_options.rs b/src/schema/int_options.rs index c487ad200..f47d79319 100644 --- a/src/schema/int_options.rs +++ b/src/schema/int_options.rs @@ -1,10 +1,24 @@ use std::ops::BitOr; + +/// Express whether a field is single-value or multi-valued. +#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)] +pub enum Cardinality { + /// The document must have exactly one value associated to the document. + #[serde(rename = "single")] + SingleValue, + /// The document can have any number of values associated to the document. + /// This is more memory and CPU expensive than the SingleValue solution. + #[serde(rename = "multi")] + MultiValues +} + /// Define how an int field should be handled by tantivy. 
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct IntOptions { indexed: bool, - fast: bool, + #[serde(skip_serializing_if="Option::is_none")] + fast: Option, stored: bool, } @@ -21,7 +35,7 @@ impl IntOptions { /// Returns true iff the value is a fast field. pub fn is_fast(&self) -> bool { - self.fast + self.fast.is_some() } /// Set the u64 options as stored. @@ -42,35 +56,44 @@ impl IntOptions { self } - /// Set the u64 options as a fast field. + /// Set the u64 options as a single-valued fast field. /// /// Fast fields are designed for random access. /// Access time are similar to a random lookup in an array. /// If more than one value is associated to a fast field, only the last one is /// kept. - pub fn set_fast(mut self) -> IntOptions { - self.fast = true; + pub fn set_fast(mut self, cardinality: Cardinality) -> IntOptions { + self.fast = Some(cardinality); self } + + /// Returns the cardinality of the fastfield. + /// + /// If the field has not been declared as a fastfield, then + /// the method returns None. + pub fn get_fastfield_cardinality(&self) -> Option { + self.fast + } } impl Default for IntOptions { fn default() -> IntOptions { IntOptions { - fast: false, indexed: false, stored: false, + fast: None, } } } + /// Shortcut for a u64 fast field. /// /// Such a shortcut can be composed as follows `STORED | FAST | INT_INDEXED` pub const FAST: IntOptions = IntOptions { indexed: false, stored: false, - fast: true, + fast: Some(Cardinality::SingleValue), }; /// Shortcut for a u64 indexed field. @@ -79,7 +102,7 @@ pub const FAST: IntOptions = IntOptions { pub const INT_INDEXED: IntOptions = IntOptions { indexed: true, stored: false, - fast: false, + fast: None, }; /// Shortcut for a u64 stored field. 
@@ -88,9 +111,10 @@ pub const INT_INDEXED: IntOptions = IntOptions { pub const INT_STORED: IntOptions = IntOptions { indexed: false, stored: true, - fast: false, + fast: None, }; + impl BitOr for IntOptions { type Output = IntOptions; @@ -98,7 +122,7 @@ impl BitOr for IntOptions { let mut res = IntOptions::default(); res.indexed = self.indexed | other.indexed; res.stored = self.stored | other.stored; - res.fast = self.fast | other.fast; + res.fast = self.fast.or(other.fast); res } } diff --git a/src/schema/mod.rs b/src/schema/mod.rs index 2b788aefc..6e4b0f51d 100644 --- a/src/schema/mod.rs +++ b/src/schema/mod.rs @@ -103,6 +103,7 @@ the field is required during scoring or collection for instance. mod schema; mod term; mod document; +mod facet; mod field_type; mod field_entry; @@ -120,6 +121,9 @@ pub use self::schema::{Schema, SchemaBuilder}; pub use self::value::Value; pub use self::schema::DocParsingError; +pub use self::facet::Facet; +pub use self::facet::FACET_SEP_BYTE; + pub use self::document::Document; pub use self::field::Field; pub use self::term::Term; @@ -139,6 +143,7 @@ pub use self::int_options::IntOptions; pub use self::int_options::FAST; pub use self::int_options::INT_INDEXED; pub use self::int_options::INT_STORED; +pub use self::int_options::Cardinality; use regex::Regex; diff --git a/src/schema/schema.rs b/src/schema/schema.rs index 3021c6c28..057f2c598 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -89,6 +89,12 @@ impl SchemaBuilder { self.add_field(field_entry) } + /// Adds a facet field to the schema. + pub fn add_facet_field(&mut self, field_name: &str) -> Field { + let field_entry = FieldEntry::new_facet(field_name.to_string()); + self.add_field(field_entry) + } + /// Adds a field entry to the schema in build. 
fn add_field(&mut self, field_entry: FieldEntry) -> Field { let field = Field(self.fields.len() as u32); @@ -328,8 +334,8 @@ mod tests { #[test] pub fn test_schema_serialization() { let mut schema_builder = SchemaBuilder::default(); - let count_options = IntOptions::default().set_stored().set_fast(); - let popularity_options = IntOptions::default().set_stored().set_fast(); + let count_options = IntOptions::default().set_stored().set_fast(Cardinality::SingleValue); + let popularity_options = IntOptions::default().set_stored().set_fast(Cardinality::SingleValue); schema_builder.add_text_field("title", TEXT); schema_builder.add_text_field("author", STRING); schema_builder.add_u64_field("count", count_options); @@ -393,7 +399,7 @@ mod tests { #[test] pub fn test_document_to_json() { let mut schema_builder = SchemaBuilder::default(); - let count_options = IntOptions::default().set_stored().set_fast(); + let count_options = IntOptions::default().set_stored().set_fast(Cardinality::SingleValue); schema_builder.add_text_field("title", TEXT); schema_builder.add_text_field("author", STRING); schema_builder.add_u64_field("count", count_options); @@ -412,8 +418,8 @@ mod tests { #[test] pub fn test_parse_document() { let mut schema_builder = SchemaBuilder::default(); - let count_options = IntOptions::default().set_stored().set_fast(); - let popularity_options = IntOptions::default().set_stored().set_fast(); + let count_options = IntOptions::default().set_stored().set_fast(Cardinality::SingleValue); + let popularity_options = IntOptions::default().set_stored().set_fast(Cardinality::SingleValue); let title_field = schema_builder.add_text_field("title", TEXT); let author_field = schema_builder.add_text_field("author", STRING); let count_field = schema_builder.add_u64_field("count", count_options); diff --git a/src/schema/value.rs b/src/schema/value.rs index 145650498..eaf66f101 100644 --- a/src/schema/value.rs +++ b/src/schema/value.rs @@ -1,6 +1,7 @@ use std::fmt; use 
serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::de::Visitor; +use schema::Facet; /// Value represents the value of a any field. /// It is an enum over all over all of the possible field type. @@ -12,6 +13,8 @@ pub enum Value { U64(u64), /// Signed 64-bits Integer `i64` I64(i64), + /// Hierarchical Facet + Facet(Facet), } impl Serialize for Value { @@ -23,6 +26,7 @@ impl Serialize for Value { Value::Str(ref v) => serializer.serialize_str(v), Value::U64(u) => serializer.serialize_u64(u), Value::I64(u) => serializer.serialize_i64(u), + Value::Facet(ref facet) => facet.serialize(serializer) } } } @@ -121,14 +125,22 @@ impl<'a> From<&'a str> for Value { } } +impl<'a> From for Value { + fn from(facet: Facet) -> Value { + Value::Facet(facet) + } +} + mod binary_serialize { use common::BinarySerializable; use std::io::{self, Read, Write}; use super::Value; + use schema::Facet; const TEXT_CODE: u8 = 0; const U64_CODE: u8 = 1; const I64_CODE: u8 = 2; + const HIERARCHICAL_FACET_CODE: u8 = 3; impl BinarySerializable for Value { fn serialize(&self, writer: &mut W) -> io::Result<()> { @@ -145,6 +157,10 @@ mod binary_serialize { I64_CODE.serialize(writer)?; val.serialize(writer) } + Value::Facet(ref facet) => { + HIERARCHICAL_FACET_CODE.serialize(writer)?; + facet.serialize(writer) + } } } fn deserialize(reader: &mut R) -> io::Result { @@ -162,6 +178,9 @@ mod binary_serialize { let value = i64::deserialize(reader)?; Ok(Value::I64(value)) } + HIERARCHICAL_FACET_CODE => { + Ok(Value::Facet(Facet::deserialize(reader)?)) + } _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("No field type is associated with code {:?}", type_code), diff --git a/src/store/mod.rs b/src/store/mod.rs index 7b6e84617..15d4d51e1 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -47,6 +47,7 @@ mod tests { use schema::{Schema, SchemaBuilder}; use schema::TextOptions; use schema::FieldValue; + use schema::Document; use directory::{Directory, MmapDirectory, RAMDirectory, 
WritePtr}; fn write_lorem_ipsum_store(writer: WritePtr, num_docs: usize) -> Schema { @@ -78,8 +79,9 @@ mod tests { let field_value = FieldValue::new(field_title, From::from(title_text)); fields.push(field_value); } - let fields_refs: Vec<&FieldValue> = fields.iter().collect(); - store_writer.store(&fields_refs).unwrap(); + //let fields_refs: Vec<&FieldValue> = fields.iter().collect(); + let doc = Document::from(fields); + store_writer.store(&doc).unwrap(); } store_writer.close().unwrap(); } diff --git a/src/store/reader.rs b/src/store/reader.rs index 28feb87ec..0ffaa983c 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -8,6 +8,7 @@ use schema::FieldValue; use common::BinarySerializable; use std::mem::size_of; use std::io::{self, Read}; +use bincode; use datastruct::SkipList; use lz4; @@ -81,17 +82,13 @@ impl StoreReader { let current_block_mut = self.current_block.borrow_mut(); let mut cursor = ¤t_block_mut[..]; for _ in first_doc_id..doc_id { - let block_length = u32::deserialize(&mut cursor)?; - cursor = &cursor[block_length as usize..]; + let doc_length = u32::deserialize(&mut cursor)?; + cursor = &cursor[doc_length as usize..]; } - u32::deserialize(&mut cursor)?; - let num_fields = u32::deserialize(&mut cursor)?; - let mut field_values = Vec::new(); - for _ in 0..num_fields { - let field_value = FieldValue::deserialize(&mut cursor)?; - field_values.push(field_value); - } - Ok(Document::from(field_values)) + let doc_length = u32::deserialize(&mut cursor)? as usize; + let document: Document = bincode::deserialize(&cursor[..doc_length]) + .expect("The docstore is corrupted. 
Failed to fetch doc"); + Ok(document) } } diff --git a/src/store/writer.rs b/src/store/writer.rs index 7d0482272..e4a33b0b3 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -7,6 +7,8 @@ use super::StoreReader; use lz4; use datastruct::SkipListBuilder; use common::CountingWriter; +use bincode; +use schema::Document; const BLOCK_SIZE: usize = 16_384; @@ -46,13 +48,11 @@ impl StoreWriter { /// The document id is implicitely the number of times /// this method has been called. /// - pub fn store<'a>(&mut self, field_values: &[&'a FieldValue]) -> io::Result<()> { + pub fn store<'a>(&mut self, stored_document: &Document) -> io::Result<()> { self.intermediary_buffer.clear(); - (field_values.len() as u32).serialize(&mut self.intermediary_buffer)?; - for &field_value in field_values { - field_value.serialize(&mut self.intermediary_buffer)?; - } - (self.intermediary_buffer.len() as u32).serialize(&mut self.current_block)?; + bincode::serialize_into(&mut self.intermediary_buffer, stored_document, bincode::Infinite); + let doc_num_bytes = self.intermediary_buffer.len() as u32; + ::serialize(&doc_num_bytes, &mut self.current_block)?; self.current_block.write_all(&self.intermediary_buffer[..])?; self.doc += 1; if self.current_block.len() > BLOCK_SIZE { diff --git a/src/termdict/fstdict/streamer.rs b/src/termdict/fstdict/streamer.rs index 010602a36..9af6d42e2 100644 --- a/src/termdict/fstdict/streamer.rs +++ b/src/termdict/fstdict/streamer.rs @@ -1,8 +1,8 @@ use fst::{IntoStreamer, Streamer}; -use fst::map::{Stream, StreamBuilder}; +use fst::map::{StreamBuilder, Stream}; use postings::TermInfo; use super::TermDictionaryImpl; -use termdict::{TermStreamer, TermStreamerBuilder}; +use termdict::{TermOrdinal, TermDictionary, TermStreamerBuilder, TermStreamer}; /// See [`TermStreamerBuilder`](./trait.TermStreamerBuilder.html) pub struct TermStreamerBuilderImpl<'a> { @@ -13,8 +13,8 @@ pub struct TermStreamerBuilderImpl<'a> { impl<'a> TermStreamerBuilderImpl<'a> { pub(crate) 
fn new(fst_map: &'a TermDictionaryImpl, stream_builder: StreamBuilder<'a>) -> Self { TermStreamerBuilderImpl { - fst_map, - stream_builder, + fst_map: fst_map, + stream_builder: stream_builder, } } } @@ -46,37 +46,40 @@ impl<'a> TermStreamerBuilder for TermStreamerBuilderImpl<'a> { TermStreamerImpl { fst_map: self.fst_map, stream: self.stream_builder.into_stream(), - offset: 0u64, + term_ord: 0u64, current_key: Vec::with_capacity(100), current_value: TermInfo::default(), } } } + /// See [`TermStreamer`](./trait.TermStreamer.html) pub struct TermStreamerImpl<'a> { fst_map: &'a TermDictionaryImpl, stream: Stream<'a>, - offset: u64, + term_ord: TermOrdinal, current_key: Vec, current_value: TermInfo, } impl<'a> TermStreamer for TermStreamerImpl<'a> { fn advance(&mut self) -> bool { - if let Some((term, offset)) = self.stream.next() { + if let Some((term, term_ord)) = self.stream.next() { self.current_key.clear(); self.current_key.extend_from_slice(term); - self.offset = offset; - self.current_value = self.fst_map - .read_value(self.offset) - .expect("Fst data is corrupted. 
Failed to deserialize a value."); + self.term_ord = term_ord; + self.current_value = self.fst_map.term_info_from_ord(term_ord); true } else { false } } + fn term_ord(&self) -> TermOrdinal { + self.term_ord + } + fn key(&self) -> &[u8] { &self.current_key } @@ -85,3 +88,4 @@ impl<'a> TermStreamer for TermStreamerImpl<'a> { &self.current_value } } + diff --git a/src/termdict/fstdict/termdict.rs b/src/termdict/fstdict/termdict.rs index ac9121a63..9ae69d914 100644 --- a/src/termdict/fstdict/termdict.rs +++ b/src/termdict/fstdict/termdict.rs @@ -5,8 +5,8 @@ use directory::ReadOnlySource; use common::BinarySerializable; use schema::FieldType; use postings::TermInfo; -use termdict::{TermDictionary, TermDictionaryBuilder}; -use super::{TermStreamerBuilderImpl, TermStreamerImpl}; +use termdict::{TermDictionary, TermDictionaryBuilder, TermOrdinal}; +use super::{TermStreamerImpl, TermStreamerBuilderImpl}; fn convert_fst_error(e: fst::Error) -> io::Error { io::Error::new(io::ErrorKind::Other, e) @@ -16,6 +16,7 @@ fn convert_fst_error(e: fst::Error) -> io::Error { pub struct TermDictionaryBuilderImpl { fst_builder: fst::MapBuilder, data: Vec, + term_ord: u64, } impl TermDictionaryBuilderImpl @@ -31,8 +32,9 @@ where /// Prefer using `.insert(key, value)` pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { self.fst_builder - .insert(key, self.data.len() as u64) + .insert(key, self.term_ord) .map_err(convert_fst_error)?; + self.term_ord += 1; Ok(()) } @@ -52,17 +54,16 @@ where fn new(w: W, _field_type: FieldType) -> io::Result { let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?; Ok(TermDictionaryBuilderImpl { - fst_builder, + fst_builder: fst_builder, data: Vec::new(), + term_ord: 0, }) } fn insert>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> { let key = key_ref.as_ref(); - self.fst_builder - .insert(key, self.data.len() as u64) - .map_err(convert_fst_error)?; - value.serialize(&mut self.data)?; + self.insert_key(key.as_ref())?; + 
self.insert_value(value)?; Ok(()) } @@ -94,14 +95,6 @@ pub struct TermDictionaryImpl { values_mmap: ReadOnlySource, } -impl TermDictionaryImpl { - /// Deserialize and returns the value at address `offset` - pub(crate) fn read_value(&self, offset: u64) -> io::Result { - let buffer = self.values_mmap.as_slice(); - let mut cursor = &buffer[(offset as usize)..]; - TermInfo::deserialize(&mut cursor) - } -} impl<'a> TermDictionary<'a> for TermDictionaryImpl { type Streamer = TermStreamerImpl<'a>; @@ -112,23 +105,60 @@ impl<'a> TermDictionary<'a> for TermDictionaryImpl { let total_len = source.len(); let length_offset = total_len - 4; let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..]; - let footer_size = u32::deserialize(&mut split_len_buffer) - .expect("Deserializing 4 bytes should always work") as usize; + let footer_size = u32::deserialize(&mut split_len_buffer).expect( + "Deserializing 4 bytes should always work", + ) as usize; let split_len = length_offset - footer_size; let fst_source = source.slice(0, split_len); let values_source = source.slice(split_len, length_offset); let fst_index = open_fst_index(fst_source); TermDictionaryImpl { - fst_index, + fst_index: fst_index, values_mmap: values_source, } } + fn num_terms(&self) -> usize { + self.values_mmap.len() / TermInfo::SIZE_IN_BYTES + } + + fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec) -> bool { + bytes.clear(); + let fst = self.fst_index.as_fst(); + let mut node = fst.root(); + while ord != 0 || !node.is_final() { + if let Some(transition) = node.transitions() + .take_while(|transition| { + transition.out.value() <= ord + }) + .last() { + ord -= transition.out.value(); + bytes.push(transition.inp); + let new_node_addr = transition.addr; + node = fst.node(new_node_addr); + } + else { + return false; + } + } + return true; + } + + fn term_ord>(&self, key: K) -> Option { + self.fst_index.get(key) + } + + fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo { + let 
buffer = self.values_mmap.as_slice(); + let offset = term_ord as usize * TermInfo::SIZE_IN_BYTES; + let mut cursor = &buffer[offset..]; + TermInfo::deserialize(&mut cursor) + .expect("The fst is corrupted. Failed to deserialize a value.") + } + fn get>(&self, key: K) -> Option { - self.fst_index.get(key).map(|offset| { - self.read_value(offset) - .expect("The fst is corrupted. Failed to deserialize a value.") - }) + self.term_ord(key) + .map(|term_ord| self.term_info_from_ord(term_ord)) } fn range(&self) -> TermStreamerBuilderImpl { diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 0a9d1dc11..856721681 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -5,12 +5,14 @@ that serves as an address in their respective posting list. The term dictionary API makes it possible to iterate through a range of keys in a sorted manner. +``` + # Implementations There is currently two implementations of the term dictionary. -## Default implementation : *fstdict* +## Default implementation : `fstdict` The default one relies heavily on the `fst` crate. It associate each terms `&[u8]` representation to a `u64` @@ -18,7 +20,7 @@ that is in fact an address in a buffer. The value is then accessible via deserializing the value at this address. -## Stream implementation : *streamdict* +## Stream implementation : `streamdict` The `fstdict` is a tiny bit slow when streaming all of the terms. @@ -46,27 +48,34 @@ followed by a streaming through at most `1024` elements in the term `stream`. */ -use schema::{Field, FieldType, Term}; +use schema::{Term, Field, FieldType}; use directory::ReadOnlySource; use postings::TermInfo; + +/// Position of the term in the sorted list of terms. 
+pub type TermOrdinal = u64; + + pub use self::merger::TermMerger; #[cfg(not(feature = "streamdict"))] mod fstdict; #[cfg(not(feature = "streamdict"))] -pub use self::fstdict::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerBuilderImpl, - TermStreamerImpl}; +pub use self::fstdict::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl, + TermStreamerBuilderImpl}; + #[cfg(feature = "streamdict")] mod streamdict; #[cfg(feature = "streamdict")] -pub use self::streamdict::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerBuilderImpl, - TermStreamerImpl}; +pub use self::streamdict::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl, + TermStreamerBuilderImpl}; mod merger; use std::io; + /// Dictionary associating sorted `&[u8]` to values pub trait TermDictionary<'a> where @@ -81,6 +90,27 @@ where /// Opens a `TermDictionary` given a data source. fn from_source(source: ReadOnlySource) -> Self; + /// Returns the ordinal associated to a given term. + fn term_ord>(&self, term: K) -> Option; + + /// Returns the term associated to a given term ordinal. + /// + /// Term ordinals are defined as the position of the term in + /// the sorted list of terms. + /// + /// Returns true iff the term has been found. + /// + /// Regardless of whether the term is found or not, + /// the buffer may be modified. + fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec) -> bool; + + /// Returns the number of terms in the dictionary. + fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo; + + /// Returns the number of terms in the dictionary. + /// Term ordinals range from 0 to `num_terms() - 1`. + fn num_terms(&self) -> usize; + /// Lookups the value corresponding to the key. fn get>(&self, target_key: K) -> Option; @@ -124,6 +154,7 @@ where fn finish(self) -> io::Result; } + /// `TermStreamer` acts as a cursor over a range of terms of a segment. /// Terms are guaranteed to be sorted. 
pub trait TermStreamer: Sized { @@ -133,6 +164,7 @@ pub trait TermStreamer: Sized { fn advance(&mut self) -> bool; /// Accesses the current key. + /// /// `.key()` should return the key that was returned /// by the `.next()` method. /// @@ -143,6 +175,12 @@ pub trait TermStreamer: Sized { /// Before any call to `.next()`, `.key()` returns an empty array. fn key(&self) -> &[u8]; + /// Returns the `TermOrdinal` of the given term. + /// + /// May panic if the called as `.advance()` as never + /// been called before. + fn term_ord(&self) -> TermOrdinal; + /// Accesses the current value. /// /// Calling `.value()` after the end of the stream will return the @@ -155,15 +193,16 @@ pub trait TermStreamer: Sized { fn value(&self) -> &TermInfo; /// Return the next `(key, value)` pair. - fn next(&mut self) -> Option<(Term<&[u8]>, &TermInfo)> { + fn next(&mut self) -> Option<(&[u8], &TermInfo)> { if self.advance() { - Some((Term::wrap(self.key()), self.value())) + Some((self.key(), self.value())) } else { None } } } + /// `TermStreamerBuilder` is an helper object used to define /// a range of terms that should be streamed. 
pub trait TermStreamerBuilder { @@ -187,12 +226,13 @@ pub trait TermStreamerBuilder { fn into_stream(self) -> Self::Streamer; } + #[cfg(test)] mod tests { - use super::{TermDictionaryBuilderImpl, TermDictionaryImpl, TermStreamerImpl}; - use directory::{Directory, RAMDirectory, ReadOnlySource}; + use super::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl}; + use directory::{RAMDirectory, Directory, ReadOnlySource}; use std::path::PathBuf; - use schema::{Document, FieldType, SchemaBuilder, Term, TEXT}; + use schema::{FieldType, SchemaBuilder, Document, TEXT}; use core::Index; use std::str; use termdict::TermStreamer; @@ -203,15 +243,52 @@ mod tests { const BLOCK_SIZE: usize = 1_500; + fn make_term_info(val: u64) -> TermInfo { TermInfo { doc_freq: val as u32, - positions_offset: val * 2u64, + positions_offset: val * 2u64, postings_offset: val * 3u64, positions_inner_offset: 5u8, } } + + #[test] + fn test_term_ordinals() { + const COUNTRIES: [&'static str; 7] = [ + "San Marino", + "Serbia", + "Slovakia", + "Slovenia", + "Spain", + "Sweden", + "Switzerland" + ]; + let mut directory = RAMDirectory::create(); + let path = PathBuf::from("TermDictionary"); + { + let write = directory.open_write(&path).unwrap(); + let field_type = FieldType::Str(TEXT); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(write, field_type) + .unwrap(); + for term in COUNTRIES.iter() { + term_dictionary_builder + .insert(term.as_bytes(), &make_term_info(0u64)) + .unwrap(); + } + term_dictionary_builder.finish().unwrap(); + } + let source = directory.open_read(&path).unwrap(); + let term_dict: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + for (term_ord, term) in COUNTRIES.iter().enumerate() { + assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64); + let mut bytes = vec!(); + assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes)); + assert_eq!(bytes, term.as_bytes()); + } + } + #[test] fn test_term_dictionary_simple() { let mut 
directory = RAMDirectory::create(); @@ -219,8 +296,8 @@ mod tests { { let write = directory.open_write(&path).unwrap(); let field_type = FieldType::Str(TEXT); - let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(write, field_type).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(write, field_type) + .unwrap(); term_dictionary_builder .insert("abc".as_bytes(), &make_term_info(34u64)) .unwrap(); @@ -246,7 +323,7 @@ mod tests { { { let (k, v) = stream.next().unwrap(); - assert_eq!(k.as_slice(), "abcd".as_bytes()); + assert_eq!(k, "abcd".as_bytes()); assert_eq!(v.doc_freq, 346u32); } assert_eq!(stream.key(), "abcd".as_bytes()); @@ -294,12 +371,13 @@ mod tests { let mut term_it = field_searcher.terms(); let mut term_string = String::new(); while term_it.advance() { - let term = Term::from_bytes(term_it.key()); - term_string.push_str(term.text()); + //let term = Term::from_bytes(term_it.key()); + term_string.push_str(unsafe { str::from_utf8_unchecked(term_it.key()) }); } assert_eq!(&*term_string, "abcdef"); } + #[test] fn test_term_dictionary_stream() { let ids: Vec<_> = (0u32..10_000u32) @@ -307,8 +385,8 @@ mod tests { .collect(); let field_type = FieldType::Str(TEXT); let buffer: Vec = { - let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec![], field_type) + .unwrap(); for &(ref id, ref i) in &ids { term_dictionary_builder .insert(id.as_bytes(), &make_term_info(*i as u64)) @@ -333,12 +411,13 @@ mod tests { term_dictionary.get(key.as_bytes()); } + #[test] fn test_stream_high_range_prefix_suffix() { let field_type = FieldType::Str(TEXT); let buffer: Vec = { - let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec![], field_type) + .unwrap(); // term requires more than 16bits term_dictionary_builder 
.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1)) @@ -372,8 +451,8 @@ mod tests { .collect(); let field_type = FieldType::Str(TEXT); let buffer: Vec = { - let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec![], field_type) + .unwrap(); for &(ref id, ref i) in &ids { term_dictionary_builder .insert(id.as_bytes(), &make_term_info(*i as u64)) @@ -437,12 +516,13 @@ mod tests { } } + #[test] fn test_stream_range_boundaries() { let field_type = FieldType::Str(TEXT); let buffer: Vec = { - let mut term_dictionary_builder = - TermDictionaryBuilderImpl::new(vec![], field_type).unwrap(); + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec![], field_type) + .unwrap(); for i in 0u8..10u8 { let number_arr = [i; 1]; term_dictionary_builder diff --git a/src/tokenizer/facet_tokenizer.rs b/src/tokenizer/facet_tokenizer.rs new file mode 100644 index 000000000..f20cbc1c0 --- /dev/null +++ b/src/tokenizer/facet_tokenizer.rs @@ -0,0 +1,89 @@ +use super::{Token, Tokenizer, TokenStream}; +use std::str; +use schema::FACET_SEP_BYTE; + + +/// The `FacetTokenizer` process a `Facet` binary representation +/// and emits a token for all of its parent. 
+/// +/// For instance, `/america/north_america/canada` +/// will emit the three following tokens +/// - `/america/north_america/canada` +/// - `/america/north_america` +/// - `/america` +#[derive(Clone)] +pub struct FacetTokenizer; + +pub struct FacetTokenStream<'a> { + text: &'a str, + pos: usize, + token: Token, +} + +impl<'a> Tokenizer<'a> for FacetTokenizer { + type TokenStreamImpl = FacetTokenStream<'a>; + + fn token_stream(&self, text: &'a str) -> Self::TokenStreamImpl { + FacetTokenStream { + text: text, + pos: 0, + token: Token::default(), + } + } +} + + +impl<'a> TokenStream for FacetTokenStream<'a> { + fn advance(&mut self) -> bool { + let bytes: &[u8] = self.text.as_bytes(); + if self.pos == bytes.len() { + false + } else { + let next_sep_pos = bytes[self.pos + 1..] + .iter() + .cloned() + .position(|b| b == FACET_SEP_BYTE) + .map(|pos| pos + self.pos + 1) + .unwrap_or(bytes.len()); + let facet_prefix = unsafe { str::from_utf8_unchecked(&bytes[self.pos..next_sep_pos]) }; + self.pos = next_sep_pos; + self.token.text.push_str(facet_prefix); + true + } + } + + fn token(&self) -> &Token { + &self.token + } + + fn token_mut(&mut self) -> &mut Token { + &mut self.token + } +} + +#[cfg(test)] +mod tests { + + use tokenizer::{TokenStream, Token, Tokenizer}; + use super::FacetTokenizer; + use schema::Facet; + + #[test] + fn test_facet_tokenizer() { + let facet = Facet::from_path(vec!["top", "a", "b"]); + let mut tokens = vec![]; + { + let mut add_token = |token: &Token| { + let facet = Facet::from_encoded(token.text.as_bytes().to_owned()); + tokens.push(format!("{}", facet)); + }; + FacetTokenizer + .token_stream(unsafe { ::std::str::from_utf8_unchecked(facet.encoded_bytes()) }) + .process(&mut add_token); + } + assert_eq!(tokens.len(), 3); + assert_eq!(tokens[0], "/top"); + assert_eq!(tokens[1], "/top/a"); + assert_eq!(tokens[2], "/top/a/b"); + } +} \ No newline at end of file diff --git a/src/tokenizer/mod.rs b/src/tokenizer/mod.rs index 7557a0b91..f885df140 
100644 --- a/src/tokenizer/mod.rs +++ b/src/tokenizer/mod.rs @@ -133,6 +133,7 @@ mod simple_tokenizer; mod lower_caser; mod remove_long; mod stemmer; +mod facet_tokenizer; mod tokenizer_manager; mod japanese_tokenizer; mod token_stream_chain; @@ -150,6 +151,7 @@ pub use self::japanese_tokenizer::JapaneseTokenizer; pub use self::remove_long::RemoveLongFilter; pub use self::lower_caser::LowerCaser; pub use self::stemmer::Stemmer; +pub use self::facet_tokenizer::FacetTokenizer; #[cfg(test)] mod test {