diff --git a/columnar/src/lib.rs b/columnar/src/lib.rs index bdf28082a..20cd981bb 100644 --- a/columnar/src/lib.rs +++ b/columnar/src/lib.rs @@ -25,12 +25,14 @@ pub use columnar::{ merge_columnar, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType, MergeDocOrder, }; +use sstable::VoidSSTable; pub use value::{NumericalType, NumericalValue}; pub use self::dynamic_column::{DynamicColumn, DynamicColumnHandle}; pub type RowId = u32; pub use sstable::Dictionary; +pub type Streamer<'a> = sstable::Streamer<'a, VoidSSTable>; #[derive(Clone, Copy, PartialOrd, PartialEq, Default, Debug)] pub struct DateTime { diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index a3f10897c..859594af7 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -1,12 +1,11 @@ use std::cmp::Ordering; use std::collections::{btree_map, BTreeMap, BTreeSet, BinaryHeap}; -use std::iter::Peekable; use std::ops::Bound; -use std::{u64, usize}; +use std::{io, u64, usize}; use crate::collector::{Collector, SegmentCollector}; use crate::fastfield::FacetReader; -use crate::schema::{Facet, Field}; +use crate::schema::Facet; use crate::{DocId, Score, SegmentOrdinal, SegmentReader}; struct Hit<'a> { @@ -119,7 +118,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize { /// let searcher = reader.searcher(); /// /// { -/// let mut facet_collector = FacetCollector::for_field(facet); +/// let mut facet_collector = FacetCollector::for_field("facet"); /// facet_collector.add_facet("/lang"); /// facet_collector.add_facet("/category"); /// let facet_counts = searcher.search(&AllQuery, &facet_collector)?; @@ -135,7 +134,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize { /// } /// /// { -/// let mut facet_collector = FacetCollector::for_field(facet); +/// let mut facet_collector = FacetCollector::for_field("facet"); /// facet_collector.add_facet("/category/fiction"); /// let facet_counts = searcher.search(&AllQuery, &facet_collector)?; /// @@ 
/// Turns a per-term collapse mapping into dense bucket ids.
///
/// `mapping` holds one `(facet_ord, depth)` pair per term ordinal, as produced by
/// `compute_collapse_mapping`. Every run of *consecutive* equal pairs is assigned one
/// bucket id; ids start at 0 and grow by one each time the pair changes.
///
/// Returns `(compressed_collapse_mapping, unique_facet_ords)` where
/// * `compressed_collapse_mapping[term_ord]` is the bucket id of that term, and
/// * `unique_facet_ords[bucket_id]` is the pair shared by every term of that bucket.
///
/// Only adjacent duplicates are merged: a pair that reappears after a different pair
/// gets a fresh bucket id. An empty `mapping` yields two empty vectors.
fn compress_mapping(mapping: &[(u64, usize)]) -> (Vec<usize>, Vec<(u64, usize)>) {
    // facet_ord -> compressed collapse facet_id
    let mut compressed_collapse_mapping: Vec<usize> = Vec::with_capacity(mapping.len());
    // compressed collapse facet_id -> (facet_ord, depth)
    let mut unique_facet_ords: Vec<(u64, usize)> = Vec::new();
    for &entry in mapping {
        // A new run starts whenever the entry differs from the last one recorded
        // (which is also the case for the very first entry).
        if unique_facet_ords.last() != Some(&entry) {
            unique_facet_ords.push(entry);
        }
        // The current run is always the last bucket that was opened.
        compressed_collapse_mapping.push(unique_facet_ords.len() - 1);
    }
    (compressed_collapse_mapping, unique_facet_ords)
}
- let collapse_depth = facet_depth(facet_streamer.key()); - let mut collapsed_id = 0; - collapse_mapping.push(0); - while facet_streamer.advance() { - let depth = facet_depth(facet_streamer.key()); - if depth <= collapse_depth { - continue 'outer; - } - if depth == collapse_depth + 1 { - collapsed_id = collapse_facet_ords.len(); - collapse_facet_ords.push(facet_streamer.term_ord()); - } - collapse_mapping.push(collapsed_id); - } - break; - } - SkipResult::NotFound => { - collapse_mapping.push(0); - if !facet_streamer.advance() { - break; - } - } - } - } - } - } - - counts.resize(collapse_facet_ords.len(), 0); - + let facet_dict = facet_reader.facet_dict(); + let collapse_mapping: Vec<(u64, usize)> = + compute_collapse_mapping(facet_dict, &self.facets)?; + let (compressed_collapse_mapping, unique_facet_ords) = compress_mapping(&collapse_mapping); + let counts = vec![0u64; unique_facet_ords.len()]; Ok(FacetSegmentCollector { reader: facet_reader, - collapse_mapping, + compressed_collapse_mapping, counts, - collapse_facet_ords, + unique_facet_ords, }) } @@ -329,13 +281,78 @@ impl Collector for FacetCollector { } } +fn is_child_facet(parent_facet: &[u8], possible_child_facet: &[u8]) -> bool { + if !possible_child_facet.starts_with(parent_facet) { + return false; + } + possible_child_facet.get(parent_facet.len()).copied() == Some(0u8) +} + +fn compute_collapse_mapping_one( + facet_terms: &mut columnar::Streamer, + facet_bytes: &[u8], + collapsed: &mut [(u64, usize)], +) -> io::Result { + let mut facet_child: Vec = Vec::new(); + let mut term_ord = 0; + let offset = facet_bytes.len() + 1; + let depth = facet_depth(facet_bytes); + loop { + match facet_terms.key().cmp(facet_bytes) { + Ordering::Less | Ordering::Equal => {} + Ordering::Greater => { + if !is_child_facet(facet_bytes, facet_terms.key()) { + return Ok(true); + } + let suffix = &facet_terms.key()[offset..]; + if facet_child.is_empty() || !is_child_facet(&facet_child, suffix) { + facet_child.clear(); + term_ord = 
facet_terms.term_ord(); + let end = suffix + .iter() + .position(|b| *b == 0u8) + .unwrap_or(suffix.len()); + facet_child.extend(&suffix[..end]); + } + collapsed[facet_terms.term_ord() as usize] = (term_ord, depth); + } + } + if !facet_terms.advance() { + return Ok(false); + } + } +} + +fn compute_collapse_mapping( + facet_dict: &columnar::Dictionary, + facets: &BTreeSet, +) -> io::Result> { + let mut collapsed = vec![(u64::MAX, 0); facet_dict.num_terms()]; + if facets.is_empty() { + return Ok(collapsed); + } + let mut facet_terms: columnar::Streamer = facet_dict.range().into_stream()?; + if !facet_terms.advance() { + return Ok(collapsed); + } + let mut facet_bytes = Vec::new(); + for facet in facets { + facet_bytes.clear(); + facet_bytes.extend(facet.encoded_str().as_bytes()); + if !compute_collapse_mapping_one(&mut facet_terms, &facet_bytes, &mut collapsed[..])? { + break; + } + } + Ok(collapsed) +} + impl SegmentCollector for FacetSegmentCollector { type Fruit = FacetCounts; fn collect(&mut self, doc: DocId, _: Score) { let mut previous_collapsed_ord: usize = usize::MAX; for facet_ord in self.reader.facet_ords(doc) { - let collapsed_ord = self.collapse_mapping[facet_ord as usize]; + let collapsed_ord = self.compressed_collapse_mapping[facet_ord as usize]; self.counts[collapsed_ord] += u64::from(collapsed_ord != previous_collapsed_ord); previous_collapsed_ord = collapsed_ord; } @@ -353,9 +370,17 @@ impl SegmentCollector for FacetSegmentCollector { continue; } let mut facet = vec![]; - let facet_ord = self.collapse_facet_ords[collapsed_facet_ord]; + let (facet_ord, facet_depth) = self.unique_facet_ords[collapsed_facet_ord]; // TODO handle errors. 
if facet_dict.ord_to_term(facet_ord, &mut facet).is_ok() { + if let Some((end_collapsed_facet, _)) = facet + .iter() + .enumerate() + .filter(|(_pos, &b)| b == 0u8) + .nth(facet_depth) + { + facet.truncate(end_collapsed_facet); + } if let Ok(facet) = Facet::from_encoded(facet) { facet_counts.insert(facet, count); } @@ -439,19 +464,78 @@ impl FacetCounts { #[cfg(test)] mod tests { + use std::collections::BTreeSet; use std::iter; + use columnar::Dictionary; use rand::distributions::Uniform; use rand::prelude::SliceRandom; use rand::{thread_rng, Rng}; use super::{FacetCollector, FacetCounts}; + use crate::collector::facet_collector::compress_mapping; use crate::collector::Count; use crate::core::Index; use crate::query::{AllQuery, QueryParser, TermQuery}; use crate::schema::{Document, Facet, FacetOptions, IndexRecordOption, Schema}; use crate::Term; + fn test_collapse_mapping_aux( + facet_terms: &[&str], + facet_params: &[&str], + expected_collapsed_mapping: &[(u64, usize)], + ) { + let mut facets: Vec = facet_terms.iter().map(Facet::from).collect(); + facets.sort(); + let facet_terms: Vec<&str> = facets.iter().map(|facet| facet.encoded_str()).collect(); + let dictionary = Dictionary::build_for_tests(&facet_terms); + let facet_params: BTreeSet = facet_params.iter().map(Facet::from).collect(); + let collapse_mapping = super::compute_collapse_mapping(&dictionary, &facet_params).unwrap(); + assert_eq!(&collapse_mapping[..], expected_collapsed_mapping); + } + + #[test] + fn test_collapse_simple() { + test_collapse_mapping_aux(&["/facet/a", "/facet/b"], &["/facet"], &[(0, 1), (1, 1)]); + test_collapse_mapping_aux( + &["/facet/a", "/facet/a2", "/facet/b"], + &["/facet"], + &[(0, 1), (1, 1), (2, 1)], + ); + test_collapse_mapping_aux(&["/facet/a", "/facet/a/2"], &["/facet"], &[(0, 1), (0, 1)]); + test_collapse_mapping_aux( + &["/facet/a", "/facet/a/2", "/facet/b"], + &["/facet"], + &[(0, 1), (0, 1), (2, 1)], + ); + } + + fn test_compress_mapping_aux( + collapsed_mapping: 
&[(u64, usize)], + expected_compressed_collapsed_mapping: &[usize], + expected_unique_facet_ords: &[(u64, usize)], + ) { + let (compressed_collapsed_mapping, unique_facet_ords) = + compress_mapping(&collapsed_mapping); + assert_eq!( + compressed_collapsed_mapping, + expected_compressed_collapsed_mapping + ); + assert_eq!(unique_facet_ords, expected_unique_facet_ords); + } + + #[test] + fn test_compress_mapping() { + test_compress_mapping_aux(&[], &[], &[]); + test_compress_mapping_aux(&[(1, 2)], &[0], &[(1, 2)]); + test_compress_mapping_aux(&[(1, 2), (1, 2)], &[0, 0], &[(1, 2)]); + test_compress_mapping_aux( + &[(1, 2), (5, 2), (5, 2), (6, 3), (8, 3)], + &[0, 1, 1, 2, 3], + &[(1, 2), (5, 2), (6, 3), (8, 3)], + ); + } + #[test] fn test_facet_collector_simple() { let mut schema_builder = Schema::builder(); @@ -477,17 +561,17 @@ mod tests { facet_collector.add_facet("/facet"); let counts: FacetCounts = searcher.search(&AllQuery, &facet_collector).unwrap(); let facets: Vec<(&Facet, u64)> = counts.top_k("/facet", 1); - assert_eq!(facets, vec![(&Facet::from("/facet/a"), 2)]); + assert_eq!(facets, vec![(&Facet::from("/facet/b"), 2)]); } #[test] - fn test_facet_collector_drilldown() -> crate::Result<()> { + fn test_facet_collector_drilldown() { let mut schema_builder = Schema::builder(); let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default()); let schema = schema_builder.build(); let index = Index::create_in_ram(schema); - let mut index_writer = index.writer_for_tests()?; + let mut index_writer = index.writer_for_tests().unwrap(); let num_facets: usize = 3 * 4 * 5; let facets: Vec = (0..num_facets) .map(|mut n| { @@ -502,14 +586,14 @@ mod tests { for i in 0..num_facets * 10 { let mut doc = Document::new(); doc.add_facet(facet_field, facets[i % num_facets].clone()); - index_writer.add_document(doc)?; + index_writer.add_document(doc).unwrap(); } - index_writer.commit()?; - let reader = index.reader()?; + index_writer.commit().unwrap(); + let reader 
= index.reader().unwrap(); let searcher = reader.searcher(); let mut facet_collector = FacetCollector::for_field("facet"); facet_collector.add_facet(Facet::from("/top1")); - let counts = searcher.search(&AllQuery, &facet_collector)?; + let counts = searcher.search(&AllQuery, &facet_collector).unwrap(); { let facets: Vec<(String, u64)> = counts @@ -529,7 +613,6 @@ mod tests { .collect::>() ); } - Ok(()) } #[test] diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index 81db1fc6b..afab84629 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -37,6 +37,21 @@ pub struct Dictionary { phantom_data: PhantomData, } +impl Dictionary { + pub fn build_for_tests(terms: &[&str]) -> Dictionary { + let mut terms = terms.to_vec(); + terms.sort(); + let mut buffer = Vec::new(); + let mut dictionary_writer = Self::builder(&mut buffer).unwrap(); + for term in terms { + dictionary_writer.insert(term, &()).unwrap(); + } + dictionary_writer.finish().unwrap(); + let dictionary = Dictionary::from_bytes(OwnedBytes::new(buffer)).unwrap(); + dictionary + } +} + impl Dictionary { pub fn builder(wrt: W) -> io::Result> { Ok(TSSTable::writer(wrt))