merge column: small refactors (#2579)

* merge column: small refactors

* make ord dependency more explicit

* add columnar merge crashtest proptest

* fix naming
This commit is contained in:
PSeitz
2025-03-07 11:52:34 +01:00
committed by GitHub
parent 80f5f1ecd4
commit d5d2d41264
10 changed files with 218 additions and 118 deletions

View File

@@ -56,7 +56,7 @@ fn get_doc_ids_with_values<'a>(
ColumnIndex::Full => Box::new(doc_range), ColumnIndex::Full => Box::new(doc_range),
ColumnIndex::Optional(optional_index) => Box::new( ColumnIndex::Optional(optional_index) => Box::new(
optional_index optional_index
.iter_rows() .iter_docs()
.map(move |row| row + doc_range.start), .map(move |row| row + doc_range.start),
), ),
ColumnIndex::Multivalued(multivalued_index) => match multivalued_index { ColumnIndex::Multivalued(multivalued_index) => match multivalued_index {
@@ -73,7 +73,7 @@ fn get_doc_ids_with_values<'a>(
MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new( MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new(
multivalued_index multivalued_index
.optional_index .optional_index
.iter_rows() .iter_docs()
.map(move |row| row + doc_range.start), .map(move |row| row + doc_range.start),
), ),
}, },
@@ -177,7 +177,7 @@ impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
ColumnIndex::Full => Box::new(columnar_row_range), ColumnIndex::Full => Box::new(columnar_row_range),
ColumnIndex::Optional(optional_index) => Box::new( ColumnIndex::Optional(optional_index) => Box::new(
optional_index optional_index
.iter_rows() .iter_docs()
.map(move |row_id: RowId| columnar_row_range.start + row_id), .map(move |row_id: RowId| columnar_row_range.start + row_id),
), ),
ColumnIndex::Multivalued(_) => { ColumnIndex::Multivalued(_) => {

View File

@@ -80,23 +80,23 @@ impl BlockVariant {
/// index is the block index. For each block `byte_start` and `offset` is computed. /// index is the block index. For each block `byte_start` and `offset` is computed.
#[derive(Clone)] #[derive(Clone)]
pub struct OptionalIndex { pub struct OptionalIndex {
num_rows: RowId, num_docs: RowId,
num_non_null_rows: RowId, num_non_null_docs: RowId,
block_data: OwnedBytes, block_data: OwnedBytes,
block_metas: Arc<[BlockMeta]>, block_metas: Arc<[BlockMeta]>,
} }
impl Iterable<u32> for &OptionalIndex { impl Iterable<u32> for &OptionalIndex {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> { fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(self.iter_rows()) Box::new(self.iter_docs())
} }
} }
impl std::fmt::Debug for OptionalIndex { impl std::fmt::Debug for OptionalIndex {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("OptionalIndex") f.debug_struct("OptionalIndex")
.field("num_rows", &self.num_rows) .field("num_docs", &self.num_docs)
.field("num_non_null_rows", &self.num_non_null_rows) .field("num_non_null_docs", &self.num_non_null_docs)
.finish_non_exhaustive() .finish_non_exhaustive()
} }
} }
@@ -271,17 +271,17 @@ impl OptionalIndex {
} }
pub fn num_docs(&self) -> RowId { pub fn num_docs(&self) -> RowId {
self.num_rows self.num_docs
} }
pub fn num_non_nulls(&self) -> RowId { pub fn num_non_nulls(&self) -> RowId {
self.num_non_null_rows self.num_non_null_docs
} }
pub fn iter_rows(&self) -> impl Iterator<Item = RowId> + '_ { pub fn iter_docs(&self) -> impl Iterator<Item = RowId> + '_ {
// TODO optimize // TODO optimize
let mut select_batch = self.select_cursor(); let mut select_batch = self.select_cursor();
(0..self.num_non_null_rows).map(move |rank| select_batch.select(rank)) (0..self.num_non_null_docs).map(move |rank| select_batch.select(rank))
} }
pub fn select_batch(&self, ranks: &mut [RowId]) { pub fn select_batch(&self, ranks: &mut [RowId]) {
let mut select_cursor = self.select_cursor(); let mut select_cursor = self.select_cursor();
@@ -519,15 +519,15 @@ pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2); let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2);
let num_non_empty_block_bytes = let num_non_empty_block_bytes =
u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap()); u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap());
let num_rows = VInt::deserialize_u64(&mut bytes)? as u32; let num_docs = VInt::deserialize_u64(&mut bytes)? as u32;
let block_metas_num_bytes = let block_metas_num_bytes =
num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES; num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes); let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes);
let (block_metas, num_non_null_rows) = let (block_metas, num_non_null_docs) =
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows); deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_docs);
let optional_index = OptionalIndex { let optional_index = OptionalIndex {
num_rows, num_docs,
num_non_null_rows, num_non_null_docs,
block_data, block_data,
block_metas: block_metas.into(), block_metas: block_metas.into(),
}; };

