Compare commits


1 Commit

Author         SHA1         Message                Date
Paul Masurel   ad0a7a78fd   Simplify aggregation   2023-07-12 12:36:49 +09:00
5 changed files with 24 additions and 56 deletions

@@ -49,7 +49,7 @@ murmurhash32 = "0.3.0"
 time = { version = "0.3.10", features = ["serde-well-known"] }
 smallvec = "1.8.0"
 rayon = "1.5.2"
-lru = "0.11.0"
+lru = "0.10.0"
 fastdivide = "0.4.0"
 itertools = "0.11.0"
 measure_time = "0.8.2"

@@ -123,6 +123,15 @@ impl AggregationWithAccessor {
             column_block_accessor: Default::default(),
         })
     }
+
+    /// Swaps the accessor and field type with the second accessor and field type.
+    /// This way we can use the same code for both aggregations.
+    pub(crate) fn swap_accessor(&mut self) {
+        if let Some(accessor) = self.accessor2.as_mut() {
+            std::mem::swap(&mut accessor.0, &mut self.accessor);
+            std::mem::swap(&mut accessor.1, &mut self.field_type);
+        }
+    }
 }
 
 fn get_numeric_or_date_column_types() -> &'static [ColumnType] {
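
The swap_accessor method added above relies on a simple invariant: swapping the optional second (accessor, field type) pair into the primary slot is its own inverse, so the SegmentTermCollectorComposite call sites in the hunks below can run the first term aggregation, swap, run the second one through exactly the same code path, and then swap back. The following is a minimal, self-contained sketch of that pattern; Accessor, FieldType and WithAccessor are simplified stand-ins for illustration, not tantivy's actual types.

use std::mem;

#[derive(Debug, PartialEq)]
struct Accessor(&'static str);

#[derive(Debug, PartialEq)]
enum FieldType {
    U64,
    Str,
}

struct WithAccessor {
    accessor: Accessor,
    field_type: FieldType,
    // Optional second (accessor, field type) pair used by the composite collector.
    accessor2: Option<(Accessor, FieldType)>,
}

impl WithAccessor {
    // Swap the primary accessor/type with the secondary pair, if present.
    // Calling this twice restores the original state.
    fn swap_accessor(&mut self) {
        if let Some(second) = self.accessor2.as_mut() {
            mem::swap(&mut second.0, &mut self.accessor);
            mem::swap(&mut second.1, &mut self.field_type);
        }
    }
}

fn main() {
    let mut agg = WithAccessor {
        accessor: Accessor("terms"),
        field_type: FieldType::Str,
        accessor2: Some((Accessor("counts"), FieldType::U64)),
    };

    // First swap: the second pair now sits in the primary slot, so the
    // same collection code can run against it.
    agg.swap_accessor();
    assert_eq!(agg.accessor, Accessor("counts"));
    assert_eq!(agg.field_type, FieldType::U64);

    // Second swap: the original order is restored, as the call sites expect.
    agg.swap_accessor();
    assert_eq!(agg.accessor, Accessor("terms"));
    assert_eq!(agg.accessor2, Some((Accessor("counts"), FieldType::U64)));
}

The point of swapping in place rather than passing the second pair around is the one stated in the doc comment: the collection code only ever reads the primary accessor and field type, so both aggregations can share that code unchanged.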

@@ -263,9 +263,9 @@ impl SegmentAggregationCollector for SegmentTermCollectorComposite {
         agg_with_accessor: &mut AggregationsWithAccessor,
     ) -> crate::Result<()> {
         self.term_agg1.collect_block(&[doc], agg_with_accessor)?;
-        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        agg_with_accessor.aggs.values[self.accessor_idx].swap_accessor();
         self.term_agg2.collect_block(&[doc], agg_with_accessor)?;
-        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        agg_with_accessor.aggs.values[self.accessor_idx].swap_accessor();
 
         Ok(())
     }
@@ -276,33 +276,22 @@ impl SegmentAggregationCollector for SegmentTermCollectorComposite {
         agg_with_accessor: &mut AggregationsWithAccessor,
     ) -> crate::Result<()> {
         self.term_agg1.collect_block(docs, agg_with_accessor)?;
-        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        agg_with_accessor.aggs.values[self.accessor_idx].swap_accessor();
         self.term_agg2.collect_block(docs, agg_with_accessor)?;
-        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        agg_with_accessor.aggs.values[self.accessor_idx].swap_accessor();
 
         Ok(())
     }
 
     fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
         self.term_agg1.flush(agg_with_accessor)?;
-        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        agg_with_accessor.aggs.values[self.accessor_idx].swap_accessor();
         self.term_agg2.flush(agg_with_accessor)?;
-        self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
+        agg_with_accessor.aggs.values[self.accessor_idx].swap_accessor();
 
         Ok(())
     }
 }
 
 impl SegmentTermCollectorComposite {
-    /// Swaps the accessor and field type with the second accessor and field type.
-    /// This way we can use the same code for both aggregations.
-    fn swap_accessor(&self, aggregations: &mut AggregationWithAccessor) {
-        if let Some(accessor) = aggregations.accessor2.as_mut() {
-            std::mem::swap(&mut accessor.0, &mut aggregations.accessor);
-            std::mem::swap(&mut accessor.1, &mut aggregations.field_type);
-        }
-    }
-
     pub(crate) fn from_req_and_validate(
         req: &TermsAggregation,
         sub_aggregations: &mut AggregationsWithAccessor,

@@ -1291,28 +1291,4 @@ mod tests {
         let vals: Vec<i64> = column.values_for_doc(0u32).collect();
         assert_eq!(&vals, &[33]);
     }
-
-    #[test]
-    fn check_num_columnar_fields() -> crate::Result<()> {
-        let mut schema_builder = Schema::builder();
-        let id_field = schema_builder.add_text_field("id", FAST);
-        let index = Index::create_in_ram(schema_builder.build());
-        {
-            let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?;
-            index_writer.set_merge_policy(Box::new(NoMergePolicy));
-            index_writer.add_document(doc!(
-                id_field => 1u64,
-            ))?;
-            index_writer.commit()?;
-        }
-
-        let reader = index.reader().unwrap();
-        let searcher = reader.searcher();
-        let ff_reader = searcher.segment_reader(0).fast_fields();
-        let fields = ff_reader.u64_lenient_for_type_all(None, "id").unwrap();
-        assert_eq!(fields.len(), 1);
-        Ok(())
-    }
 }

@@ -86,8 +86,6 @@ impl TokenFilter for SplitCompoundWords {
         SplitCompoundWordsFilter {
             dict: self.dict,
            inner: tokenizer,
-            cuts: Vec::new(),
-            parts: Vec::new(),
         }
     }
 }
@@ -96,33 +94,29 @@ impl TokenFilter for SplitCompoundWords {
 pub struct SplitCompoundWordsFilter<T> {
     dict: AhoCorasick,
     inner: T,
+}
+
+impl<T: Tokenizer> Tokenizer for SplitCompoundWordsFilter<T> {
+    type TokenStream<'a> = SplitCompoundWordsTokenStream<T::TokenStream<'a>>;
+
+    fn token_stream<'a>(&'a mut self, text: &'a str) -> Self::TokenStream<'a> {
+        SplitCompoundWordsTokenStream {
+            dict: self.dict.clone(),
+            tail: self.inner.token_stream(text),
+            cuts: Vec::new(),
+            parts: Vec::new(),
+        }
+    }
+}
+
+pub struct SplitCompoundWordsTokenStream<T> {
+    dict: AhoCorasick,
+    tail: T,
     cuts: Vec<usize>,
     parts: Vec<Token>,
 }
 
-impl<T: Tokenizer> Tokenizer for SplitCompoundWordsFilter<T> {
-    type TokenStream<'a> = SplitCompoundWordsTokenStream<'a, T::TokenStream<'a>>;
-
-    fn token_stream<'a>(&'a mut self, text: &'a str) -> Self::TokenStream<'a> {
-        self.cuts.clear();
-        self.parts.clear();
-
-        SplitCompoundWordsTokenStream {
-            dict: self.dict.clone(),
-            tail: self.inner.token_stream(text),
-            cuts: &mut self.cuts,
-            parts: &mut self.parts,
-        }
-    }
-}
-
-pub struct SplitCompoundWordsTokenStream<'a, T> {
-    dict: AhoCorasick,
-    tail: T,
-    cuts: &'a mut Vec<usize>,
-    parts: &'a mut Vec<Token>,
-}
-
-impl<'a, T: TokenStream> SplitCompoundWordsTokenStream<'a, T> {
+impl<T: TokenStream> SplitCompoundWordsTokenStream<T> {
     // Will use `self.cuts` to fill `self.parts` if `self.tail.token()`
     // can fully be split into consecutive matches against `self.dict`.
     fn split(&mut self) {
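
The hunk above moves ownership of the cuts and parts scratch buffers. On the old side, SplitCompoundWordsFilter held the buffers and lent them to each token stream as &'a mut Vec, clearing them on every token_stream call so their allocations were reused; on the new side, each SplitCompoundWordsTokenStream owns fresh Vecs, which removes the lifetime parameter from the stream type at the cost of rebuilding the buffers on every call (Vec::new itself does not allocate until something is pushed). A reduced sketch of the two shapes, using hypothetical ReusingFactory/AllocatingFactory types rather than tantivy's Tokenizer and TokenStream traits:

// Old shape: the factory owns the scratch buffer and lends it out, so
// repeated calls reuse the same allocation.
struct ReusingFactory {
    scratch: Vec<usize>,
}

struct BorrowingStream<'a> {
    scratch: &'a mut Vec<usize>,
}

impl ReusingFactory {
    fn stream(&mut self) -> BorrowingStream<'_> {
        self.scratch.clear(); // drop old contents, keep capacity
        BorrowingStream {
            scratch: &mut self.scratch,
        }
    }
}

// New shape: each stream owns its own buffer. No lifetime parameter on
// the stream type, but nothing is reused between calls.
struct AllocatingFactory;

struct OwningStream {
    scratch: Vec<usize>,
}

impl AllocatingFactory {
    fn stream(&self) -> OwningStream {
        OwningStream { scratch: Vec::new() }
    }
}

fn main() {
    let mut reusing = ReusingFactory { scratch: Vec::new() };
    {
        let stream = reusing.stream();
        stream.scratch.push(1); // forces an allocation
    }
    // The second stream starts empty but keeps the capacity reserved above.
    let stream = reusing.stream();
    assert!(stream.scratch.is_empty() && stream.scratch.capacity() >= 1);

    let factory = AllocatingFactory;
    let stream = factory.stream();
    // A brand-new, unallocated buffer every time.
    assert_eq!(stream.scratch.capacity(), 0);
}
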
@@ -158,7 +152,7 @@ impl<'a, T: TokenStream> SplitCompoundWordsTokenStream<'a, T> {
     }
 }
 
-impl<'a, T: TokenStream> TokenStream for SplitCompoundWordsTokenStream<'a, T> {
+impl<T: TokenStream> TokenStream for SplitCompoundWordsTokenStream<T> {
     fn advance(&mut self) -> bool {
         self.parts.pop();