diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml index e720ecf62..bcd5b696e 100644 --- a/columnar/Cargo.toml +++ b/columnar/Cargo.toml @@ -23,6 +23,12 @@ downcast-rs = "1.2.0" proptest = "1" more-asserts = "0.3.1" rand = "0.8" +binggan = "0.8.1" + +[[bench]] +name = "bench_merge" +harness = false + [features] unstable = [] diff --git a/columnar/benches/bench_merge.rs b/columnar/benches/bench_merge.rs new file mode 100644 index 000000000..2e8d19864 --- /dev/null +++ b/columnar/benches/bench_merge.rs @@ -0,0 +1,101 @@ +#![feature(test)] +extern crate test; + +use core::fmt; +use std::fmt::{Display, Formatter}; + +use binggan::{black_box, BenchRunner}; +use tantivy_columnar::*; + +enum Card { + Multi, + Sparse, + Dense, +} +impl Display for Card { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match self { + Card::Multi => write!(f, "multi"), + Card::Sparse => write!(f, "sparse"), + Card::Dense => write!(f, "dense"), + } + } +} + +const NUM_DOCS: u32 = 100_000; + +fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader { + use tantivy_columnar::ColumnarWriter; + + let mut columnar_writer = ColumnarWriter::default(); + + match card { + Card::Multi => { + columnar_writer.record_numerical(0, "price", 10u64); + columnar_writer.record_numerical(0, "price", 10u64); + } + _ => {} + } + + for i in 0..num_docs { + match card { + Card::Multi | Card::Sparse => { + if i % 8 == 0 { + columnar_writer.record_numerical(i, "price", i as u64); + } + } + Card::Dense => { + if i % 6 == 0 { + columnar_writer.record_numerical(i, "price", i as u64); + } + } + } + } + + let mut wrt: Vec = Vec::new(); + columnar_writer.serialize(num_docs, None, &mut wrt).unwrap(); + + ColumnarReader::open(wrt).unwrap() +} +fn main() { + let mut inputs = Vec::new(); + + let mut add_combo = |card1: Card, card2: Card| { + inputs.push(( + format!("merge_{card1}_and_{card2}"), + vec![ + generate_columnar(card1, NUM_DOCS), + generate_columnar(card2, NUM_DOCS), + ], + )); + }; + + add_combo(Card::Multi, Card::Multi); + add_combo(Card::Dense, Card::Dense); + add_combo(Card::Sparse, Card::Sparse); + add_combo(Card::Sparse, Card::Dense); + add_combo(Card::Multi, Card::Dense); + add_combo(Card::Multi, Card::Sparse); + + let runner: BenchRunner = BenchRunner::new(); + let mut group = runner.new_group(); + for (input_name, columnar_readers) in inputs.iter() { + group.register_with_input( + input_name, + columnar_readers, + move |columnar_readers: &Vec| { + let mut out = vec![]; + let columnar_readers = columnar_readers.iter().collect::>(); + let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]); + + let _ = black_box(merge_columnar( + &columnar_readers, + &[], + merge_row_order.into(), + &mut out, + )); + }, + ); + } + group.run(); +} diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs index d48415284..bd12ade19 100644 --- a/columnar/src/column_index/optional_index/mod.rs +++ b/columnar/src/column_index/optional_index/mod.rs @@ -196,6 +196,7 @@ impl Set for OptionalIndex { } = row_addr_from_row_id(doc_id); let block_meta = self.block_metas[block_id as usize]; let block = self.block(block_meta); + let block_offset_row_id = match block { Block::Dense(dense_block) => dense_block.rank(in_block_row_id), Block::Sparse(sparse_block) => sparse_block.rank(in_block_row_id), diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 6cd190f2a..f2955d3a3 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -787,6 +787,8 @@ impl IndexMerger { mod tests { use columnar::Column; + use proptest::prop_oneof; + use proptest::strategy::Strategy; use schema::FAST; use crate::collector::tests::{ @@ -794,6 +796,7 @@ mod tests { }; use crate::collector::{Count, FacetCollector}; use crate::index::{Index, SegmentId}; + use crate::indexer::NoMergePolicy; use crate::query::{AllQuery, BooleanQuery, EnableScoring, Scorer, TermQuery}; use crate::schema::{ Facet, FacetOptions, IndexRecordOption, NumericOptions, TantivyDocument, Term, @@ -1531,6 +1534,112 @@ mod tests { Ok(()) } + #[derive(Debug, Clone, Copy, Eq, PartialEq)] + enum IndexingOp { + ZeroVal, + OneVal { val: u64 }, + TwoVal { val: u64 }, + Commit, + } + + fn balanced_operation_strategy() -> impl Strategy { + prop_oneof![ + (0u64..1u64).prop_map(|_| IndexingOp::ZeroVal), + (0u64..1u64).prop_map(|val| IndexingOp::OneVal { val }), + (0u64..1u64).prop_map(|val| IndexingOp::TwoVal { val }), + (0u64..1u64).prop_map(|_| IndexingOp::Commit), + ] + } + + use proptest::prelude::*; + proptest! { + #[test] + fn test_merge_columnar_int_proptest(ops in proptest::collection::vec(balanced_operation_strategy(), 1..20)) { + assert!(test_merge_int_fields(&ops[..]).is_ok()); + } + } + fn test_merge_int_fields(ops: &[IndexingOp]) -> crate::Result<()> { + if ops.iter().all(|op| *op == IndexingOp::Commit) { + return Ok(()); + } + let expected_doc_and_vals: Vec<(u32, Vec)> = ops + .iter() + .filter(|op| *op != &IndexingOp::Commit) + .map(|op| match op { + IndexingOp::ZeroVal => vec![], + IndexingOp::OneVal { val } => vec![*val], + IndexingOp::TwoVal { val } => vec![*val, *val], + IndexingOp::Commit => unreachable!(), + }) + .enumerate() + .map(|(id, val)| (id as u32, val)) + .collect(); + + let mut schema_builder = schema::Schema::builder(); + let int_options = NumericOptions::default().set_fast().set_indexed(); + let int_field = schema_builder.add_u64_field("intvals", int_options); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_for_tests()?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + let index_doc = |index_writer: &mut IndexWriter, int_vals: &[u64]| { + let mut doc = TantivyDocument::default(); + for &val in int_vals { + doc.add_u64(int_field, val); + } + index_writer.add_document(doc).unwrap(); + }; + + for op in ops { + match op { + IndexingOp::ZeroVal => index_doc(&mut index_writer, &[]), + IndexingOp::OneVal { val } => index_doc(&mut index_writer, &[*val]), + IndexingOp::TwoVal { val } => index_doc(&mut index_writer, &[*val, *val]), + IndexingOp::Commit => { + index_writer.commit().expect("commit failed"); + } + } + } + index_writer.commit().expect("commit failed"); + } + { + let mut segment_ids = index.searchable_segment_ids()?; + segment_ids.sort(); + let mut index_writer: IndexWriter = index.writer_for_tests()?; + index_writer.merge(&segment_ids).wait()?; + index_writer.wait_merging_threads()?; + } + let reader = index.reader()?; + reader.reload()?; + + let mut vals: Vec = Vec::new(); + let mut test_vals = move |col: &Column, doc: DocId, expected: &[u64]| { + vals.clear(); + vals.extend(col.values_for_doc(doc)); + assert_eq!(&vals[..], expected); + }; + + let mut test_col = move |col: &Column, column_expected: &[(u32, Vec)]| { + for (doc_id, vals) in column_expected.iter() { + test_vals(col, *doc_id, vals); + } + }; + + { + let searcher = reader.searcher(); + let segment = searcher.segment_reader(0u32); + let col = segment + .fast_fields() + .column_opt::("intvals") + .unwrap() + .unwrap(); + + test_col(&col, &expected_doc_and_vals); + } + + Ok(()) + } + #[test] fn test_merge_multivalued_int_fields_simple() -> crate::Result<()> { let mut schema_builder = schema::Schema::builder();