From ebc78127f3edbc0aa7221378c805cb81ebd93421 Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Tue, 13 Jun 2023 07:19:58 +0200 Subject: [PATCH] Add BytesFilterCollector to support filtering based on a bytes fast field (#2075) * Do some Clippy- and Cargo-related boy-scouting. * Add BytesFilterCollector to support filtering based on a bytes fast field This is basically a copy of the existing FilterCollector but modified and specialised to work on a bytes fast field. * Changed semantics of filter collectors to consider multi-valued fields --- bitpacker/src/filter_vec/mod.rs | 2 +- columnar/Cargo.toml | 2 +- src/collector/filter_collector_wrapper.rs | 254 ++++++++++++++++++---- src/collector/mod.rs | 2 +- src/indexer/stamper.rs | 2 + src/lib.rs | 4 +- sstable/Cargo.toml | 2 +- stacker/src/fastcpy.rs | 2 +- 8 files changed, 223 insertions(+), 47 deletions(-) diff --git a/bitpacker/src/filter_vec/mod.rs b/bitpacker/src/filter_vec/mod.rs index 5f1b81bf8..051b1ae82 100644 --- a/bitpacker/src/filter_vec/mod.rs +++ b/bitpacker/src/filter_vec/mod.rs @@ -1,6 +1,6 @@ use std::ops::RangeInclusive; -#[cfg(any(target_arch = "x86_64"))] +#[cfg(target_arch = "x86_64")] mod avx2; mod scalar; diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml index 51884d447..e2b42ed44 100644 --- a/columnar/Cargo.toml +++ b/columnar/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" license = "MIT" homepage = "https://github.com/quickwit-oss/tantivy" repository = "https://github.com/quickwit-oss/tantivy" -desciption = "column oriented storage for tantivy" +description = "column oriented storage for tantivy" categories = ["database-implementations", "data-structures", "compression"] [dependencies] diff --git a/src/collector/filter_collector_wrapper.rs b/src/collector/filter_collector_wrapper.rs index 7b761820b..c19253cbc 100644 --- a/src/collector/filter_collector_wrapper.rs +++ b/src/collector/filter_collector_wrapper.rs @@ -6,32 +6,35 @@ // // Of course, you can have a look at the tantivy's built-in collectors // such as the `CountCollector` for more examples. - -// --- -// Importing tantivy... +use std::fmt::Debug; use std::marker::PhantomData; -use std::sync::Arc; -use columnar::{ColumnValues, DynamicColumn, HasAssociatedColumnType}; +use columnar::{BytesColumn, Column, DynamicColumn, HasAssociatedColumnType}; use crate::collector::{Collector, SegmentCollector}; use crate::schema::Field; -use crate::{Score, SegmentReader, TantivyError}; +use crate::{DocId, Score, SegmentReader, TantivyError}; /// The `FilterCollector` filters docs using a fast field value and a predicate. -/// Only the documents for which the predicate returned "true" will be passed on to the next -/// collector. +/// +/// Only the documents containing at least one value for which the predicate returns `true` +/// will be passed on to the next collector. +/// +/// In other words, +/// - documents with no values are filtered out. +/// - documents with several values are accepted if at least one value matches the predicate. +/// /// /// ```rust /// use tantivy::collector::{TopDocs, FilterCollector}; /// use tantivy::query::QueryParser; -/// use tantivy::schema::{Schema, TEXT, INDEXED, FAST}; +/// use tantivy::schema::{Schema, TEXT, FAST}; /// use tantivy::{doc, DocAddress, Index}; /// /// # fn main() -> tantivy::Result<()> { /// let mut schema_builder = Schema::builder(); /// let title = schema_builder.add_text_field("title", TEXT); -/// let price = schema_builder.add_u64_field("price", INDEXED | FAST); +/// let price = schema_builder.add_u64_field("price", FAST); /// let schema = schema_builder.build(); /// let index = Index::create_in_ram(schema); /// @@ -47,20 +50,24 @@ use crate::{Score, SegmentReader, TantivyError}; /// /// let query_parser = QueryParser::for_index(&index, vec![title]); /// let query = query_parser.parse_query("diary")?; -/// let no_filter_collector = FilterCollector::new(price, &|value: u64| value > 20_120u64, TopDocs::with_limit(2)); +/// let no_filter_collector = FilterCollector::new(price, |value: u64| value > 20_120u64, TopDocs::with_limit(2)); /// let top_docs = searcher.search(&query, &no_filter_collector)?; /// /// assert_eq!(top_docs.len(), 1); /// assert_eq!(top_docs[0].1, DocAddress::new(0, 1)); /// -/// let filter_all_collector: FilterCollector<_, _, u64> = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2)); +/// let filter_all_collector: FilterCollector<_, _, u64> = FilterCollector::new(price, |value| value < 5u64, TopDocs::with_limit(2)); /// let filtered_top_docs = searcher.search(&query, &filter_all_collector)?; /// /// assert_eq!(filtered_top_docs.len(), 0); /// # Ok(()) /// # } /// ``` -pub struct FilterCollector +/// +/// Note that this is limited to fast fields which implement the [`FastValue`] trait, +/// e.g. `u64` but not `&[u8]`. To filter based on a bytes fast field, +/// use a [`BytesFilterCollector`] instead. +pub struct FilterCollector where TPredicate: 'static + Clone { field: Field, @@ -69,19 +76,15 @@ where TPredicate: 'static + Clone t_predicate_value: PhantomData, } -impl +impl FilterCollector where TCollector: Collector + Send + Sync, TPredicate: Fn(TPredicateValue) -> bool + Send + Sync + Clone, { - /// Create a new FilterCollector. - pub fn new( - field: Field, - predicate: TPredicate, - collector: TCollector, - ) -> FilterCollector { - FilterCollector { + /// Create a new `FilterCollector`. + pub fn new(field: Field, predicate: TPredicate, collector: TCollector) -> Self { + Self { field, predicate, collector, @@ -90,7 +93,7 @@ where } } -impl Collector +impl Collector for FilterCollector where TCollector: Collector + Send + Sync, @@ -98,8 +101,6 @@ where TPredicateValue: HasAssociatedColumnType, DynamicColumn: Into>>, { - // That's the type of our result. - // Our standard deviation will be a float. type Fruit = TCollector::Fruit; type Child = FilterSegmentCollector; @@ -108,7 +109,7 @@ where &self, segment_local_id: u32, segment_reader: &SegmentReader, - ) -> crate::Result> { + ) -> crate::Result { let schema = segment_reader.schema(); let field_entry = schema.get_field_entry(self.field); if !field_entry.is_fast() { @@ -118,16 +119,16 @@ where ))); } - let fast_field_reader = segment_reader + let column_opt = segment_reader .fast_fields() - .column_first_or_default(schema.get_field_name(self.field))?; + .column_opt(field_entry.name())?; let segment_collector = self .collector .for_segment(segment_local_id, segment_reader)?; Ok(FilterSegmentCollector { - fast_field_reader, + column_opt, segment_collector, predicate: self.predicate.clone(), t_predicate_value: PhantomData, @@ -146,35 +147,208 @@ where } } -pub struct FilterSegmentCollector -where - TPredicate: 'static, - DynamicColumn: Into>>, -{ - fast_field_reader: Arc>, +pub struct FilterSegmentCollector { + column_opt: Option>, segment_collector: TSegmentCollector, predicate: TPredicate, t_predicate_value: PhantomData, } +impl + FilterSegmentCollector +where + TPredicateValue: PartialOrd + Copy + Debug + Send + Sync + 'static, + TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync, +{ + #[inline] + fn accept_document(&self, doc_id: DocId) -> bool { + if let Some(column) = &self.column_opt { + for val in column.values_for_doc(doc_id) { + if (self.predicate)(val) { + return true; + } + } + } + false + } +} + impl SegmentCollector for FilterSegmentCollector where TSegmentCollector: SegmentCollector, TPredicateValue: HasAssociatedColumnType, - TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync, - DynamicColumn: Into>>, + TPredicate: 'static + Fn(TPredicateValue) -> bool + Send + Sync, /* DynamicColumn: Into>> */ { type Fruit = TSegmentCollector::Fruit; fn collect(&mut self, doc: u32, score: Score) { - let value = self.fast_field_reader.get_val(doc); - if (self.predicate)(value) { - self.segment_collector.collect(doc, score) + if self.accept_document(doc) { + self.segment_collector.collect(doc, score); } } - fn harvest(self) -> ::Fruit { + fn harvest(self) -> TSegmentCollector::Fruit { + self.segment_collector.harvest() + } +} + +/// A variant of the [`FilterCollector`] specialized for bytes fast fields, i.e. +/// it transparently wraps an inner [`Collector`] but filters documents +/// based on the result of applying the predicate to the bytes fast field. +/// +/// A document is accepted if and only if the predicate returns `true` for at least one value. +/// +/// In other words, +/// - documents with no values are filtered out. +/// - documents with several values are accepted if at least one value matches the predicate. +/// +/// ```rust +/// use tantivy::collector::{TopDocs, BytesFilterCollector}; +/// use tantivy::query::QueryParser; +/// use tantivy::schema::{Schema, TEXT, FAST}; +/// use tantivy::{doc, DocAddress, Index}; +/// +/// # fn main() -> tantivy::Result<()> { +/// let mut schema_builder = Schema::builder(); +/// let title = schema_builder.add_text_field("title", TEXT); +/// let barcode = schema_builder.add_bytes_field("barcode", FAST); +/// let schema = schema_builder.build(); +/// let index = Index::create_in_ram(schema); +/// +/// let mut index_writer = index.writer_with_num_threads(1, 10_000_000)?; +/// index_writer.add_document(doc!(title => "The Name of the Wind", barcode => &b"010101"[..]))?; +/// index_writer.add_document(doc!(title => "The Diary of Muadib", barcode => &b"110011"[..]))?; +/// index_writer.add_document(doc!(title => "A Dairy Cow", barcode => &b"110111"[..]))?; +/// index_writer.add_document(doc!(title => "The Diary of a Young Girl", barcode => &b"011101"[..]))?; +/// index_writer.add_document(doc!(title => "Bridget Jones's Diary"))?; +/// index_writer.commit()?; +/// +/// let reader = index.reader()?; +/// let searcher = reader.searcher(); +/// +/// let query_parser = QueryParser::for_index(&index, vec![title]); +/// let query = query_parser.parse_query("diary")?; +/// let filter_collector = BytesFilterCollector::new(barcode, |bytes: &[u8]| bytes.starts_with(b"01"), TopDocs::with_limit(2)); +/// let top_docs = searcher.search(&query, &filter_collector)?; +/// +/// assert_eq!(top_docs.len(), 1); +/// assert_eq!(top_docs[0].1, DocAddress::new(0, 3)); +/// # Ok(()) +/// # } +/// ``` +pub struct BytesFilterCollector +where TPredicate: 'static + Clone +{ + field: Field, + collector: TCollector, + predicate: TPredicate, +} + +impl BytesFilterCollector +where + TCollector: Collector + Send + Sync, + TPredicate: Fn(&[u8]) -> bool + Send + Sync + Clone, +{ + /// Create a new `BytesFilterCollector`. + pub fn new(field: Field, predicate: TPredicate, collector: TCollector) -> Self { + Self { + field, + predicate, + collector, + } + } +} + +impl Collector for BytesFilterCollector +where + TCollector: Collector + Send + Sync, + TPredicate: 'static + Fn(&[u8]) -> bool + Send + Sync + Clone, +{ + type Fruit = TCollector::Fruit; + + type Child = BytesFilterSegmentCollector; + + fn for_segment( + &self, + segment_local_id: u32, + segment_reader: &SegmentReader, + ) -> crate::Result { + let schema = segment_reader.schema(); + let field_name = schema.get_field_name(self.field); + + let column_opt = segment_reader.fast_fields().bytes(field_name)?; + + let segment_collector = self + .collector + .for_segment(segment_local_id, segment_reader)?; + + Ok(BytesFilterSegmentCollector { + column_opt, + segment_collector, + predicate: self.predicate.clone(), + buffer: Vec::new(), + }) + } + + fn requires_scoring(&self) -> bool { + self.collector.requires_scoring() + } + + fn merge_fruits( + &self, + segment_fruits: Vec<::Fruit>, + ) -> crate::Result { + self.collector.merge_fruits(segment_fruits) + } +} + +pub struct BytesFilterSegmentCollector +where TPredicate: 'static +{ + column_opt: Option, + segment_collector: TSegmentCollector, + predicate: TPredicate, + buffer: Vec, +} + +impl BytesFilterSegmentCollector +where + TSegmentCollector: SegmentCollector, + TPredicate: 'static + Fn(&[u8]) -> bool + Send + Sync, +{ + #[inline] + fn accept_document(&mut self, doc_id: DocId) -> bool { + if let Some(column) = &self.column_opt { + for ord in column.term_ords(doc_id) { + self.buffer.clear(); + + let found = column.ord_to_bytes(ord, &mut self.buffer).unwrap_or(false); + + if found && (self.predicate)(&self.buffer) { + return true; + } + } + } + false + } +} + +impl SegmentCollector + for BytesFilterSegmentCollector +where + TSegmentCollector: SegmentCollector, + TPredicate: 'static + Fn(&[u8]) -> bool + Send + Sync, +{ + type Fruit = TSegmentCollector::Fruit; + + fn collect(&mut self, doc: u32, score: Score) { + if self.accept_document(doc) { + self.segment_collector.collect(doc, score); + } + } + + fn harvest(self) -> TSegmentCollector::Fruit { self.segment_collector.harvest() } } diff --git a/src/collector/mod.rs b/src/collector/mod.rs index 2abe206b9..4015ea409 100644 --- a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -112,7 +112,7 @@ mod docset_collector; pub use self::docset_collector::DocSetCollector; mod filter_collector_wrapper; -pub use self::filter_collector_wrapper::FilterCollector; +pub use self::filter_collector_wrapper::{BytesFilterCollector, FilterCollector}; /// `Fruit` is the type for the result of our collection. /// e.g. `usize` for the `Count` collector. diff --git a/src/indexer/stamper.rs b/src/indexer/stamper.rs index 8287d841c..733aab82a 100644 --- a/src/indexer/stamper.rs +++ b/src/indexer/stamper.rs @@ -101,6 +101,7 @@ mod test { use super::Stamper; + #[allow(clippy::redundant_clone)] #[test] fn test_stamper() { let stamper = Stamper::new(7u64); @@ -116,6 +117,7 @@ mod test { assert_eq!(stamper.stamp(), 15u64); } + #[allow(clippy::redundant_clone)] #[test] fn test_stamper_revert() { let stamper = Stamper::new(7u64); diff --git a/src/lib.rs b/src/lib.rs index ce4f2a31b..286c44e93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -876,8 +876,8 @@ pub mod tests { }"#, ) .unwrap(); - let doc = doc!(json_field=>json_val.clone()); - let index = Index::create_in_ram(schema.clone()); + let doc = doc!(json_field=>json_val); + let index = Index::create_in_ram(schema); let mut writer = index.writer_for_tests().unwrap(); writer.add_document(doc).unwrap(); writer.commit().unwrap(); diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index 39f44798a..bce6d0ab7 100644 --- a/sstable/Cargo.toml +++ b/sstable/Cargo.toml @@ -7,7 +7,7 @@ homepage = "https://github.com/quickwit-oss/tantivy" repository = "https://github.com/quickwit-oss/tantivy" keywords = ["search", "information", "retrieval", "sstable"] categories = ["database-implementations", "data-structures", "compression"] -desciption = "sstables for tantivy" +description = "sstables for tantivy" [dependencies] common = {version= "0.5", path="../common", package="tantivy-common"} diff --git a/stacker/src/fastcpy.rs b/stacker/src/fastcpy.rs index 6731f8b6f..8195e17f0 100644 --- a/stacker/src/fastcpy.rs +++ b/stacker/src/fastcpy.rs @@ -44,7 +44,7 @@ pub fn fast_short_slice_copy(src: &[u8], dst: &mut [u8]) { return; } - /// The code will use the vmovdqu instruction to copy 32 bytes at a time. + // The code will use the vmovdqu instruction to copy 32 bytes at a time. #[cfg(target_feature = "avx")] { if len <= 64 {