From 33d18d0424039c4dfff45d67de0af1eb9f95758e Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 1 Feb 2023 19:13:16 +0900 Subject: [PATCH] Plugged fastfield merge Fixing unit tests. Fixing gcd stats isolation --- columnar/src/column_values/mod.rs | 2 + columnar/src/column_values/serialize.rs | 4 +- columnar/src/column_values/stats.rs | 85 +++++++++++++++++++ .../src/column_values/u64_based/bitpacked.rs | 4 +- .../u64_based/blockwise_linear.rs | 16 ++-- columnar/src/column_values/u64_based/line.rs | 2 +- columnar/src/column_values/u64_based/mod.rs | 46 +--------- .../u64_based/stats_collector.rs | 76 +++++++++++------ columnar/src/column_values/u64_based/tests.rs | 2 + columnar/src/columnar/merge/merge_mapping.rs | 4 +- columnar/src/columnar/merge/mod.rs | 4 +- columnar/src/columnar/merge/tests.rs | 14 +-- src/fastfield/mod.rs | 16 ++-- src/fastfield/readers.rs | 4 + src/indexer/merger.rs | 18 ++-- 15 files changed, 190 insertions(+), 107 deletions(-) create mode 100644 columnar/src/column_values/stats.rs diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index 9fd7747c9..33940b52f 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -22,12 +22,14 @@ use serialize::U128Header; mod compact_space; pub(crate) mod monotonic_mapping; pub(crate) mod monotonic_mapping_u128; +mod stats; pub(crate) mod u64_based; mod column; pub mod serialize; pub use serialize::serialize_column_values_u128; +pub use stats::Stats; pub use u64_based::{ load_u64_based_column_values, serialize_and_load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, diff --git a/columnar/src/column_values/serialize.rs b/columnar/src/column_values/serialize.rs index 21d7749b0..cdf3493e8 100644 --- a/columnar/src/column_values/serialize.rs +++ b/columnar/src/column_values/serialize.rs @@ -133,8 +133,8 @@ pub mod tests { &mut buffer, ) .unwrap(); - // 5 bytes of header, 0 bytes of value, 7 bytes of padding. - assert_eq!(buffer.len(), 5); + // 6 bytes of header, 0 bytes of value, 7 bytes of padding. + assert_eq!(buffer.len(), 6); } #[test] diff --git a/columnar/src/column_values/stats.rs b/columnar/src/column_values/stats.rs new file mode 100644 index 000000000..001c947fc --- /dev/null +++ b/columnar/src/column_values/stats.rs @@ -0,0 +1,85 @@ +use std::{io, num::NonZeroU64}; +use std::io::Write; + +use common::{BinarySerializable, VInt}; + +use crate::RowId; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Stats { + pub gcd: NonZeroU64, + pub min_value: u64, + pub max_value: u64, + pub num_rows: RowId, +} + +impl Stats { + pub fn amplitude(&self) -> u64 { + self.max_value - self.min_value + } +} + +impl BinarySerializable for Stats { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + VInt(self.min_value).serialize(writer)?; + VInt(self.gcd.get()).serialize(writer)?; + VInt(self.amplitude() / self.gcd).serialize(writer)?; + VInt(self.num_rows as u64).serialize(writer)?; + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let min_value = VInt::deserialize(reader)?.0; + let gcd = VInt::deserialize(reader)?.0; + let gcd= NonZeroU64::new(gcd).ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "GCD of 0 is forbidden"))?; + let amplitude = VInt::deserialize(reader)?.0 * gcd.get(); + let max_value = min_value + amplitude; + let num_rows = VInt::deserialize(reader)?.0 as RowId; + Ok(Stats { + min_value, + max_value, + num_rows, + gcd, + }) + } +} + +#[cfg(test)] +mod tests { + use std::num::NonZeroU64; + + use common::BinarySerializable; + + use crate::column_values::Stats; + + #[track_caller] + fn test_stats_ser_deser_aux(stats: &Stats, num_bytes: usize) { + let mut buffer: Vec = Vec::new(); + stats.serialize(&mut buffer).unwrap(); + assert_eq!(buffer.len(), num_bytes); + let deser_stats = Stats::deserialize(&mut &buffer[..]).unwrap(); + assert_eq!(stats, &deser_stats); + } + + #[test] + fn test_stats_serialization() { + test_stats_ser_deser_aux(&(Stats { + gcd: NonZeroU64::new(3).unwrap(), + min_value: 1, + max_value: 3001, + num_rows: 10, + }), 5); + test_stats_ser_deser_aux(&(Stats { + gcd: NonZeroU64::new(1_000).unwrap(), + min_value: 1, + max_value: 3001, + num_rows: 10, + }), 5); + test_stats_ser_deser_aux(&(Stats { + gcd: NonZeroU64::new(1).unwrap(), + min_value: 0, + max_value: 0, + num_rows: 0, + }), 4); + } +} diff --git a/columnar/src/column_values/u64_based/bitpacked.rs b/columnar/src/column_values/u64_based/bitpacked.rs index 59e003eef..d91c0ac9a 100644 --- a/columnar/src/column_values/u64_based/bitpacked.rs +++ b/columnar/src/column_values/u64_based/bitpacked.rs @@ -19,7 +19,7 @@ pub struct BitpackedReader { impl ColumnValues for BitpackedReader { #[inline(always)] fn get_val(&self, doc: u32) -> u64 { - self.stats.min_value + self.stats.gcd * self.bit_unpacker.get(doc, &self.data) + self.stats.min_value + self.stats.gcd.get() * self.bit_unpacker.get(doc, &self.data) } #[inline] @@ -60,7 +60,7 @@ impl ColumnCodecEstimator for BitpackedCodecEstimator { stats.serialize(wrt)?; let num_bits = num_bits(stats); let mut bit_packer = BitPacker::new(); - let divider = DividerU64::divide_by(stats.gcd); + let divider = DividerU64::divide_by(stats.gcd.get()); for val in vals { bit_packer.write(divider.divide(val - stats.min_value), num_bits, wrt)?; } diff --git a/columnar/src/column_values/u64_based/blockwise_linear.rs b/columnar/src/column_values/u64_based/blockwise_linear.rs index 0a7963de2..e6169a503 100644 --- a/columnar/src/column_values/u64_based/blockwise_linear.rs +++ b/columnar/src/column_values/u64_based/blockwise_linear.rs @@ -86,8 +86,8 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator { } fn estimate(&self, stats: &Stats) -> Option { let mut estimate = 4 + stats.num_bytes() + self.meta_num_bytes + self.values_num_bytes; - if stats.gcd > 1 { - let estimate_gain_from_gcd = (stats.gcd as f32).log2() * stats.num_rows as f32 / 8.0f32; + if stats.gcd.get() > 1 { + let estimate_gain_from_gcd = (stats.gcd.get() as f32).log2().floor() * stats.num_rows as f32 / 8.0f32; estimate = estimate.saturating_sub(estimate_gain_from_gcd as u64); } Some(estimate) @@ -110,7 +110,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator { let mut bit_packer = BitPacker::new(); - let gcd_divider = DividerU64::divide_by(stats.gcd); + let gcd_divider = DividerU64::divide_by(stats.gcd.get()); for _ in 0..num_blocks { buffer.clear(); @@ -121,10 +121,10 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator { ); for buffer_val in buffer.iter_mut() { - *buffer_val = gcd_divider.divide(*buffer_val); + *buffer_val = gcd_divider.divide(*buffer_val - stats.min_value); } - let line = Line::train(&VecColumn::from(&buffer)); + let mut line = Line::train(&VecColumn::from(&buffer)); assert!(!buffer.is_empty()); @@ -132,6 +132,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator { let interpolated_val = line.eval(i as u32); *buffer_val = buffer_val.wrapping_sub(interpolated_val); } + let bit_width = buffer.iter().copied().map(compute_num_bits).max().unwrap(); for &buffer_val in &buffer { @@ -205,7 +206,8 @@ impl ColumnValues for BlockwiseLinearReader { let interpoled_val: u64 = block.line.eval(idx_within_block); let block_bytes = &self.data[block.data_start_offset..]; let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes); - self.stats.gcd * interpoled_val.wrapping_add(bitpacked_diff) + // TODO optimize me! the line parameters could be tweaked to include the multiplication and remove the dependency. + self.stats.min_value + self.stats.gcd.get().wrapping_mul(interpoled_val.wrapping_add(bitpacked_diff)) } #[inline(always)] @@ -259,7 +261,7 @@ mod tests { } #[test] - fn bitpacked_fast_field_rand() { + fn test_blockwise_linear_fast_field_rand() { for _ in 0..500 { let mut data = (0..1 + rand::random::() as usize) .map(|_| rand::random::() as u64 / 2) diff --git a/columnar/src/column_values/u64_based/line.rs b/columnar/src/column_values/u64_based/line.rs index e7a23ec3b..daa55ed8d 100644 --- a/columnar/src/column_values/u64_based/line.rs +++ b/columnar/src/column_values/u64_based/line.rs @@ -17,7 +17,7 @@ const MID_POINT: u64 = (1u64 << 32) - 1u64; /// `y = m * x >> 32 + b` #[derive(Debug, Clone, Copy, Default)] pub struct Line { - slope: u64, + pub(crate) slope: u64, pub(crate) intercept: u64, } diff --git a/columnar/src/column_values/u64_based/mod.rs b/columnar/src/column_values/u64_based/mod.rs index 4f2917c2e..8d58ea6f4 100644 --- a/columnar/src/column_values/u64_based/mod.rs +++ b/columnar/src/column_values/u64_based/mod.rs @@ -6,13 +6,10 @@ mod stats_collector; use std::io; use std::io::Write; -use std::marker::PhantomData; -use std::ops::Range; use std::sync::Arc; -use common::{BinarySerializable, OwnedBytes, VInt}; +use common::{BinarySerializable, OwnedBytes}; -use crate::column_values::monotonic_map_column; use crate::column_values::monotonic_mapping::{ StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal, }; @@ -20,46 +17,9 @@ use crate::column_values::u64_based::bitpacked::BitpackedCodec; use crate::column_values::u64_based::blockwise_linear::BlockwiseLinearCodec; use crate::column_values::u64_based::linear::LinearCodec; use crate::column_values::u64_based::stats_collector::StatsCollector; +use crate::column_values::{monotonic_map_column, Stats}; use crate::iterable::Iterable; -use crate::{ColumnValues, MonotonicallyMappableToU64, RowId}; - -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct Stats { - gcd: u64, - min_value: u64, - max_value: u64, - num_rows: RowId, -} - -impl Stats { - fn amplitude(&self) -> u64 { - self.max_value - self.min_value - } -} - -impl BinarySerializable for Stats { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - VInt(self.gcd).serialize(writer)?; - VInt(self.min_value / self.gcd).serialize(writer)?; - VInt(self.amplitude() / self.gcd).serialize(writer)?; - VInt(self.num_rows as u64).serialize(writer)?; - Ok(()) - } - - fn deserialize(reader: &mut R) -> io::Result { - let gcd = VInt::deserialize(reader)?.0; - let min_value = VInt::deserialize(reader)?.0 * gcd; - let amplitude = VInt::deserialize(reader)?.0 * gcd; - let max_value = min_value + amplitude; - let num_rows = VInt::deserialize(reader)?.0 as RowId; - Ok(Stats { - min_value, - max_value, - num_rows, - gcd, - }) - } -} +use crate::{ColumnValues, MonotonicallyMappableToU64}; pub trait ColumnCodecEstimator: 'static { fn collect(&mut self, value: u64); diff --git a/columnar/src/column_values/u64_based/stats_collector.rs b/columnar/src/column_values/u64_based/stats_collector.rs index 0728312f1..650190c75 100644 --- a/columnar/src/column_values/u64_based/stats_collector.rs +++ b/columnar/src/column_values/u64_based/stats_collector.rs @@ -2,7 +2,7 @@ use std::num::NonZeroU64; use fastdivide::DividerU64; -use crate::column_values::u64_based::Stats; +use crate::column_values::Stats; use crate::RowId; /// Compute the gcd of two non null numbers. @@ -21,34 +21,50 @@ fn compute_gcd(mut large: NonZeroU64, mut small: NonZeroU64) -> NonZeroU64 { #[derive(Default)] pub struct StatsCollector { - pub(crate) min_max_opt: Option<(u64, u64)>, - pub(crate) num_rows: RowId, - pub(crate) gcd_opt: Option<(NonZeroU64, DividerU64)>, + min_max_opt: Option<(u64, u64)>, + num_rows: RowId, + // We measure the GCD of the difference between the values and the minimal value. + // This is the same as computing the difference between the values and the first value. + // + // This way, we can compress i64-converted-to-u64 (e.g. timestamp that were supplied in + // seconds, only to be converted in microseconds). + increment_gcd_opt: Option<(NonZeroU64, DividerU64)>, + first_value_opt: Option, } impl StatsCollector { pub fn stats(&self) -> Stats { let (min_value, max_value) = self.min_max_opt.unwrap_or((0u64, 0u64)); - let gcd = if let Some((gcd, _)) = self.gcd_opt { - gcd.get() + let increment_gcd = if let Some((increment_gcd, _)) = self.increment_gcd_opt { + increment_gcd } else { - 1u64 + NonZeroU64::new(1u64).unwrap() }; Stats { min_value, max_value, num_rows: self.num_rows, - gcd, + gcd: increment_gcd, } } #[inline] - fn update_gcd(&mut self, non_zero_value: NonZeroU64) { - let Some((gcd, gcd_divider)) = self.gcd_opt else { - self.set_gcd(non_zero_value); + fn update_increment_gcd(&mut self, value: u64) { + let Some(first_value) = self.first_value_opt else { + // We set the first value and just quit. + self.first_value_opt = Some(value); + return; + }; + let Some(non_zero_value) = NonZeroU64::new(value.abs_diff(first_value)) else { + // We can simply skip 0 values. + return; + }; + let Some((gcd, gcd_divider)) = self.increment_gcd_opt else { + self.set_increment_gcd(non_zero_value); return; }; if gcd.get() == 1 { + // It won't see any update now. return; } let remainder = @@ -57,12 +73,12 @@ impl StatsCollector { return; } let new_gcd = compute_gcd(non_zero_value, gcd); - self.set_gcd(new_gcd); + self.set_increment_gcd(new_gcd); } - fn set_gcd(&mut self, gcd: NonZeroU64) { + fn set_increment_gcd(&mut self, gcd: NonZeroU64) { let new_divider = DividerU64::divide_by(gcd.get()); - self.gcd_opt = Some((gcd, new_divider)); + self.increment_gcd_opt = Some((gcd, new_divider)); } pub fn collect(&mut self, value: u64) { @@ -72,10 +88,7 @@ impl StatsCollector { (value, value) }); self.num_rows += 1; - let Some(non_zero_value) = NonZeroU64::new(value) else { - return; - }; - self.update_gcd(non_zero_value); + self.update_increment_gcd(value); } } @@ -95,7 +108,7 @@ mod tests { } fn find_gcd(vals: impl Iterator) -> u64 { - compute_stats(vals).gcd + compute_stats(vals).gcd.get() } #[test] @@ -123,6 +136,8 @@ mod tests { assert_eq!(find_gcd([15, 16, 10].into_iter()), 1); assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), 5); assert_eq!(find_gcd([0, 0].into_iter()), 1); + assert_eq!(find_gcd([1, 10, 4, 1, 7, 10].into_iter()), 3); + assert_eq!(find_gcd([1, 10, 0, 4, 1, 7, 10].into_iter()), 1); } #[test] @@ -130,7 +145,7 @@ mod tests { assert_eq!( compute_stats([].into_iter()), Stats { - gcd: 1, + gcd: NonZeroU64::new(1).unwrap(), min_value: 0, max_value: 0, num_rows: 0 @@ -139,7 +154,7 @@ mod tests { assert_eq!( compute_stats([0, 1].into_iter()), Stats { - gcd: 1, + gcd: NonZeroU64::new(1).unwrap(), min_value: 0, max_value: 1, num_rows: 2 @@ -148,25 +163,34 @@ mod tests { assert_eq!( compute_stats([0, 1].into_iter()), Stats { - gcd: 1, + gcd: NonZeroU64::new(1).unwrap(), min_value: 0, max_value: 1, num_rows: 2 } ); assert_eq!( - compute_stats([10, 30].into_iter()), + compute_stats([10, 20, 30].into_iter()), Stats { - gcd: 10, + gcd: NonZeroU64::new(10).unwrap(), min_value: 10, max_value: 30, - num_rows: 2 + num_rows: 3 + } + ); + assert_eq!( + compute_stats([10, 50, 10, 30].into_iter()), + Stats { + gcd: NonZeroU64::new(20).unwrap(), + min_value: 10, + max_value: 50, + num_rows: 4 } ); assert_eq!( compute_stats([10, 0, 30].into_iter()), Stats { - gcd: 10, + gcd: NonZeroU64::new(10).unwrap(), min_value: 0, max_value: 30, num_rows: 3 diff --git a/columnar/src/column_values/u64_based/tests.rs b/columnar/src/column_values/u64_based/tests.rs index 6b345e1d6..b82cdf349 100644 --- a/columnar/src/column_values/u64_based/tests.rs +++ b/columnar/src/column_values/u64_based/tests.rs @@ -67,6 +67,8 @@ pub(crate) fn create_and_validate( ); assert_eq!(expected_positions, positions); } + dbg!(estimation); + dbg!(actual_compression); if actual_compression > 20 { assert!(relative_difference(estimation, actual_compression) < 0.10f32); } diff --git a/columnar/src/columnar/merge/merge_mapping.rs b/columnar/src/columnar/merge/merge_mapping.rs index 21ac24321..1e38bf61b 100644 --- a/columnar/src/columnar/merge/merge_mapping.rs +++ b/columnar/src/columnar/merge/merge_mapping.rs @@ -1,6 +1,6 @@ use std::ops::Range; -use crate::{column, RowId}; +use crate::{column, RowId, ColumnarReader}; pub struct StackMergeOrder { // This does not start at 0. The first row is the number of @@ -9,7 +9,7 @@ pub struct StackMergeOrder { } impl StackMergeOrder { - pub fn from_columnars(columnars: &[crate::ColumnarReader]) -> StackMergeOrder { + pub fn from_columnars(columnars: &[&ColumnarReader]) -> StackMergeOrder { let mut cumulated_row_ids: Vec = Vec::with_capacity(columnars.len()); let mut cumulated_row_id = 0; for columnar in columnars { diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index f06705b60..d088d74ee 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -23,7 +23,7 @@ use crate::{ }; pub fn merge_columnar( - columnar_readers: &[ColumnarReader], + columnar_readers: &[&ColumnarReader], mapping: MergeRowOrder, output: &mut impl io::Write, ) -> io::Result<()> { @@ -127,7 +127,7 @@ pub fn merge_column( } fn group_columns_for_merge( - columnar_readers: &[ColumnarReader], + columnar_readers: &[&ColumnarReader], ) -> io::Result>>> { // Each column name may have multiple types of column associated. // For merging we are interested in the same column type category since they can be merged. diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 2ddca568e..851617b29 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -24,7 +24,7 @@ fn test_column_coercion_to_u64() { // u64 type let columnar2 = make_columnar("numbers", &[u64::MAX]); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(&[columnar1, columnar2]).unwrap(); + group_columns_for_merge(&[&columnar1, &columnar2]).unwrap(); assert_eq!(column_map.len(), 1); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); } @@ -34,7 +34,7 @@ fn test_column_no_coercion_if_all_the_same() { let columnar1 = make_columnar("numbers", &[1u64]); let columnar2 = make_columnar("numbers", &[2u64]); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(&[columnar1, columnar2]).unwrap(); + group_columns_for_merge(&[&columnar1, &columnar2]).unwrap(); assert_eq!(column_map.len(), 1); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); } @@ -44,7 +44,7 @@ fn test_column_coercion_to_i64() { let columnar1 = make_columnar("numbers", &[-1i64]); let columnar2 = make_columnar("numbers", &[2u64]); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(&[columnar1, columnar2]).unwrap(); + group_columns_for_merge(&[&columnar1, &columnar2]).unwrap(); assert_eq!(column_map.len(), 1); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64))); } @@ -54,7 +54,7 @@ fn test_missing_column() { let columnar1 = make_columnar("numbers", &[-1i64]); let columnar2 = make_columnar("numbers2", &[2u64]); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(&[columnar1, columnar2]).unwrap(); + group_columns_for_merge(&[&columnar1, &columnar2]).unwrap(); assert_eq!(column_map.len(), 2); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64))); { @@ -141,7 +141,7 @@ fn test_merge_columnar_numbers() { &[&[], &[NumericalValue::from(-3f64)]], )]); let mut buffer = Vec::new(); - let columnars = &[columnar1, columnar2]; + let columnars = &[&columnar1, &columnar2]; let stack_merge_order = StackMergeOrder::from_columnars(columnars); crate::columnar::merge_columnar( columnars, @@ -166,7 +166,7 @@ fn test_merge_columnar_texts() { let columnar1 = make_text_columnar_multiple_columns(&[("texts", &[&["a"]])]); let columnar2 = make_text_columnar_multiple_columns(&[("texts", &[&[], &["b"]])]); let mut buffer = Vec::new(); - let columnars = &[columnar1, columnar2]; + let columnars = &[&columnar1, &columnar2]; let stack_merge_order = StackMergeOrder::from_columnars(columnars); crate::columnar::merge_columnar( columnars, @@ -210,7 +210,7 @@ fn test_merge_columnar_byte() { let columnar1 = make_byte_columnar_multiple_columns(&[("bytes", &[&[b"bbbb"], &[b"baaa"]])]); let columnar2 = make_byte_columnar_multiple_columns(&[("bytes", &[&[], &[b"a"]])]); let mut buffer = Vec::new(); - let columnars = &[columnar1, columnar2]; + let columnars = &[&columnar1, &columnar2]; let stack_merge_order = StackMergeOrder::from_columnars(columnars); crate::columnar::merge_columnar( columnars, diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 65d422545..384067ce8 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -189,7 +189,7 @@ mod tests { } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 157); + assert_eq!(file.len(), 161); let fast_field_readers = FastFieldReaders::open(file).unwrap(); let column = fast_field_readers.u64("field").unwrap(); assert_eq!(column.get_val(0), 13u64); @@ -236,7 +236,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 185); + assert_eq!(file.len(), 189); let fast_field_readers = FastFieldReaders::open(file).unwrap(); let col = fast_field_readers.u64("field").unwrap(); assert_eq!(col.get_val(0), 4u64); @@ -266,7 +266,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 158); + assert_eq!(file.len(), 162); let fast_field_readers = FastFieldReaders::open(file).unwrap(); let fast_field_reader = fast_field_readers.u64("field").unwrap(); for doc in 0..10_000 { @@ -295,7 +295,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 80166); + assert_eq!(file.len(), 4557); { let fast_field_readers = FastFieldReaders::open(file).unwrap(); let col = fast_field_readers.u64("field").unwrap(); @@ -325,7 +325,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 329_usize); + assert_eq!(file.len(), 333_usize); { let fast_field_readers = FastFieldReaders::open(file).unwrap(); @@ -793,7 +793,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 171); + assert_eq!(file.len(), 175); let fast_field_readers = FastFieldReaders::open(file).unwrap(); let bool_col = fast_field_readers.bool("field_bool").unwrap(); assert_eq!(bool_col.get_val(0), true); @@ -825,7 +825,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 183); + assert_eq!(file.len(), 187); let readers = FastFieldReaders::open(file).unwrap(); let bool_col = readers.bool("field_bool").unwrap(); for i in 0..25 { @@ -850,7 +850,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 173); + assert_eq!(file.len(), 177); let fastfield_readers = FastFieldReaders::open(file).unwrap(); let col = fastfield_readers.bool("field_bool").unwrap(); assert_eq!(col.get_val(0), false); diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index ea70211c9..04f0f1fd2 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -26,6 +26,10 @@ impl FastFieldReaders { Ok(FastFieldReaders { columnar }) } + pub(crate) fn columnar(&self) -> &ColumnarReader { + self.columnar.as_ref() + } + pub(crate) fn space_usage(&self, schema: &Schema) -> io::Result { let mut per_field_usages: Vec = Default::default(); for (field, field_entry) in schema.fields() { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index f8478da40..b2725b464 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use columnar::ColumnValues; +use columnar::{ColumnValues, ColumnarReader, MergeRowOrder, StackMergeOrder}; use itertools::Itertools; use measure_time::debug_time; @@ -248,13 +248,17 @@ impl IndexMerger { mut term_ord_mappings: HashMap, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - debug_time!("wrie-fast-fields"); - for (_field, field_entry) in self.schema.fields() { - if field_entry.is_fast() { - todo!(); - } + debug_time!("write-fast-fields"); + let columnars: Vec<&ColumnarReader> = self + .readers + .iter() + .map(|reader| reader.fast_fields().columnar()) + .collect(); + if !doc_id_mapping.is_trivial() { + todo!() } - + let merge_row_order = MergeRowOrder::Stack(StackMergeOrder::from_columnars(&columnars[..])); + columnar::merge_columnar(&columnars[..], merge_row_order, fast_field_wrt)?; // for (field, field_entry) in self.schema.fields() { // let field_type = field_entry.field_type(); // match field_type {