From 3de018c49f6cd85d965a60ab635089fe7da877ca Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 1 Feb 2023 12:13:25 +0800 Subject: [PATCH] add merge for bytes/str column --- columnar/src/column/dictionary_encoded.rs | 9 +- columnar/src/column/serialize.rs | 2 +- .../src/column_index/multivalued_index.rs | 2 +- columnar/src/column_values/serialize.rs | 20 ++- columnar/src/column_values/u64_based/mod.rs | 16 +- columnar/src/column_values/u64_based/tests.rs | 10 +- .../src/columnar/merge/merge_dict_column.rs | 114 ++++++++++++++ columnar/src/columnar/merge/mod.rs | 22 +-- columnar/src/columnar/merge/term_merger.rs | 107 +++++++++++++ columnar/src/columnar/merge/tests.rs | 142 +++++++++++++++++- columnar/src/columnar/writer/mod.rs | 2 + columnar/src/iterable.rs | 17 +++ 12 files changed, 428 insertions(+), 35 deletions(-) create mode 100644 columnar/src/columnar/merge/merge_dict_column.rs create mode 100644 columnar/src/columnar/merge/term_merger.rs diff --git a/columnar/src/column/dictionary_encoded.rs b/columnar/src/column/dictionary_encoded.rs index 305d497af..c10828658 100644 --- a/columnar/src/column/dictionary_encoded.rs +++ b/columnar/src/column/dictionary_encoded.rs @@ -1,6 +1,5 @@ use std::io; use std::ops::Deref; -use std::str::Bytes; use std::sync::Arc; use sstable::{Dictionary, VoidSSTable}; @@ -70,11 +69,17 @@ impl From for BytesColumn { } impl StrColumn { + pub fn dictionary(&self) -> &Dictionary { + self.0.dictionary.as_ref() + } + /// Fills the buffer pub fn ord_to_str(&self, term_ord: u64, output: &mut String) -> io::Result { unsafe { let buf = output.as_mut_vec(); - self.0.dictionary.ord_to_term(term_ord, buf)?; + if !self.0.dictionary.ord_to_term(term_ord, buf)? { + return Ok(false); + } // TODO consider remove checks if it hurts performance. 
if std::str::from_utf8(buf.as_slice()).is_err() { buf.clear(); diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index 72e90051f..77dfed1d0 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -33,7 +33,7 @@ pub fn serialize_column_mappable_to_u64( ) -> io::Result<()> { let column_index_num_bytes = serialize_column_index(column_index, output)?; serialize_u64_based_column_values( - column_values, + || column_values.boxed_iter(), &[CodecType::Bitpacked, CodecType::BlockwiseLinear], output, )?; diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index c22699732..bfff83960 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -15,7 +15,7 @@ pub fn serialize_multivalued_index( output: &mut impl Write, ) -> io::Result<()> { crate::column_values::u64_based::serialize_u64_based_column_values( - &*multivalued_index, + || multivalued_index.boxed_iter(), &[CodecType::Bitpacked, CodecType::Linear], output, )?; diff --git a/columnar/src/column_values/serialize.rs b/columnar/src/column_values/serialize.rs index 254711192..21d7749b0 100644 --- a/columnar/src/column_values/serialize.rs +++ b/columnar/src/column_values/serialize.rs @@ -113,8 +113,8 @@ pub mod tests { #[test] fn test_fastfield_bool_size_bitwidth_1() { let mut buffer = Vec::new(); - serialize_u64_based_column_values::( - &&[false, true][..], + serialize_u64_based_column_values( + || [false, true].into_iter(), &ALL_U64_CODEC_TYPES, &mut buffer, ) .unwrap(); @@ -127,8 +127,12 @@ pub mod tests { #[test] fn test_fastfield_bool_bit_size_bitwidth_0() { let mut buffer = Vec::new(); - serialize_u64_based_column_values::(&&[true][..], &ALL_U64_CODEC_TYPES, &mut buffer) - .unwrap(); + serialize_u64_based_column_values( + || [true].into_iter(), + &ALL_U64_CODEC_TYPES, + &mut buffer, + ) + .unwrap(); // 5 bytes of header, 0 bytes of value, 7 bytes of
padding. assert_eq!(buffer.len(), 5); } @@ -137,8 +141,12 @@ pub mod tests { fn test_fastfield_gcd() { let mut buffer = Vec::new(); let vals: Vec = (0..80).map(|val| (val % 7) * 1_000u64).collect(); - serialize_u64_based_column_values(&&vals[..], &[CodecType::Bitpacked], &mut buffer) - .unwrap(); + serialize_u64_based_column_values( + || vals.iter().cloned(), + &[CodecType::Bitpacked], + &mut buffer, + ) + .unwrap(); // Values are stored over 3 bits. assert_eq!(buffer.len(), 6 + (3 * 80 / 8)); } diff --git a/columnar/src/column_values/u64_based/mod.rs b/columnar/src/column_values/u64_based/mod.rs index 0acda4fe5..4f2917c2e 100644 --- a/columnar/src/column_values/u64_based/mod.rs +++ b/columnar/src/column_values/u64_based/mod.rs @@ -155,18 +155,22 @@ impl CodecType { } } -pub fn serialize_u64_based_column_values( - vals: &dyn Iterable, +pub fn serialize_u64_based_column_values( + vals: F, codec_types: &[CodecType], wrt: &mut dyn Write, -) -> io::Result<()> { +) -> io::Result<()> +where + I: Iterator, + F: Fn() -> I, +{ let mut stats_collector = StatsCollector::default(); let mut estimators: Vec<(CodecType, Box)> = Vec::with_capacity(codec_types.len()); for &codec_type in codec_types { estimators.push((codec_type, codec_type.estimator())); } - for val in vals.boxed_iter() { + for val in vals() { let val_u64 = val.to_u64(); stats_collector.collect(val_u64); for (_, estimator) in &mut estimators { @@ -190,7 +194,7 @@ pub fn serialize_u64_based_column_values( best_codec.to_code().serialize(wrt)?; best_codec_estimator.serialize( &stats, - &mut vals.boxed_iter().map(MonotonicallyMappableToU64::to_u64), + &mut vals().map(MonotonicallyMappableToU64::to_u64), wrt, )?; Ok(()) @@ -214,7 +218,7 @@ pub fn serialize_and_load_u64_based_column_values codec_types: &[CodecType], ) -> Arc> { let mut buffer = Vec::new(); - serialize_u64_based_column_values(vals, codec_types, &mut buffer).unwrap(); + serialize_u64_based_column_values(|| vals.boxed_iter(), codec_types, &mut 
buffer).unwrap(); load_u64_based_column_values::(OwnedBytes::new(buffer)).unwrap() } diff --git a/columnar/src/column_values/u64_based/tests.rs b/columnar/src/column_values/u64_based/tests.rs index 2fec5298f..6b345e1d6 100644 --- a/columnar/src/column_values/u64_based/tests.rs +++ b/columnar/src/column_values/u64_based/tests.rs @@ -7,7 +7,7 @@ fn test_serialize_and_load_simple() { let mut buffer = Vec::new(); let vals = &[1u64, 2u64, 5u64]; serialize_u64_based_column_values( - &&vals[..], + || vals.iter().cloned(), &[CodecType::Bitpacked, CodecType::BlockwiseLinear], &mut buffer, ) @@ -243,7 +243,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) -> let mut vals: Vec = (-4..=(num_vals as i64) - 5).map(|val| val * 1000).collect(); let mut buffer: Vec = Vec::new(); crate::column_values::serialize_u64_based_column_values( - &&vals[..], + || vals.iter().cloned(), &[codec_type], &mut buffer, )?; @@ -260,7 +260,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) -> vals.pop(); vals.push(1001i64); crate::column_values::serialize_u64_based_column_values( - &&vals[..], + || vals.iter().cloned(), &[codec_type], &mut buffer_without_gcd, )?; @@ -286,7 +286,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) -> let mut vals: Vec = (1..=num_vals).map(|i| i as u64 * 1000u64).collect(); let mut buffer: Vec = Vec::new(); crate::column_values::serialize_u64_based_column_values( - &&vals[..], + || vals.iter().cloned(), &[codec_type], &mut buffer, )?; @@ -303,7 +303,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) -> vals.pop(); vals.push(1001u64); crate::column_values::serialize_u64_based_column_values( - &&vals[..], + || vals.iter().cloned(), &[codec_type], &mut buffer_without_gcd, )?; diff --git a/columnar/src/columnar/merge/merge_dict_column.rs b/columnar/src/columnar/merge/merge_dict_column.rs new file mode 100644 index 000000000..4da10542e --- /dev/null +++ 
b/columnar/src/columnar/merge/merge_dict_column.rs @@ -0,0 +1,114 @@ +use std::io::{self, Write}; + +use common::CountingWriter; +use itertools::Itertools; +use sstable::{SSTable, TermOrdinal}; + +use super::term_merger::TermMerger; +use crate::column_index::{serialize_column_index, SerializableColumnIndex}; +use crate::column_values::{serialize_u64_based_column_values, CodecType}; +use crate::BytesColumn; + +// Serialize [Dictionary, Column, dictionary num bytes U32::LE] +// Column: [Column Index, Column Values, column index num bytes U32::LE] +pub fn merge_bytes_or_str_column( + column_index: SerializableColumnIndex<'_>, + bytes_columns: &[BytesColumn], + output: &mut impl Write, +) -> io::Result<()> { + // Serialize dict and generate mapping for values + let mut output = CountingWriter::wrap(output); + let term_ord_mapping = serialize_merged_dict(bytes_columns, &mut output)?; + let dictionary_num_bytes: u32 = output.written_bytes() as u32; + let output = output.finish(); + + serialize_bytes_or_str_column(column_index, bytes_columns, &term_ord_mapping, output)?; + + output.write_all(&dictionary_num_bytes.to_le_bytes())?; + Ok(()) +} + +fn serialize_bytes_or_str_column( + column_index: SerializableColumnIndex<'_>, + bytes_columns: &[BytesColumn], + term_ord_mapping: &TermOrdinalMapping, + output: &mut impl Write, +) -> io::Result<()> { + let column_index_num_bytes = serialize_column_index(column_index, output)?; + + let column_values = move || { + let iter = bytes_columns + .iter() + .enumerate() + .flat_map(|(segment_ord, byte_column)| { + let segment_ord = term_ord_mapping.get_segment(segment_ord); + byte_column + .ords() + .values + .iter() + .map(move |term_ord| segment_ord[term_ord as usize]) + }); + iter + }; + + serialize_u64_based_column_values( + column_values, + &[CodecType::Bitpacked, CodecType::BlockwiseLinear], + output, + )?; + + output.write_all(&column_index_num_bytes.to_le_bytes())?; + + Ok(()) +} + +fn serialize_merged_dict( + bytes_columns: 
&[BytesColumn], + output: &mut impl Write, +) -> io::Result { + let mut term_ord_mapping = TermOrdinalMapping::default(); + + let mut field_term_streams = Vec::new(); + for column in bytes_columns { + term_ord_mapping.add_segment(column.dictionary.num_terms()); + let terms = column.dictionary.stream()?; + field_term_streams.push(terms); + } + + let mut merged_terms = TermMerger::new(field_term_streams); + let mut sstable_builder = sstable::VoidSSTable::writer(output); + + let mut current_term_ord = 0; + while merged_terms.advance() { + let term_bytes: &[u8] = merged_terms.key(); + + sstable_builder.insert(term_bytes, &())?; + for (segment_ord, from_term_ord) in merged_terms.matching_segments() { + term_ord_mapping.register_from_to(segment_ord, from_term_ord, current_term_ord); + } + current_term_ord += 1; + } + sstable_builder.finish()?; + + Ok(term_ord_mapping) +} + +#[derive(Default)] +struct TermOrdinalMapping { + per_segment_new_term_ordinals: Vec>, +} + +impl TermOrdinalMapping { + fn add_segment(&mut self, max_term_ord: usize) { + self.per_segment_new_term_ordinals + .push(vec![TermOrdinal::default(); max_term_ord as usize]); + } + + fn register_from_to(&mut self, segment_ord: usize, from_ord: TermOrdinal, to_ord: TermOrdinal) { + self.per_segment_new_term_ordinals[segment_ord][from_ord as usize] = to_ord; + } + + fn get_segment(&self, segment_ord: usize) -> &[TermOrdinal] { + &(self.per_segment_new_term_ordinals[segment_ord])[..] 
+ } +} diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 13241a7f1..f06705b60 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -1,4 +1,6 @@ +mod merge_dict_column; mod merge_mapping; +mod term_merger; // mod sorted_doc_id_column; @@ -12,6 +14,7 @@ pub use merge_mapping::{MergeRowOrder, StackMergeOrder}; use super::writer::ColumnarSerializer; use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64}; use crate::columnar::column_type::ColumnTypeCategory; +use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column; use crate::columnar::writer::CompatibleNumericalTypes; use crate::columnar::ColumnarReader; use crate::dynamic_column::DynamicColumn; @@ -26,7 +29,6 @@ pub fn merge_columnar( ) -> io::Result<()> { let mut serializer = ColumnarSerializer::new(output); - // TODO handle dictionary merge for Str/Bytes column let columns_to_merge = group_columns_for_merge(columnar_readers)?; for ((column_name, column_type), columns) in columns_to_merge { let mut column_serializer = @@ -101,22 +103,24 @@ pub fn merge_column( )?; } ColumnType::Bytes | ColumnType::Str => { - let mut bytes_columns: Vec> = Vec::with_capacity(columns.len()); + let mut column_indexes: Vec> = Vec::with_capacity(columns.len()); + let mut bytes_columns: Vec = Vec::with_capacity(columns.len()); for dynamic_column_opt in columns { match dynamic_column_opt { Some(DynamicColumn::Str(str_column)) => { - bytes_columns.push(Some(str_column.into())); + column_indexes.push(Some(str_column.term_ord_column.idx.clone())); + bytes_columns.push(str_column.into()); } Some(DynamicColumn::Bytes(bytes_column)) => { - bytes_columns.push(Some(bytes_column)); - } - None => bytes_columns.push(None), - _ => { - panic!("This should never happen."); + column_indexes.push(Some(bytes_column.term_ord_column.idx.clone())); + bytes_columns.push(bytes_column); } + _ => column_indexes.push(None), } } - 
todo!(); + let merged_column_index = + crate::column_index::stack_column_index(&column_indexes[..], merge_row_order); + merge_bytes_or_str_column(merged_column_index, &bytes_columns, wrt)?; } } Ok(()) diff --git a/columnar/src/columnar/merge/term_merger.rs b/columnar/src/columnar/merge/term_merger.rs new file mode 100644 index 000000000..75cba350a --- /dev/null +++ b/columnar/src/columnar/merge/term_merger.rs @@ -0,0 +1,107 @@ +use std::cmp::Ordering; +use std::collections::BinaryHeap; + +use sstable::TermOrdinal; + +use crate::Streamer; + +pub struct HeapItem<'a> { + pub streamer: Streamer<'a>, + pub segment_ord: usize, +} + +impl<'a> PartialEq for HeapItem<'a> { + fn eq(&self, other: &Self) -> bool { + self.segment_ord == other.segment_ord + } +} + +impl<'a> Eq for HeapItem<'a> {} + +impl<'a> PartialOrd for HeapItem<'a> { + fn partial_cmp(&self, other: &HeapItem<'a>) -> Option { + Some(self.cmp(other)) + } +} + +impl<'a> Ord for HeapItem<'a> { + fn cmp(&self, other: &HeapItem<'a>) -> Ordering { + (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord)) + } +} + +/// Given a list of sorted term streams, +/// returns an iterator over sorted unique terms. +/// +/// The item yield is actually a pair with +/// - the term +/// - a slice with the ordinal of the segments containing +/// the terms. 
+pub struct TermMerger<'a> { + heap: BinaryHeap>, + current_streamers: Vec>, +} + +impl<'a> TermMerger<'a> { + /// Stream of merged term dictionary + pub fn new(streams: Vec>) -> TermMerger<'a> { + TermMerger { + heap: BinaryHeap::new(), + current_streamers: streams + .into_iter() + .enumerate() + .map(|(ord, streamer)| HeapItem { + streamer, + segment_ord: ord, + }) + .collect(), + } + } + + pub(crate) fn matching_segments<'b: 'a>( + &'b self, + ) -> impl 'b + Iterator { + self.current_streamers + .iter() + .map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord())) + } + + fn advance_segments(&mut self) { + let streamers = &mut self.current_streamers; + let heap = &mut self.heap; + for mut heap_item in streamers.drain(..) { + if heap_item.streamer.advance() { + heap.push(heap_item); + } + } + } + + /// Advance the term iterator to the next term. + /// Returns true if there is indeed another term + /// False if there is none. + pub fn advance(&mut self) -> bool { + self.advance_segments(); + if let Some(head) = self.heap.pop() { + self.current_streamers.push(head); + while let Some(next_streamer) = self.heap.peek() { + if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() { + break; + } + let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand + self.current_streamers.push(next_heap_it); + } + true + } else { + false + } + } + + /// Returns the current term. + /// + /// This method may be called + /// if and only if advance() has been called before + /// and "true" was returned. 
+ pub fn key(&self) -> &[u8] { + self.current_streamers[0].streamer.key() + } +} diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 6dd5237b3..2ddca568e 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -73,7 +73,9 @@ fn test_missing_column() { } } -fn make_columnar_multiple_columns(columns: &[(&str, &[&[NumericalValue]])]) -> ColumnarReader { +fn make_numerical_columnar_multiple_columns( + columns: &[(&str, &[&[NumericalValue]])], +) -> ColumnarReader { let mut dataframe_writer = ColumnarWriter::default(); for (column_name, column_values) in columns { for (row_id, vals) in column_values.iter().enumerate() { @@ -92,12 +94,52 @@ fn make_columnar_multiple_columns(columns: &[(&str, &[&[NumericalValue]])]) -> C ColumnarReader::open(buffer).unwrap() } +fn make_byte_columnar_multiple_columns(columns: &[(&str, &[&[&[u8]]])]) -> ColumnarReader { + let mut dataframe_writer = ColumnarWriter::default(); + for (column_name, column_values) in columns { + for (row_id, vals) in column_values.iter().enumerate() { + for val in vals.iter() { + dataframe_writer.record_bytes(row_id as u32, column_name, *val); + } + } + } + let num_rows = columns + .iter() + .map(|(_, val_rows)| val_rows.len() as RowId) + .max() + .unwrap_or(0u32); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(num_rows, &mut buffer).unwrap(); + ColumnarReader::open(buffer).unwrap() +} + +fn make_text_columnar_multiple_columns(columns: &[(&str, &[&[&str]])]) -> ColumnarReader { + let mut dataframe_writer = ColumnarWriter::default(); + for (column_name, column_values) in columns { + for (row_id, vals) in column_values.iter().enumerate() { + for val in vals.iter() { + dataframe_writer.record_str(row_id as u32, column_name, *val); + } + } + } + let num_rows = columns + .iter() + .map(|(_, val_rows)| val_rows.len() as RowId) + .max() + .unwrap_or(0u32); + let mut buffer: Vec = Vec::new(); + 
dataframe_writer.serialize(num_rows, &mut buffer).unwrap(); + ColumnarReader::open(buffer).unwrap() +} + #[test] -fn test_merge_columnar() { +fn test_merge_columnar_numbers() { let columnar1 = - make_columnar_multiple_columns(&[("numbers", &[&[NumericalValue::from(-1f64)]])]); - let columnar2 = - make_columnar_multiple_columns(&[("numbers", &[&[], &[NumericalValue::from(-3f64)]])]); + make_numerical_columnar_multiple_columns(&[("numbers", &[&[NumericalValue::from(-1f64)]])]); + let columnar2 = make_numerical_columnar_multiple_columns(&[( + "numbers", + &[&[], &[NumericalValue::from(-3f64)]], + )]); let mut buffer = Vec::new(); let columnars = &[columnar1, columnar2]; let stack_merge_order = StackMergeOrder::from_columnars(columnars); @@ -118,3 +160,93 @@ fn test_merge_columnar() { assert_eq!(vals.first(1u32), None); assert_eq!(vals.first(2u32), Some(-3f64)); } + +#[test] +fn test_merge_columnar_texts() { + let columnar1 = make_text_columnar_multiple_columns(&[("texts", &[&["a"]])]); + let columnar2 = make_text_columnar_multiple_columns(&[("texts", &[&[], &["b"]])]); + let mut buffer = Vec::new(); + let columnars = &[columnar1, columnar2]; + let stack_merge_order = StackMergeOrder::from_columnars(columnars); + crate::columnar::merge_columnar( + columnars, + MergeRowOrder::Stack(stack_merge_order), + &mut buffer, + ) + .unwrap(); + let columnar_reader = ColumnarReader::open(buffer).unwrap(); + assert_eq!(columnar_reader.num_rows(), 3); + assert_eq!(columnar_reader.num_columns(), 1); + let cols = columnar_reader.read_columns("texts").unwrap(); + let dynamic_column = cols[0].open().unwrap(); + let DynamicColumn::Str(vals) = dynamic_column else { panic!() }; + let get_str_for_ord = |ord| { + let mut out = String::new(); + vals.ord_to_str(ord, &mut out).unwrap(); + out + }; + + assert_eq!(vals.dictionary.num_terms(), 2); + assert_eq!(get_str_for_ord(0), "a"); + assert_eq!(get_str_for_ord(1), "b"); + + let get_str_for_row = |row_id| { + let term_ords: Vec = 
vals.term_ords(row_id).collect(); + assert!(term_ords.len() <= 1); + let mut out = String::new(); + if term_ords.len() == 1 { + vals.ord_to_str(term_ords[0], &mut out).unwrap(); + } + out + }; + + assert_eq!(get_str_for_row(0), "a"); + assert_eq!(get_str_for_row(1), ""); + assert_eq!(get_str_for_row(2), "b"); +} + +#[test] +fn test_merge_columnar_byte() { + let columnar1 = make_byte_columnar_multiple_columns(&[("bytes", &[&[b"bbbb"], &[b"baaa"]])]); + let columnar2 = make_byte_columnar_multiple_columns(&[("bytes", &[&[], &[b"a"]])]); + let mut buffer = Vec::new(); + let columnars = &[columnar1, columnar2]; + let stack_merge_order = StackMergeOrder::from_columnars(columnars); + crate::columnar::merge_columnar( + columnars, + MergeRowOrder::Stack(stack_merge_order), + &mut buffer, + ) + .unwrap(); + let columnar_reader = ColumnarReader::open(buffer).unwrap(); + assert_eq!(columnar_reader.num_rows(), 4); + assert_eq!(columnar_reader.num_columns(), 1); + let cols = columnar_reader.read_columns("bytes").unwrap(); + let dynamic_column = cols[0].open().unwrap(); + let DynamicColumn::Bytes(vals) = dynamic_column else { panic!() }; + let get_bytes_for_ord = |ord| { + let mut out = Vec::new(); + vals.ord_to_bytes(ord, &mut out).unwrap(); + out + }; + + assert_eq!(vals.dictionary.num_terms(), 3); + assert_eq!(get_bytes_for_ord(0), b"a"); + assert_eq!(get_bytes_for_ord(1), b"baaa"); + assert_eq!(get_bytes_for_ord(2), b"bbbb"); + + let get_bytes_for_row = |row_id| { + let term_ords: Vec = vals.term_ords(row_id).collect(); + assert!(term_ords.len() <= 1); + let mut out = Vec::new(); + if term_ords.len() == 1 { + vals.ord_to_bytes(term_ords[0], &mut out).unwrap(); + } + out + }; + + assert_eq!(get_bytes_for_row(0), b"bbbb"); + assert_eq!(get_bytes_for_row(1), b"baaa"); + assert_eq!(get_bytes_for_row(2), b""); + assert_eq!(get_bytes_for_row(3), b"a"); +} diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 4ef5de6be..4e9a9f5c0 100644 --- 
a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -401,6 +401,8 @@ impl ColumnarWriter { } } +// Serialize [Dictionary, Column, dictionary num bytes U32::LE] +// Column: [Column Index, Column Values, column index num bytes U32::LE] fn serialize_bytes_or_str_column( cardinality: Cardinality, num_docs: RowId, diff --git a/columnar/src/iterable.rs b/columnar/src/iterable.rs index 3817504e9..9b0585921 100644 --- a/columnar/src/iterable.rs +++ b/columnar/src/iterable.rs @@ -27,6 +27,23 @@ impl Iterable for &dyn Iterable { } } +impl Iterable for F +where F: Fn() -> Box> +{ + fn boxed_iter(&self) -> Box + '_> { + self() + } +} + +// impl Iterable for F +// where +// I: Iterator, +// F: Fn() -> I, +//{ +// fn boxed_iter(&self) -> Box + '_> { +// Box::new(self()) +//} + pub fn map_iterable( original_iterable: impl Iterable, transform: impl Fn(U) -> V,