add merge for bytes/str column

This commit is contained in:
Pascal Seitz
2023-02-01 12:13:25 +08:00
parent 96485f21d6
commit 3de018c49f
12 changed files with 428 additions and 35 deletions

View File

@@ -1,6 +1,5 @@
use std::io;
use std::ops::Deref;
use std::str::Bytes;
use std::sync::Arc;
use sstable::{Dictionary, VoidSSTable};
@@ -70,11 +69,17 @@ impl From<StrColumn> for BytesColumn {
}
impl StrColumn {
pub fn dictionary(&self) -> &Dictionary<VoidSSTable> {
self.0.dictionary.as_ref()
}
/// Fills the buffer
pub fn ord_to_str(&self, term_ord: u64, output: &mut String) -> io::Result<bool> {
unsafe {
let buf = output.as_mut_vec();
self.0.dictionary.ord_to_term(term_ord, buf)?;
if !self.0.dictionary.ord_to_term(term_ord, buf)? {
return Ok(false);
}
// TODO consider remove checks if it hurts performance.
if std::str::from_utf8(buf.as_slice()).is_err() {
buf.clear();

View File

@@ -33,7 +33,7 @@ pub fn serialize_column_mappable_to_u64<T: MonotonicallyMappableToU64 + Debug>(
) -> io::Result<()> {
let column_index_num_bytes = serialize_column_index(column_index, output)?;
serialize_u64_based_column_values(
column_values,
|| column_values.boxed_iter(),
&[CodecType::Bitpacked, CodecType::BlockwiseLinear],
output,
)?;

View File

@@ -15,7 +15,7 @@ pub fn serialize_multivalued_index(
output: &mut impl Write,
) -> io::Result<()> {
crate::column_values::u64_based::serialize_u64_based_column_values(
&*multivalued_index,
|| multivalued_index.boxed_iter(),
&[CodecType::Bitpacked, CodecType::Linear],
output,
)?;

View File

@@ -113,8 +113,8 @@ pub mod tests {
#[test]
fn test_fastfield_bool_size_bitwidth_1() {
let mut buffer = Vec::new();
serialize_u64_based_column_values::<bool>(
&&[false, true][..],
serialize_u64_based_column_values(
|| [false, true].into_iter(),
&ALL_U64_CODEC_TYPES,
&mut buffer,
)
@@ -127,8 +127,12 @@ pub mod tests {
#[test]
fn test_fastfield_bool_bit_size_bitwidth_0() {
let mut buffer = Vec::new();
serialize_u64_based_column_values::<bool>(&&[true][..], &ALL_U64_CODEC_TYPES, &mut buffer)
.unwrap();
serialize_u64_based_column_values(
|| [false, true].into_iter(),
&ALL_U64_CODEC_TYPES,
&mut buffer,
)
.unwrap();
// 5 bytes of header, 0 bytes of value, 7 bytes of padding.
assert_eq!(buffer.len(), 5);
}
@@ -137,8 +141,12 @@ pub mod tests {
fn test_fastfield_gcd() {
let mut buffer = Vec::new();
let vals: Vec<u64> = (0..80).map(|val| (val % 7) * 1_000u64).collect();
serialize_u64_based_column_values(&&vals[..], &[CodecType::Bitpacked], &mut buffer)
.unwrap();
serialize_u64_based_column_values(
|| vals.iter().cloned(),
&[CodecType::Bitpacked],
&mut buffer,
)
.unwrap();
// Values are stored over 3 bits.
assert_eq!(buffer.len(), 6 + (3 * 80 / 8));
}

View File

@@ -155,18 +155,22 @@ impl CodecType {
}
}
pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64>(
vals: &dyn Iterable<T>,
pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64, F, I>(
vals: F,
codec_types: &[CodecType],
wrt: &mut dyn Write,
) -> io::Result<()> {
) -> io::Result<()>
where
I: Iterator<Item = T>,
F: Fn() -> I,
{
let mut stats_collector = StatsCollector::default();
let mut estimators: Vec<(CodecType, Box<dyn ColumnCodecEstimator>)> =
Vec::with_capacity(codec_types.len());
for &codec_type in codec_types {
estimators.push((codec_type, codec_type.estimator()));
}
for val in vals.boxed_iter() {
for val in vals() {
let val_u64 = val.to_u64();
stats_collector.collect(val_u64);
for (_, estimator) in &mut estimators {
@@ -190,7 +194,7 @@ pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64>(
best_codec.to_code().serialize(wrt)?;
best_codec_estimator.serialize(
&stats,
&mut vals.boxed_iter().map(MonotonicallyMappableToU64::to_u64),
&mut vals().map(MonotonicallyMappableToU64::to_u64),
wrt,
)?;
Ok(())
@@ -214,7 +218,7 @@ pub fn serialize_and_load_u64_based_column_values<T: MonotonicallyMappableToU64>
codec_types: &[CodecType],
) -> Arc<dyn ColumnValues<T>> {
let mut buffer = Vec::new();
serialize_u64_based_column_values(vals, codec_types, &mut buffer).unwrap();
serialize_u64_based_column_values(|| vals.boxed_iter(), codec_types, &mut buffer).unwrap();
load_u64_based_column_values::<T>(OwnedBytes::new(buffer)).unwrap()
}

View File

@@ -7,7 +7,7 @@ fn test_serialize_and_load_simple() {
let mut buffer = Vec::new();
let vals = &[1u64, 2u64, 5u64];
serialize_u64_based_column_values(
&&vals[..],
|| vals.iter().cloned(),
&[CodecType::Bitpacked, CodecType::BlockwiseLinear],
&mut buffer,
)
@@ -243,7 +243,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) ->
let mut vals: Vec<i64> = (-4..=(num_vals as i64) - 5).map(|val| val * 1000).collect();
let mut buffer: Vec<u8> = Vec::new();
crate::column_values::serialize_u64_based_column_values(
&&vals[..],
|| vals.iter().cloned(),
&[codec_type],
&mut buffer,
)?;
@@ -260,7 +260,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) ->
vals.pop();
vals.push(1001i64);
crate::column_values::serialize_u64_based_column_values(
&&vals[..],
|| vals.iter().cloned(),
&[codec_type],
&mut buffer_without_gcd,
)?;
@@ -286,7 +286,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) ->
let mut vals: Vec<u64> = (1..=num_vals).map(|i| i as u64 * 1000u64).collect();
let mut buffer: Vec<u8> = Vec::new();
crate::column_values::serialize_u64_based_column_values(
&&vals[..],
|| vals.iter().cloned(),
&[codec_type],
&mut buffer,
)?;
@@ -303,7 +303,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) ->
vals.pop();
vals.push(1001u64);
crate::column_values::serialize_u64_based_column_values(
&&vals[..],
|| vals.iter().cloned(),
&[codec_type],
&mut buffer_without_gcd,
)?;

View File

@@ -0,0 +1,114 @@
use std::io::{self, Write};
use common::CountingWriter;
use itertools::Itertools;
use sstable::{SSTable, TermOrdinal};
use super::term_merger::TermMerger;
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
use crate::column_values::{serialize_u64_based_column_values, CodecType};
use crate::BytesColumn;
// Serialize [Dictionary, Column, dictionary num bytes U32::LE]
// Column: [Column Index, Column Values, column index num bytes U32::LE]
pub fn merge_bytes_or_str_column(
column_index: SerializableColumnIndex<'_>,
bytes_columns: &[BytesColumn],
output: &mut impl Write,
) -> io::Result<()> {
// Serialize dict and generate mapping for values
let mut output = CountingWriter::wrap(output);
let term_ord_mapping = serialize_merged_dict(bytes_columns, &mut output)?;
let dictionary_num_bytes: u32 = output.written_bytes() as u32;
let output = output.finish();
serialize_bytes_or_str_column(column_index, bytes_columns, &term_ord_mapping, output)?;
output.write_all(&dictionary_num_bytes.to_le_bytes())?;
Ok(())
}
fn serialize_bytes_or_str_column(
column_index: SerializableColumnIndex<'_>,
bytes_columns: &[BytesColumn],
term_ord_mapping: &TermOrdinalMapping,
output: &mut impl Write,
) -> io::Result<()> {
let column_index_num_bytes = serialize_column_index(column_index, output)?;
let column_values = move || {
let iter = bytes_columns
.iter()
.enumerate()
.flat_map(|(segment_ord, byte_column)| {
let segment_ord = term_ord_mapping.get_segment(segment_ord);
byte_column
.ords()
.values
.iter()
.map(move |term_ord| segment_ord[term_ord as usize])
});
iter
};
serialize_u64_based_column_values(
column_values,
&[CodecType::Bitpacked, CodecType::BlockwiseLinear],
output,
)?;
output.write_all(&column_index_num_bytes.to_le_bytes())?;
Ok(())
}
fn serialize_merged_dict(
bytes_columns: &[BytesColumn],
output: &mut impl Write,
) -> io::Result<TermOrdinalMapping> {
let mut term_ord_mapping = TermOrdinalMapping::default();
let mut field_term_streams = Vec::new();
for column in bytes_columns {
term_ord_mapping.add_segment(column.dictionary.num_terms());
let terms = column.dictionary.stream()?;
field_term_streams.push(terms);
}
let mut merged_terms = TermMerger::new(field_term_streams);
let mut sstable_builder = sstable::VoidSSTable::writer(output);
let mut current_term_ord = 0;
while merged_terms.advance() {
let term_bytes: &[u8] = merged_terms.key();
sstable_builder.insert(term_bytes, &())?;
for (segment_ord, from_term_ord) in merged_terms.matching_segments() {
term_ord_mapping.register_from_to(segment_ord, from_term_ord, current_term_ord);
}
current_term_ord += 1;
}
sstable_builder.finish()?;
Ok(term_ord_mapping)
}
#[derive(Default)]
struct TermOrdinalMapping {
per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
}
impl TermOrdinalMapping {
fn add_segment(&mut self, max_term_ord: usize) {
self.per_segment_new_term_ordinals
.push(vec![TermOrdinal::default(); max_term_ord as usize]);
}
fn register_from_to(&mut self, segment_ord: usize, from_ord: TermOrdinal, to_ord: TermOrdinal) {
self.per_segment_new_term_ordinals[segment_ord][from_ord as usize] = to_ord;
}
fn get_segment(&self, segment_ord: usize) -> &[TermOrdinal] {
&(self.per_segment_new_term_ordinals[segment_ord])[..]
}
}

View File

@@ -1,4 +1,6 @@
mod merge_dict_column;
mod merge_mapping;
mod term_merger;
// mod sorted_doc_id_column;
@@ -12,6 +14,7 @@ pub use merge_mapping::{MergeRowOrder, StackMergeOrder};
use super::writer::ColumnarSerializer;
use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
use crate::columnar::column_type::ColumnTypeCategory;
use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
use crate::columnar::writer::CompatibleNumericalTypes;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;
@@ -26,7 +29,6 @@ pub fn merge_columnar(
) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(output);
// TODO handle dictionary merge for Str/Bytes column
let columns_to_merge = group_columns_for_merge(columnar_readers)?;
for ((column_name, column_type), columns) in columns_to_merge {
let mut column_serializer =
@@ -101,22 +103,24 @@ pub fn merge_column(
)?;
}
ColumnType::Bytes | ColumnType::Str => {
let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
let mut column_indexes: Vec<Option<ColumnIndex>> = Vec::with_capacity(columns.len());
let mut bytes_columns: Vec<BytesColumn> = Vec::with_capacity(columns.len());
for dynamic_column_opt in columns {
match dynamic_column_opt {
Some(DynamicColumn::Str(str_column)) => {
bytes_columns.push(Some(str_column.into()));
column_indexes.push(Some(str_column.term_ord_column.idx.clone()));
bytes_columns.push(str_column.into());
}
Some(DynamicColumn::Bytes(bytes_column)) => {
bytes_columns.push(Some(bytes_column));
}
None => bytes_columns.push(None),
_ => {
panic!("This should never happen.");
column_indexes.push(Some(bytes_column.term_ord_column.idx.clone()));
bytes_columns.push(bytes_column);
}
_ => column_indexes.push(None),
}
}
todo!();
let merged_column_index =
crate::column_index::stack_column_index(&column_indexes[..], merge_row_order);
merge_bytes_or_str_column(merged_column_index, &bytes_columns, wrt)?;
}
}
Ok(())

View File

@@ -0,0 +1,107 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use sstable::TermOrdinal;
use crate::Streamer;
pub struct HeapItem<'a> {
pub streamer: Streamer<'a>,
pub segment_ord: usize,
}
impl<'a> PartialEq for HeapItem<'a> {
fn eq(&self, other: &Self) -> bool {
self.segment_ord == other.segment_ord
}
}
impl<'a> Eq for HeapItem<'a> {}
impl<'a> PartialOrd for HeapItem<'a> {
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<'a> Ord for HeapItem<'a> {
fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
}
}
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yield is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
pub struct TermMerger<'a> {
heap: BinaryHeap<HeapItem<'a>>,
current_streamers: Vec<HeapItem<'a>>,
}
impl<'a> TermMerger<'a> {
/// Stream of merged term dictionary
pub fn new(streams: Vec<Streamer<'a>>) -> TermMerger<'a> {
TermMerger {
heap: BinaryHeap::new(),
current_streamers: streams
.into_iter()
.enumerate()
.map(|(ord, streamer)| HeapItem {
streamer,
segment_ord: ord,
})
.collect(),
}
}
pub(crate) fn matching_segments<'b: 'a>(
&'b self,
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
}
fn advance_segments(&mut self) {
let streamers = &mut self.current_streamers;
let heap = &mut self.heap;
for mut heap_item in streamers.drain(..) {
if heap_item.streamer.advance() {
heap.push(heap_item);
}
}
}
/// Advance the term iterator to the next term.
/// Returns true if there is indeed another term
/// False if there is none.
pub fn advance(&mut self) -> bool {
self.advance_segments();
if let Some(head) = self.heap.pop() {
self.current_streamers.push(head);
while let Some(next_streamer) = self.heap.peek() {
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
break;
}
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.current_streamers.push(next_heap_it);
}
true
} else {
false
}
}
/// Returns the current term.
///
/// This method may be called
/// if and only if advance() has been called before
/// and "true" was returned.
pub fn key(&self) -> &[u8] {
self.current_streamers[0].streamer.key()
}
}

View File

@@ -73,7 +73,9 @@ fn test_missing_column() {
}
}
fn make_columnar_multiple_columns(columns: &[(&str, &[&[NumericalValue]])]) -> ColumnarReader {
fn make_numerical_columnar_multiple_columns(
columns: &[(&str, &[&[NumericalValue]])],
) -> ColumnarReader {
let mut dataframe_writer = ColumnarWriter::default();
for (column_name, column_values) in columns {
for (row_id, vals) in column_values.iter().enumerate() {
@@ -92,12 +94,52 @@ fn make_columnar_multiple_columns(columns: &[(&str, &[&[NumericalValue]])]) -> C
ColumnarReader::open(buffer).unwrap()
}
fn make_byte_columnar_multiple_columns(columns: &[(&str, &[&[&[u8]]])]) -> ColumnarReader {
let mut dataframe_writer = ColumnarWriter::default();
for (column_name, column_values) in columns {
for (row_id, vals) in column_values.iter().enumerate() {
for val in vals.iter() {
dataframe_writer.record_bytes(row_id as u32, column_name, *val);
}
}
}
let num_rows = columns
.iter()
.map(|(_, val_rows)| val_rows.len() as RowId)
.max()
.unwrap_or(0u32);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(num_rows, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
}
fn make_text_columnar_multiple_columns(columns: &[(&str, &[&[&str]])]) -> ColumnarReader {
let mut dataframe_writer = ColumnarWriter::default();
for (column_name, column_values) in columns {
for (row_id, vals) in column_values.iter().enumerate() {
for val in vals.iter() {
dataframe_writer.record_str(row_id as u32, column_name, *val);
}
}
}
let num_rows = columns
.iter()
.map(|(_, val_rows)| val_rows.len() as RowId)
.max()
.unwrap_or(0u32);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(num_rows, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
}
#[test]
fn test_merge_columnar() {
fn test_merge_columnar_numbers() {
let columnar1 =
make_columnar_multiple_columns(&[("numbers", &[&[NumericalValue::from(-1f64)]])]);
let columnar2 =
make_columnar_multiple_columns(&[("numbers", &[&[], &[NumericalValue::from(-3f64)]])]);
make_numerical_columnar_multiple_columns(&[("numbers", &[&[NumericalValue::from(-1f64)]])]);
let columnar2 = make_numerical_columnar_multiple_columns(&[(
"numbers",
&[&[], &[NumericalValue::from(-3f64)]],
)]);
let mut buffer = Vec::new();
let columnars = &[columnar1, columnar2];
let stack_merge_order = StackMergeOrder::from_columnars(columnars);
@@ -118,3 +160,93 @@ fn test_merge_columnar() {
assert_eq!(vals.first(1u32), None);
assert_eq!(vals.first(2u32), Some(-3f64));
}
#[test]
fn test_merge_columnar_texts() {
let columnar1 = make_text_columnar_multiple_columns(&[("texts", &[&["a"]])]);
let columnar2 = make_text_columnar_multiple_columns(&[("texts", &[&[], &["b"]])]);
let mut buffer = Vec::new();
let columnars = &[columnar1, columnar2];
let stack_merge_order = StackMergeOrder::from_columnars(columnars);
crate::columnar::merge_columnar(
columnars,
MergeRowOrder::Stack(stack_merge_order),
&mut buffer,
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 3);
assert_eq!(columnar_reader.num_columns(), 1);
let cols = columnar_reader.read_columns("texts").unwrap();
let dynamic_column = cols[0].open().unwrap();
let DynamicColumn::Str(vals) = dynamic_column else { panic!() };
let get_str_for_ord = |ord| {
let mut out = String::new();
vals.ord_to_str(ord, &mut out).unwrap();
out
};
assert_eq!(vals.dictionary.num_terms(), 2);
assert_eq!(get_str_for_ord(0), "a");
assert_eq!(get_str_for_ord(1), "b");
let get_str_for_row = |row_id| {
let term_ords: Vec<u64> = vals.term_ords(row_id).collect();
assert!(term_ords.len() <= 1);
let mut out = String::new();
if term_ords.len() == 1 {
vals.ord_to_str(term_ords[0], &mut out).unwrap();
}
out
};
assert_eq!(get_str_for_row(0), "a");
assert_eq!(get_str_for_row(1), "");
assert_eq!(get_str_for_row(2), "b");
}
#[test]
fn test_merge_columnar_byte() {
let columnar1 = make_byte_columnar_multiple_columns(&[("bytes", &[&[b"bbbb"], &[b"baaa"]])]);
let columnar2 = make_byte_columnar_multiple_columns(&[("bytes", &[&[], &[b"a"]])]);
let mut buffer = Vec::new();
let columnars = &[columnar1, columnar2];
let stack_merge_order = StackMergeOrder::from_columnars(columnars);
crate::columnar::merge_columnar(
columnars,
MergeRowOrder::Stack(stack_merge_order),
&mut buffer,
)
.unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_rows(), 4);
assert_eq!(columnar_reader.num_columns(), 1);
let cols = columnar_reader.read_columns("bytes").unwrap();
let dynamic_column = cols[0].open().unwrap();
let DynamicColumn::Bytes(vals) = dynamic_column else { panic!() };
let get_bytes_for_ord = |ord| {
let mut out = Vec::new();
vals.ord_to_bytes(ord, &mut out).unwrap();
out
};
assert_eq!(vals.dictionary.num_terms(), 3);
assert_eq!(get_bytes_for_ord(0), b"a");
assert_eq!(get_bytes_for_ord(1), b"baaa");
assert_eq!(get_bytes_for_ord(2), b"bbbb");
let get_bytes_for_row = |row_id| {
let term_ords: Vec<u64> = vals.term_ords(row_id).collect();
assert!(term_ords.len() <= 1);
let mut out = Vec::new();
if term_ords.len() == 1 {
vals.ord_to_bytes(term_ords[0], &mut out).unwrap();
}
out
};
assert_eq!(get_bytes_for_row(0), b"bbbb");
assert_eq!(get_bytes_for_row(1), b"baaa");
assert_eq!(get_bytes_for_row(2), b"");
assert_eq!(get_bytes_for_row(3), b"a");
}

View File

@@ -401,6 +401,8 @@ impl ColumnarWriter {
}
}
// Serialize [Dictionary, Column, dictionary num bytes U32::LE]
// Column: [Column Index, Column Values, column index num bytes U32::LE]
fn serialize_bytes_or_str_column(
cardinality: Cardinality,
num_docs: RowId,

View File

@@ -27,6 +27,23 @@ impl<U> Iterable<U> for &dyn Iterable<U> {
}
}
impl<F, T> Iterable<T> for F
where F: Fn() -> Box<dyn Iterator<Item = T>>
{
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
self()
}
}
// impl<F, I, T> Iterable<T> for F
// where
// I: Iterator<Item = T>,
// F: Fn() -> I,
//{
// fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
// Box::new(self())
//}
pub fn map_iterable<U, V>(
original_iterable: impl Iterable<U>,
transform: impl Fn(U) -> V,