View File

@@ -164,7 +164,7 @@ fn test_optional_index_large() {
fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) { fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) {
let optional_index = OptionalIndex::for_test(num_rows, row_ids); let optional_index = OptionalIndex::for_test(num_rows, row_ids);
assert_eq!(optional_index.num_docs(), num_rows); assert_eq!(optional_index.num_docs(), num_rows);
assert!(optional_index.iter_rows().eq(row_ids.iter().copied())); assert!(optional_index.iter_docs().eq(row_ids.iter().copied()));
} }
#[test] #[test]

View File

@@ -3,7 +3,7 @@ use std::io::{self, Write};
use common::{BitSet, CountingWriter, ReadOnlyBitSet}; use common::{BitSet, CountingWriter, ReadOnlyBitSet};
use sstable::{SSTable, Streamer, TermOrdinal, VoidSSTable}; use sstable::{SSTable, Streamer, TermOrdinal, VoidSSTable};
use super::term_merger::TermMerger; use super::term_merger::{TermMerger, TermsWithSegmentOrd};
use crate::column::serialize_column_mappable_to_u64; use crate::column::serialize_column_mappable_to_u64;
use crate::column_index::SerializableColumnIndex; use crate::column_index::SerializableColumnIndex;
use crate::iterable::Iterable; use crate::iterable::Iterable;
@@ -126,14 +126,17 @@ fn serialize_merged_dict(
let mut term_ord_mapping = TermOrdinalMapping::default(); let mut term_ord_mapping = TermOrdinalMapping::default();
let mut field_term_streams = Vec::new(); let mut field_term_streams = Vec::new();
for column_opt in bytes_columns.iter() { for (segment_ord, column_opt) in bytes_columns.iter().enumerate() {
if let Some(column) = column_opt { if let Some(column) = column_opt {
term_ord_mapping.add_segment(column.dictionary.num_terms()); term_ord_mapping.add_segment(column.dictionary.num_terms());
let terms: Streamer<VoidSSTable> = column.dictionary.stream()?; let terms: Streamer<VoidSSTable> = column.dictionary.stream()?;
field_term_streams.push(terms); field_term_streams.push(TermsWithSegmentOrd { terms, segment_ord });
} else { } else {
term_ord_mapping.add_segment(0); term_ord_mapping.add_segment(0);
field_term_streams.push(Streamer::empty()); field_term_streams.push(TermsWithSegmentOrd {
terms: Streamer::empty(),
segment_ord,
});
} }
} }
@@ -191,6 +194,7 @@ fn serialize_merged_dict(
#[derive(Default, Debug)] #[derive(Default, Debug)]
struct TermOrdinalMapping { struct TermOrdinalMapping {
/// Contains the new term ordinals for each segment.
per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>, per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
} }
@@ -205,6 +209,6 @@ impl TermOrdinalMapping {
} }
fn get_segment(&self, segment_ord: u32) -> &[TermOrdinal] { fn get_segment(&self, segment_ord: u32) -> &[TermOrdinal] {
&(self.per_segment_new_term_ordinals[segment_ord as usize])[..] &self.per_segment_new_term_ordinals[segment_ord as usize]
} }
} }

View File

@@ -26,7 +26,7 @@ impl StackMergeOrder {
let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len()); let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len());
let mut cumulated_row_id = 0; let mut cumulated_row_id = 0;
for columnar in columnars { for columnar in columnars {
cumulated_row_id += columnar.num_rows(); cumulated_row_id += columnar.num_docs();
cumulated_row_ids.push(cumulated_row_id); cumulated_row_ids.push(cumulated_row_id);
} }
StackMergeOrder { cumulated_row_ids } StackMergeOrder { cumulated_row_ids }

