Replugging facet collector

This commit is contained in:
Paul Masurel
2023-01-24 15:31:34 +09:00
parent 965ce3ef3e
commit 25fb27f1a6
3 changed files with 196 additions and 96 deletions

View File

@@ -25,12 +25,14 @@ pub use columnar::{
merge_columnar, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
MergeDocOrder,
};
use sstable::VoidSSTable;
pub use value::{NumericalType, NumericalValue};
pub use self::dynamic_column::{DynamicColumn, DynamicColumnHandle};
pub type RowId = u32;
pub use sstable::Dictionary;
pub type Streamer<'a> = sstable::Streamer<'a, VoidSSTable>;
#[derive(Clone, Copy, PartialOrd, PartialEq, Default, Debug)]
pub struct DateTime {

View File

@@ -1,12 +1,11 @@
use std::cmp::Ordering;
use std::collections::{btree_map, BTreeMap, BTreeSet, BinaryHeap};
use std::iter::Peekable;
use std::ops::Bound;
use std::{u64, usize};
use std::{io, u64, usize};
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::FacetReader;
use crate::schema::{Facet, Field};
use crate::schema::Facet;
use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
struct Hit<'a> {
@@ -119,7 +118,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// let searcher = reader.searcher();
///
/// {
/// let mut facet_collector = FacetCollector::for_field(facet);
/// let mut facet_collector = FacetCollector::for_field("facet");
/// facet_collector.add_facet("/lang");
/// facet_collector.add_facet("/category");
/// let facet_counts = searcher.search(&AllQuery, &facet_collector)?;
@@ -135,7 +134,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// }
///
/// {
/// let mut facet_collector = FacetCollector::for_field(facet);
/// let mut facet_collector = FacetCollector::for_field("facet");
/// facet_collector.add_facet("/category/fiction");
/// let facet_counts = searcher.search(&AllQuery, &facet_collector)?;
///
@@ -173,41 +172,12 @@ pub struct FacetCollector {
pub struct FacetSegmentCollector {
reader: FacetReader,
// facet_ord -> collapse facet_id
collapse_mapping: Vec<usize>,
// collapse facet_id -> count
counts: Vec<u64>,
// collapse facet_id -> facet_ord
collapse_facet_ords: Vec<u64>,
}
#[derive(Debug)]
enum SkipResult {
Found,
NotFound,
}
fn skip<'a, I: Iterator<Item = &'a Facet>>(
target: &[u8],
collapse_it: &mut Peekable<I>,
) -> SkipResult {
loop {
match collapse_it.peek() {
Some(facet_bytes) => match facet_bytes.encoded_str().as_bytes().cmp(target) {
Ordering::Less => {}
Ordering::Greater => {
return SkipResult::NotFound;
}
Ordering::Equal => {
return SkipResult::Found;
}
},
None => {
return SkipResult::NotFound;
}
}
collapse_it.next();
}
// facet_ord -> compressed collapse facet_id
compressed_collapse_mapping: Vec<usize>,
// compressed collapse facet_id -> facet_ord
unique_facet_ords: Vec<(u64, usize)>,
}
impl FacetCollector {
@@ -249,6 +219,29 @@ impl FacetCollector {
}
}
fn compress_mapping(mapping: &[(u64, usize)]) -> (Vec<usize>, Vec<(u64, usize)>) {
// facet_ord -> collapse facet_id
let mut compressed_collapse_mapping: Vec<usize> = Vec::with_capacity(mapping.len());
// collapse facet_id -> facet_ord
let mut unique_facet_ords: Vec<(u64, usize)> = Vec::new();
if mapping.is_empty() {
return (Vec::new(), Vec::new());
}
compressed_collapse_mapping.push(0);
unique_facet_ords.push(mapping[0]);
let mut last_facet_ord = mapping[0];
let mut last_facet_id = 0;
for &facet_ord in &mapping[1..] {
if facet_ord != last_facet_ord {
last_facet_id += 1;
last_facet_ord = facet_ord;
unique_facet_ords.push(facet_ord);
}
compressed_collapse_mapping.push(last_facet_id);
}
(compressed_collapse_mapping, unique_facet_ords)
}
impl Collector for FacetCollector {
type Fruit = FacetCounts;
@@ -260,57 +253,16 @@ impl Collector for FacetCollector {
reader: &SegmentReader,
) -> crate::Result<FacetSegmentCollector> {
let facet_reader = reader.facet_reader(&self.field_name)?;
let mut collapse_mapping = Vec::new();
let mut counts = Vec::new();
let mut collapse_facet_ords = Vec::new();
let mut collapse_facet_it = self.facets.iter().peekable();
collapse_facet_ords.push(0);
{
let mut facet_streamer = facet_reader.facet_dict().range().into_stream()?;
if facet_streamer.advance() {
'outer: loop {
// at the beginning of this loop, facet_streamer
// is positioned on a term that has not been processed yet.
let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it);
match skip_result {
SkipResult::Found => {
// we reach a facet we decided to collapse.
let collapse_depth = facet_depth(facet_streamer.key());
let mut collapsed_id = 0;
collapse_mapping.push(0);
while facet_streamer.advance() {
let depth = facet_depth(facet_streamer.key());
if depth <= collapse_depth {
continue 'outer;
}
if depth == collapse_depth + 1 {
collapsed_id = collapse_facet_ords.len();
collapse_facet_ords.push(facet_streamer.term_ord());
}
collapse_mapping.push(collapsed_id);
}
break;
}
SkipResult::NotFound => {
collapse_mapping.push(0);
if !facet_streamer.advance() {
break;
}
}
}
}
}
}
counts.resize(collapse_facet_ords.len(), 0);
let facet_dict = facet_reader.facet_dict();
let collapse_mapping: Vec<(u64, usize)> =
compute_collapse_mapping(facet_dict, &self.facets)?;
let (compressed_collapse_mapping, unique_facet_ords) = compress_mapping(&collapse_mapping);
let counts = vec![0u64; unique_facet_ords.len()];
Ok(FacetSegmentCollector {
reader: facet_reader,
collapse_mapping,
compressed_collapse_mapping,
counts,
collapse_facet_ords,
unique_facet_ords,
})
}
@@ -329,13 +281,78 @@ impl Collector for FacetCollector {
}
}
fn is_child_facet(parent_facet: &[u8], possible_child_facet: &[u8]) -> bool {
if !possible_child_facet.starts_with(parent_facet) {
return false;
}
possible_child_facet.get(parent_facet.len()).copied() == Some(0u8)
}
fn compute_collapse_mapping_one(
facet_terms: &mut columnar::Streamer,
facet_bytes: &[u8],
collapsed: &mut [(u64, usize)],
) -> io::Result<bool> {
let mut facet_child: Vec<u8> = Vec::new();
let mut term_ord = 0;
let offset = facet_bytes.len() + 1;
let depth = facet_depth(facet_bytes);
loop {
match facet_terms.key().cmp(facet_bytes) {
Ordering::Less | Ordering::Equal => {}
Ordering::Greater => {
if !is_child_facet(facet_bytes, facet_terms.key()) {
return Ok(true);
}
let suffix = &facet_terms.key()[offset..];
if facet_child.is_empty() || !is_child_facet(&facet_child, suffix) {
facet_child.clear();
term_ord = facet_terms.term_ord();
let end = suffix
.iter()
.position(|b| *b == 0u8)
.unwrap_or(suffix.len());
facet_child.extend(&suffix[..end]);
}
collapsed[facet_terms.term_ord() as usize] = (term_ord, depth);
}
}
if !facet_terms.advance() {
return Ok(false);
}
}
}
fn compute_collapse_mapping(
facet_dict: &columnar::Dictionary,
facets: &BTreeSet<Facet>,
) -> io::Result<Vec<(u64, usize)>> {
let mut collapsed = vec![(u64::MAX, 0); facet_dict.num_terms()];
if facets.is_empty() {
return Ok(collapsed);
}
let mut facet_terms: columnar::Streamer = facet_dict.range().into_stream()?;
if !facet_terms.advance() {
return Ok(collapsed);
}
let mut facet_bytes = Vec::new();
for facet in facets {
facet_bytes.clear();
facet_bytes.extend(facet.encoded_str().as_bytes());
if !compute_collapse_mapping_one(&mut facet_terms, &facet_bytes, &mut collapsed[..])? {
break;
}
}
Ok(collapsed)
}
impl SegmentCollector for FacetSegmentCollector {
type Fruit = FacetCounts;
fn collect(&mut self, doc: DocId, _: Score) {
let mut previous_collapsed_ord: usize = usize::MAX;
for facet_ord in self.reader.facet_ords(doc) {
let collapsed_ord = self.collapse_mapping[facet_ord as usize];
let collapsed_ord = self.compressed_collapse_mapping[facet_ord as usize];
self.counts[collapsed_ord] += u64::from(collapsed_ord != previous_collapsed_ord);
previous_collapsed_ord = collapsed_ord;
}
@@ -353,9 +370,17 @@ impl SegmentCollector for FacetSegmentCollector {
continue;
}
let mut facet = vec![];
let facet_ord = self.collapse_facet_ords[collapsed_facet_ord];
let (facet_ord, facet_depth) = self.unique_facet_ords[collapsed_facet_ord];
// TODO handle errors.
if facet_dict.ord_to_term(facet_ord, &mut facet).is_ok() {
if let Some((end_collapsed_facet, _)) = facet
.iter()
.enumerate()
.filter(|(_pos, &b)| b == 0u8)
.nth(facet_depth)
{
facet.truncate(end_collapsed_facet);
}
if let Ok(facet) = Facet::from_encoded(facet) {
facet_counts.insert(facet, count);
}
@@ -439,19 +464,78 @@ impl FacetCounts {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::iter;
use columnar::Dictionary;
use rand::distributions::Uniform;
use rand::prelude::SliceRandom;
use rand::{thread_rng, Rng};
use super::{FacetCollector, FacetCounts};
use crate::collector::facet_collector::compress_mapping;
use crate::collector::Count;
use crate::core::Index;
use crate::query::{AllQuery, QueryParser, TermQuery};
use crate::schema::{Document, Facet, FacetOptions, IndexRecordOption, Schema};
use crate::Term;
fn test_collapse_mapping_aux(
facet_terms: &[&str],
facet_params: &[&str],
expected_collapsed_mapping: &[(u64, usize)],
) {
let mut facets: Vec<Facet> = facet_terms.iter().map(Facet::from).collect();
facets.sort();
let facet_terms: Vec<&str> = facets.iter().map(|facet| facet.encoded_str()).collect();
let dictionary = Dictionary::build_for_tests(&facet_terms);
let facet_params: BTreeSet<Facet> = facet_params.iter().map(Facet::from).collect();
let collapse_mapping = super::compute_collapse_mapping(&dictionary, &facet_params).unwrap();
assert_eq!(&collapse_mapping[..], expected_collapsed_mapping);
}
#[test]
fn test_collapse_simple() {
test_collapse_mapping_aux(&["/facet/a", "/facet/b"], &["/facet"], &[(0, 1), (1, 1)]);
test_collapse_mapping_aux(
&["/facet/a", "/facet/a2", "/facet/b"],
&["/facet"],
&[(0, 1), (1, 1), (2, 1)],
);
test_collapse_mapping_aux(&["/facet/a", "/facet/a/2"], &["/facet"], &[(0, 1), (0, 1)]);
test_collapse_mapping_aux(
&["/facet/a", "/facet/a/2", "/facet/b"],
&["/facet"],
&[(0, 1), (0, 1), (2, 1)],
);
}
fn test_compress_mapping_aux(
collapsed_mapping: &[(u64, usize)],
expected_compressed_collapsed_mapping: &[usize],
expected_unique_facet_ords: &[(u64, usize)],
) {
let (compressed_collapsed_mapping, unique_facet_ords) =
compress_mapping(&collapsed_mapping);
assert_eq!(
compressed_collapsed_mapping,
expected_compressed_collapsed_mapping
);
assert_eq!(unique_facet_ords, expected_unique_facet_ords);
}
#[test]
fn test_compress_mapping() {
test_compress_mapping_aux(&[], &[], &[]);
test_compress_mapping_aux(&[(1, 2)], &[0], &[(1, 2)]);
test_compress_mapping_aux(&[(1, 2), (1, 2)], &[0, 0], &[(1, 2)]);
test_compress_mapping_aux(
&[(1, 2), (5, 2), (5, 2), (6, 3), (8, 3)],
&[0, 1, 1, 2, 3],
&[(1, 2), (5, 2), (6, 3), (8, 3)],
);
}
#[test]
fn test_facet_collector_simple() {
let mut schema_builder = Schema::builder();
@@ -477,17 +561,17 @@ mod tests {
facet_collector.add_facet("/facet");
let counts: FacetCounts = searcher.search(&AllQuery, &facet_collector).unwrap();
let facets: Vec<(&Facet, u64)> = counts.top_k("/facet", 1);
assert_eq!(facets, vec![(&Facet::from("/facet/a"), 2)]);
assert_eq!(facets, vec![(&Facet::from("/facet/b"), 2)]);
}
#[test]
fn test_facet_collector_drilldown() -> crate::Result<()> {
fn test_facet_collector_drilldown() {
let mut schema_builder = Schema::builder();
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
let mut index_writer = index.writer_for_tests().unwrap();
let num_facets: usize = 3 * 4 * 5;
let facets: Vec<Facet> = (0..num_facets)
.map(|mut n| {
@@ -502,14 +586,14 @@ mod tests {
for i in 0..num_facets * 10 {
let mut doc = Document::new();
doc.add_facet(facet_field, facets[i % num_facets].clone());
index_writer.add_document(doc)?;
index_writer.add_document(doc).unwrap();
}
index_writer.commit()?;
let reader = index.reader()?;
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let mut facet_collector = FacetCollector::for_field("facet");
facet_collector.add_facet(Facet::from("/top1"));
let counts = searcher.search(&AllQuery, &facet_collector)?;
let counts = searcher.search(&AllQuery, &facet_collector).unwrap();
{
let facets: Vec<(String, u64)> = counts
@@ -529,7 +613,6 @@ mod tests {
.collect::<Vec<_>>()
);
}
Ok(())
}
#[test]

View File

@@ -37,6 +37,21 @@ pub struct Dictionary<TSSTable: SSTable = VoidSSTable> {
phantom_data: PhantomData<TSSTable>,
}
impl Dictionary<VoidSSTable> {
pub fn build_for_tests(terms: &[&str]) -> Dictionary {
let mut terms = terms.to_vec();
terms.sort();
let mut buffer = Vec::new();
let mut dictionary_writer = Self::builder(&mut buffer).unwrap();
for term in terms {
dictionary_writer.insert(term, &()).unwrap();
}
dictionary_writer.finish().unwrap();
let dictionary = Dictionary::from_bytes(OwnedBytes::new(buffer)).unwrap();
dictionary
}
}
impl<TSSTable: SSTable> Dictionary<TSSTable> {
pub fn builder<W: io::Write>(wrt: W) -> io::Result<crate::Writer<W, TSSTable::ValueWriter>> {
Ok(TSSTable::writer(wrt))