View File

@@ -80,13 +80,12 @@ pub fn merge_columnar(
output: &mut impl io::Write, output: &mut impl io::Write,
) -> io::Result<()> { ) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(output); let mut serializer = ColumnarSerializer::new(output);
let num_rows_per_columnar = columnar_readers let num_docs_per_columnar = columnar_readers
.iter() .iter()
.map(|reader| reader.num_rows()) .map(|reader| reader.num_docs())
.collect::<Vec<u32>>(); .collect::<Vec<u32>>();
let columns_to_merge = let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
for res in columns_to_merge { for res in columns_to_merge {
let ((column_name, _column_type_category), grouped_columns) = res; let ((column_name, _column_type_category), grouped_columns) = res;
let grouped_columns = grouped_columns.open(&merge_row_order)?; let grouped_columns = grouped_columns.open(&merge_row_order)?;
@@ -94,15 +93,18 @@ pub fn merge_columnar(
continue; continue;
} }
let column_type = grouped_columns.column_type_after_merge(); let column_type_after_merge = grouped_columns.column_type_after_merge();
let mut columns = grouped_columns.columns; let mut columns = grouped_columns.columns;
coerce_columns(column_type, &mut columns)?; // Make sure the number of columns is the same as the number of columnar readers.
// Or num_docs_per_columnar would be incorrect.
assert_eq!(columns.len(), columnar_readers.len());
coerce_columns(column_type_after_merge, &mut columns)?;
let mut column_serializer = let mut column_serializer =
serializer.start_serialize_column(column_name.as_bytes(), column_type); serializer.start_serialize_column(column_name.as_bytes(), column_type_after_merge);
merge_column( merge_column(
column_type, column_type_after_merge,
&num_rows_per_columnar, &num_docs_per_columnar,
columns, columns,
&merge_row_order, &merge_row_order,
&mut column_serializer, &mut column_serializer,
@@ -128,7 +130,7 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Colu
fn merge_column( fn merge_column(
column_type: ColumnType, column_type: ColumnType,
num_docs_per_column: &[u32], num_docs_per_column: &[u32],
columns: Vec<Option<DynamicColumn>>, columns_to_merge: Vec<Option<DynamicColumn>>,
merge_row_order: &MergeRowOrder, merge_row_order: &MergeRowOrder,
wrt: &mut impl io::Write, wrt: &mut impl io::Write,
) -> io::Result<()> { ) -> io::Result<()> {
@@ -138,10 +140,10 @@ fn merge_column(
| ColumnType::F64 | ColumnType::F64
| ColumnType::DateTime | ColumnType::DateTime
| ColumnType::Bool => { | ColumnType::Bool => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len()); let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> = let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
Vec::with_capacity(columns.len()); Vec::with_capacity(columns_to_merge.len());
for (i, dynamic_column_opt) in columns.into_iter().enumerate() { for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
if let Some(Column { index: idx, values }) = if let Some(Column { index: idx, values }) =
dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic) dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
{ {
@@ -164,10 +166,10 @@ fn merge_column(
serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?; serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
} }
ColumnType::IpAddr => { ColumnType::IpAddr => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len()); let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> = let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
Vec::with_capacity(columns.len()); Vec::with_capacity(columns_to_merge.len());
for (i, dynamic_column_opt) in columns.into_iter().enumerate() { for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) = if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
dynamic_column_opt dynamic_column_opt
{ {
@@ -192,9 +194,10 @@ fn merge_column(
serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?; serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
} }
ColumnType::Bytes | ColumnType::Str => { ColumnType::Bytes | ColumnType::Str => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len()); let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len()); let mut bytes_columns: Vec<Option<BytesColumn>> =
for (i, dynamic_column_opt) in columns.into_iter().enumerate() { Vec::with_capacity(columns_to_merge.len());
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
match dynamic_column_opt { match dynamic_column_opt {
Some(DynamicColumn::Str(str_column)) => { Some(DynamicColumn::Str(str_column)) => {
column_indexes.push(str_column.term_ord_column.index.clone()); column_indexes.push(str_column.term_ord_column.index.clone());
@@ -248,7 +251,7 @@ impl GroupedColumns {
if column_type.len() == 1 { if column_type.len() == 1 {
return column_type.into_iter().next().unwrap(); return column_type.into_iter().next().unwrap();
} }
// At the moment, only the numerical categorical column type has more than one possible // At the moment, only the numerical column type category has more than one possible
// column type. // column type.
assert!(self assert!(self
.columns .columns
@@ -361,7 +364,7 @@ fn is_empty_after_merge(
ColumnIndex::Empty { .. } => true, ColumnIndex::Empty { .. } => true,
ColumnIndex::Full => alive_bitset.len() == 0, ColumnIndex::Full => alive_bitset.len() == 0,
ColumnIndex::Optional(optional_index) => { ColumnIndex::Optional(optional_index) => {
for doc in optional_index.iter_rows() { for doc in optional_index.iter_docs() {
if alive_bitset.contains(doc) { if alive_bitset.contains(doc) {
return false; return false;
} }
@@ -391,7 +394,6 @@ fn is_empty_after_merge(
fn group_columns_for_merge<'a>( fn group_columns_for_merge<'a>(
columnar_readers: &'a [&'a ColumnarReader], columnar_readers: &'a [&'a ColumnarReader],
required_columns: &'a [(String, ColumnType)], required_columns: &'a [(String, ColumnType)],
_merge_row_order: &'a MergeRowOrder,
) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> { ) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new(); let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();

View File

@@ -5,28 +5,29 @@ use sstable::TermOrdinal;
use crate::Streamer; use crate::Streamer;
pub struct HeapItem<'a> { /// The terms of a column with the ordinal of the segment.
pub streamer: Streamer<'a>, pub struct TermsWithSegmentOrd<'a> {
pub terms: Streamer<'a>,
pub segment_ord: usize, pub segment_ord: usize,
} }
impl PartialEq for HeapItem<'_> { impl PartialEq for TermsWithSegmentOrd<'_> {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.segment_ord == other.segment_ord self.segment_ord == other.segment_ord
} }
} }
impl Eq for HeapItem<'_> {} impl Eq for TermsWithSegmentOrd<'_> {}
impl<'a> PartialOrd for HeapItem<'a> { impl<'a> PartialOrd for TermsWithSegmentOrd<'a> {
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> { fn partial_cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Option<Ordering> {
Some(self.cmp(other)) Some(self.cmp(other))
} }
} }
impl<'a> Ord for HeapItem<'a> { impl<'a> Ord for TermsWithSegmentOrd<'a> {
fn cmp(&self, other: &HeapItem<'a>) -> Ordering { fn cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Ordering {
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord)) (&other.terms.key(), &other.segment_ord).cmp(&(&self.terms.key(), &self.segment_ord))
} }
} }
@@ -37,39 +38,32 @@ impl<'a> Ord for HeapItem<'a> {
/// - the term /// - the term
/// - a slice with the ordinal of the segments containing the terms. /// - a slice with the ordinal of the segments containing the terms.
pub struct TermMerger<'a> { pub struct TermMerger<'a> {
heap: BinaryHeap<HeapItem<'a>>, heap: BinaryHeap<TermsWithSegmentOrd<'a>>,
current_streamers: Vec<HeapItem<'a>>, term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>,
} }
impl<'a> TermMerger<'a> { impl<'a> TermMerger<'a> {
/// Stream of merged term dictionary /// Stream of merged term dictionary
pub fn new(streams: Vec<Streamer<'a>>) -> TermMerger<'a> { pub fn new(term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>) -> TermMerger<'a> {
TermMerger { TermMerger {
heap: BinaryHeap::new(), heap: BinaryHeap::new(),
current_streamers: streams term_streams_with_segment,
.into_iter()
.enumerate()
.map(|(ord, streamer)| HeapItem {
streamer,
segment_ord: ord,
})
.collect(),
} }
} }
pub(crate) fn matching_segments<'b: 'a>( pub(crate) fn matching_segments<'b: 'a>(
&'b self, &'b self,
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> { ) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
self.current_streamers self.term_streams_with_segment
.iter() .iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord())) .map(|heap_item| (heap_item.segment_ord, heap_item.terms.term_ord()))
} }
fn advance_segments(&mut self) { fn advance_segments(&mut self) {
let streamers = &mut self.current_streamers; let streamers = &mut self.term_streams_with_segment;
let heap = &mut self.heap; let heap = &mut self.heap;
for mut heap_item in streamers.drain(..) { for mut heap_item in streamers.drain(..) {
if heap_item.streamer.advance() { if heap_item.terms.advance() {
heap.push(heap_item); heap.push(heap_item);
} }
} }
@@ -81,13 +75,13 @@ impl<'a> TermMerger<'a> {
pub fn advance(&mut self) -> bool { pub fn advance(&mut self) -> bool {
self.advance_segments(); self.advance_segments();
if let Some(head) = self.heap.pop() { if let Some(head) = self.heap.pop() {
self.current_streamers.push(head); self.term_streams_with_segment.push(head);
while let Some(next_streamer) = self.heap.peek() { while let Some(next_streamer) = self.heap.peek() {
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() { if self.term_streams_with_segment[0].terms.key() != next_streamer.terms.key() {
break; break;
} }
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.current_streamers.push(next_heap_it); self.term_streams_with_segment.push(next_heap_it);
} }
true true
} else { } else {
@@ -101,6 +95,6 @@ impl<'a> TermMerger<'a> {
/// if and only if advance() has been called before /// if and only if advance() has been called before
/// and "true" was returned. /// and "true" was returned.
pub fn key(&self) -> &[u8] { pub fn key(&self) -> &[u8] {
self.current_streamers[0].streamer.key() self.term_streams_with_segment[0].terms.key()
} }
} }

View File

@@ -1,7 +1,10 @@
use itertools::Itertools; use itertools::Itertools;
use proptest::collection::vec;
use proptest::prelude::*;
use super::*; use super::*;
use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId}; use crate::columnar::{merge_columnar, ColumnarReader, MergeRowOrder, StackMergeOrder};
use crate::{Cardinality, ColumnarWriter, DynamicColumn, HasAssociatedColumnType, RowId};
fn make_columnar<T: Into<NumericalValue> + HasAssociatedColumnType + Copy>( fn make_columnar<T: Into<NumericalValue> + HasAssociatedColumnType + Copy>(
column_name: &str, column_name: &str,
@@ -26,9 +29,8 @@ fn test_column_coercion_to_u64() {
// u64 type // u64 type
let columnar2 = make_columnar("numbers", &[u64::MAX]); let columnar2 = make_columnar("numbers", &[u64::MAX]);
let columnars = &[&columnar1, &columnar2]; let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge(columnars, &[], &merge_order).unwrap(); group_columns_for_merge(columnars, &[]).unwrap();
assert_eq!(column_map.len(), 1); assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
} }
@@ -38,9 +40,8 @@ fn test_column_coercion_to_i64() {
let columnar1 = make_columnar("numbers", &[-1i64]); let columnar1 = make_columnar("numbers", &[-1i64]);
let columnar2 = make_columnar("numbers", &[2u64]); let columnar2 = make_columnar("numbers", &[2u64]);
let columnars = &[&columnar1, &columnar2]; let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge(columnars, &[], &merge_order).unwrap(); group_columns_for_merge(columnars, &[]).unwrap();
assert_eq!(column_map.len(), 1); assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
} }
@@ -63,14 +64,8 @@ fn test_group_columns_with_required_column() {
let columnar1 = make_columnar("numbers", &[1i64]); let columnar1 = make_columnar("numbers", &[1i64]);
let columnar2 = make_columnar("numbers", &[2u64]); let columnar2 = make_columnar("numbers", &[2u64]);
let columnars = &[&columnar1, &columnar2]; let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge( group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
&[&columnar1, &columnar2],
&[("numbers".to_string(), ColumnType::U64)],
&merge_order,
)
.unwrap();
assert_eq!(column_map.len(), 1); assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
} }
@@ -80,13 +75,9 @@ fn test_group_columns_required_column_with_no_existing_columns() {
let columnar1 = make_columnar("numbers", &[2u64]); let columnar1 = make_columnar("numbers", &[2u64]);
let columnar2 = make_columnar("numbers", &[2u64]); let columnar2 = make_columnar("numbers", &[2u64]);
let columnars = &[&columnar1, &columnar2]; let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into(); let column_map: BTreeMap<_, _> =
let column_map: BTreeMap<_, _> = group_columns_for_merge( group_columns_for_merge(columnars, &[("required_col".to_string(), ColumnType::Str)])
columnars, .unwrap();
&[("required_col".to_string(), ColumnType::Str)],
&merge_order,
)
.unwrap();
assert_eq!(column_map.len(), 2); assert_eq!(column_map.len(), 2);
let columns = &column_map let columns = &column_map
.get(&("required_col".to_string(), ColumnTypeCategory::Str)) .get(&("required_col".to_string(), ColumnTypeCategory::Str))
@@ -102,14 +93,8 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru
let columnar1 = make_columnar("numbers", &[2i64]); let columnar1 = make_columnar("numbers", &[2i64]);
let columnar2 = make_columnar("numbers", &[2i64]); let columnar2 = make_columnar("numbers", &[2i64]);
let columnars = &[&columnar1, &columnar2]; let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge( group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
columnars,
&[("numbers".to_string(), ColumnType::U64)],
&merge_order,
)
.unwrap();
assert_eq!(column_map.len(), 1); assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
} }
@@ -119,9 +104,8 @@ fn test_missing_column() {
let columnar1 = make_columnar("numbers", &[-1i64]); let columnar1 = make_columnar("numbers", &[-1i64]);
let columnar2 = make_columnar("numbers2", &[2u64]); let columnar2 = make_columnar("numbers2", &[2u64]);
let columnars = &[&columnar1, &columnar2]; let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
group_columns_for_merge(columnars, &[], &merge_order).unwrap(); group_columns_for_merge(columnars, &[]).unwrap();
assert_eq!(column_map.len(), 2); assert_eq!(column_map.len(), 2);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
{ {
@@ -224,7 +208,7 @@ fn test_merge_columnar_numbers() {
) )
.unwrap(); .unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 3); assert_eq!(columnar_reader.num_docs(), 3);
assert_eq!(columnar_reader.num_columns(), 1); assert_eq!(columnar_reader.num_columns(), 1);
let cols = columnar_reader.read_columns("numbers").unwrap(); let cols = columnar_reader.read_columns("numbers").unwrap();
let dynamic_column = cols[0].open().unwrap(); let dynamic_column = cols[0].open().unwrap();
@@ -252,7 +236,7 @@ fn test_merge_columnar_texts() {
) )
.unwrap(); .unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 3); assert_eq!(columnar_reader.num_docs(), 3);
assert_eq!(columnar_reader.num_columns(), 1); assert_eq!(columnar_reader.num_columns(), 1);
let cols = columnar_reader.read_columns("texts").unwrap(); let cols = columnar_reader.read_columns("texts").unwrap();
let dynamic_column = cols[0].open().unwrap(); let dynamic_column = cols[0].open().unwrap();
@@ -301,7 +285,7 @@ fn test_merge_columnar_byte() {
) )
.unwrap(); .unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 4); assert_eq!(columnar_reader.num_docs(), 4);
assert_eq!(columnar_reader.num_columns(), 1); assert_eq!(columnar_reader.num_columns(), 1);
let cols = columnar_reader.read_columns("bytes").unwrap(); let cols = columnar_reader.read_columns("bytes").unwrap();
let dynamic_column = cols[0].open().unwrap(); let dynamic_column = cols[0].open().unwrap();
@@ -357,7 +341,7 @@ fn test_merge_columnar_byte_with_missing() {
) )
.unwrap(); .unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 3 + 2 + 3); assert_eq!(columnar_reader.num_docs(), 3 + 2 + 3);
assert_eq!(columnar_reader.num_columns(), 2); assert_eq!(columnar_reader.num_columns(), 2);
let cols = columnar_reader.read_columns("col").unwrap(); let cols = columnar_reader.read_columns("col").unwrap();
let dynamic_column = cols[0].open().unwrap(); let dynamic_column = cols[0].open().unwrap();
@@ -409,7 +393,7 @@ fn test_merge_columnar_different_types() {
) )
.unwrap(); .unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 4); assert_eq!(columnar_reader.num_docs(), 4);
assert_eq!(columnar_reader.num_columns(), 2); assert_eq!(columnar_reader.num_columns(), 2);
let cols = columnar_reader.read_columns("mixed").unwrap(); let cols = columnar_reader.read_columns("mixed").unwrap();
@@ -474,7 +458,7 @@ fn test_merge_columnar_different_empty_cardinality() {
) )
.unwrap(); .unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 2); assert_eq!(columnar_reader.num_docs(), 2);
assert_eq!(columnar_reader.num_columns(), 2); assert_eq!(columnar_reader.num_columns(), 2);
let cols = columnar_reader.read_columns("mixed").unwrap(); let cols = columnar_reader.read_columns("mixed").unwrap();
@@ -486,3 +470,119 @@ fn test_merge_columnar_different_empty_cardinality() {
let dynamic_column = cols[1].open().unwrap(); let dynamic_column = cols[1].open().unwrap();
assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional); assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional);
} }
#[derive(Debug, Clone)]
struct ColumnSpec {
column_name: String,
/// (row_id, term)
terms: Vec<(RowId, Vec<u8>)>,
}
#[derive(Clone, Debug)]
struct ColumnarSpec {
columns: Vec<ColumnSpec>,
}
/// Generate a random (row_id, term) pair:
/// - row_id in [0..10]
/// - term is either from POSSIBLE_TERMS or random bytes
fn rowid_and_term_strategy() -> impl Strategy<Value = (RowId, Vec<u8>)> {
const POSSIBLE_TERMS: &[&[u8]] = &[b"a", b"b", b"allo"];
let term_strat = prop_oneof![
// pick from the fixed list
(0..POSSIBLE_TERMS.len()).prop_map(|i| POSSIBLE_TERMS[i].to_vec()),
// or random bytes (length 0..10)
prop::collection::vec(any::<u8>(), 0..10),
];
(0u32..11, term_strat)
}
/// Generate one ColumnSpec, with a random name and a random list of (row_id, term).
/// We sort it by row_id so that data is in ascending order.
fn column_spec_strategy() -> impl Strategy<Value = ColumnSpec> {
let column_name = prop_oneof![
Just("col".to_string()),
Just("col2".to_string()),
"col.*".prop_map(|s| s),
];
// We'll produce 0..8 (rowid,term) entries for this column
let data_strat = vec(rowid_and_term_strategy(), 0..8).prop_map(|mut pairs| {
// Sort by row_id
pairs.sort_by_key(|(row_id, _)| *row_id);
pairs
});
(column_name, data_strat).prop_map(|(name, data)| ColumnSpec {
column_name: name,
terms: data,
})
}
/// Strategy to generate an ColumnarSpec
fn columnar_strategy() -> impl Strategy<Value = ColumnarSpec> {
vec(column_spec_strategy(), 0..3).prop_map(|columns| ColumnarSpec { columns })
}
/// Strategy to generate multiple ColumnarSpecs, each of which we will treat
/// as one "columnar" to be merged together.
fn columnars_strategy() -> impl Strategy<Value = Vec<ColumnarSpec>> {
vec(columnar_strategy(), 1..4)
}
/// Build a `ColumnarReader` from a `ColumnarSpec`
fn build_columnar(spec: &ColumnarSpec) -> ColumnarReader {
let mut writer = ColumnarWriter::default();
let mut max_row_id = 0;
for col in &spec.columns {
for &(row_id, ref term) in &col.terms {
writer.record_bytes(row_id, &col.column_name, term);
max_row_id = max_row_id.max(row_id);
}
}
let mut buffer = Vec::new();
writer.serialize(max_row_id + 1, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
}
proptest! {
// We just test that the merge_columnar function doesn't crash.
#![proptest_config(ProptestConfig::with_cases(256))]
#[test]
fn test_merge_columnar_bytes_no_crash(columnars in columnars_strategy(), second_merge_columnars in columnars_strategy()) {
let columnars: Vec<ColumnarReader> = columnars.iter()
.map(build_columnar)
.collect();
let mut out = Vec::new();
let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
merge_columnar(
&columnar_refs,
&[],
MergeRowOrder::Stack(stack_merge_order),
&mut out,
).unwrap();
let merged_reader = ColumnarReader::open(out).unwrap();
// Merge the second set of columnars with the result of the first merge
let mut columnars: Vec<ColumnarReader> = second_merge_columnars.iter()
.map(build_columnar)
.collect();
columnars.push(merged_reader);
let mut out = Vec::new();
let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
merge_columnar(
&columnar_refs,
&[],
MergeRowOrder::Stack(stack_merge_order),
&mut out,
).unwrap();
}
}

View File

@@ -19,13 +19,13 @@ fn io_invalid_data(msg: String) -> io::Error {
pub struct ColumnarReader { pub struct ColumnarReader {
column_dictionary: Dictionary<RangeSSTable>, column_dictionary: Dictionary<RangeSSTable>,
column_data: FileSlice, column_data: FileSlice,
num_rows: RowId, num_docs: RowId,
format_version: Version, format_version: Version,
} }
impl fmt::Debug for ColumnarReader { impl fmt::Debug for ColumnarReader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let num_rows = self.num_rows(); let num_rows = self.num_docs();
let columns = self.list_columns().unwrap(); let columns = self.list_columns().unwrap();
let num_cols = columns.len(); let num_cols = columns.len();
let mut debug_struct = f.debug_struct("Columnar"); let mut debug_struct = f.debug_struct("Columnar");
@@ -112,13 +112,13 @@ impl ColumnarReader {
Ok(ColumnarReader { Ok(ColumnarReader {
column_dictionary, column_dictionary,
column_data, column_data,
num_rows, num_docs: num_rows,
format_version, format_version,
}) })
} }
pub fn num_rows(&self) -> RowId { pub fn num_docs(&self) -> RowId {
self.num_rows self.num_docs
} }
// Iterate over the columns in a sorted way // Iterate over the columns in a sorted way
pub fn iter_columns( pub fn iter_columns(

View File

@@ -380,7 +380,7 @@ fn assert_columnar_eq(
right: &ColumnarReader, right: &ColumnarReader,
lenient_on_numerical_value: bool, lenient_on_numerical_value: bool,
) { ) {
assert_eq!(left.num_rows(), right.num_rows()); assert_eq!(left.num_docs(), right.num_docs());
let left_columns = left.list_columns().unwrap(); let left_columns = left.list_columns().unwrap();
let right_columns = right.list_columns().unwrap(); let right_columns = right.list_columns().unwrap();
assert_eq!(left_columns.len(), right_columns.len()); assert_eq!(left_columns.len(), right_columns.len());
@@ -588,7 +588,7 @@ proptest! {
#[test] #[test]
fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) { fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) {
let columnar = build_columnar(&docs[..]); let columnar = build_columnar(&docs[..]);
assert_eq!(columnar.num_rows() as usize, docs.len()); assert_eq!(columnar.num_docs() as usize, docs.len());
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default(); let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
for (doc_id, doc_vals) in docs.iter().enumerate() { for (doc_id, doc_vals) in docs.iter().enumerate() {
for (col_name, col_val) in doc_vals { for (col_name, col_val) in doc_vals {
@@ -820,7 +820,7 @@ fn test_columnar_merge_empty() {
) )
.unwrap(); .unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap();
assert_eq!(merged_columnar.num_rows(), 0); assert_eq!(merged_columnar.num_docs(), 0);
assert_eq!(merged_columnar.num_columns(), 0); assert_eq!(merged_columnar.num_columns(), 0);
} }
@@ -846,7 +846,7 @@ fn test_columnar_merge_single_str_column() {
) )
.unwrap(); .unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap();
assert_eq!(merged_columnar.num_rows(), 1); assert_eq!(merged_columnar.num_docs(), 1);
assert_eq!(merged_columnar.num_columns(), 1); assert_eq!(merged_columnar.num_columns(), 1);
} }
@@ -878,7 +878,7 @@ fn test_delete_decrease_cardinality() {
) )
.unwrap(); .unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap();
assert_eq!(merged_columnar.num_rows(), 1); assert_eq!(merged_columnar.num_docs(), 1);
assert_eq!(merged_columnar.num_columns(), 1); assert_eq!(merged_columnar.num_columns(), 1);
let cols = merged_columnar.read_columns("c").unwrap(); let cols = merged_columnar.read_columns("c").unwrap();
assert_eq!(cols.len(), 1); assert_eq!(cols.len(), 1);