Compare commits

..

2 Commits

Author SHA1 Message Date
Paul Masurel
e9af721e60 Fixing compile 2023-02-05 18:05:29 +01:00
Paul Masurel
88ed3d8b48 Switching back to iterable. 2023-02-02 14:02:10 +01:00
20 changed files with 147 additions and 300 deletions

View File

@@ -11,30 +11,25 @@ use crate::column_index::{serialize_column_index, SerializableColumnIndex};
use crate::column_values::serialize::serialize_column_values_u128;
use crate::column_values::u64_based::{serialize_u64_based_column_values, CodecType};
use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
use crate::iterable::{map_iterable, Iterable};
use crate::iterable::Iterable;
pub fn serialize_column_mappable_to_u128<I, T: MonotonicallyMappableToU128>(
pub fn serialize_column_mappable_to_u128<T: MonotonicallyMappableToU128>(
column_index: SerializableColumnIndex<'_>,
iterable: &dyn Fn() -> I,
iterable: &dyn Iterable<T>,
num_vals: u32,
output: &mut impl Write,
) -> io::Result<()>
where
I: Iterator<Item = T>,
{
) -> io::Result<()> {
let column_index_num_bytes = serialize_column_index(column_index, output)?;
let u128_iterable = map_iterable(iterable, MonotonicallyMappableToU128::to_u128);
serialize_column_values_u128(&u128_iterable, num_vals, output)?;
serialize_column_values_u128(iterable, num_vals, output)?;
output.write_all(&column_index_num_bytes.to_le_bytes())?;
Ok(())
}
pub fn serialize_column_mappable_to_u64<T: MonotonicallyMappableToU64 + Debug, I>(
pub fn serialize_column_mappable_to_u64<T: MonotonicallyMappableToU64 + Debug>(
column_index: SerializableColumnIndex<'_>,
column_values: &dyn Fn() -> I,
column_values: &impl Iterable<T>,
output: &mut impl Write,
) -> io::Result<()>
where I: Iterator<Item=T> {
) -> io::Result<()> {
let column_index_num_bytes = serialize_column_index(column_index, output)?;
serialize_u64_based_column_values(
column_values,

View File

@@ -25,22 +25,19 @@ pub fn stack_column_index<'a>(
let cardinality = detect_cardinality(columns);
match cardinality {
Cardinality::Full => SerializableColumnIndex::Full,
Cardinality::Optional => {
let stacked_optional_index: StackedOptionalIndex<'a> = StackedOptionalIndex {
Cardinality::Optional => SerializableColumnIndex::Optional {
non_null_row_ids: Box::new(StackedOptionalIndex {
columns,
stack_merge_order,
};
SerializableColumnIndex::Optional {
non_null_row_ids: Box::new(move || Box::new(stacked_optional_index.iter())),
num_rows: stack_merge_order.num_rows(),
}
}),
num_rows: stack_merge_order.num_rows(),
},
Cardinality::Multivalued => {
let stacked_multivalued_index = StackedMultivaluedIndex {
columns,
stack_merge_order,
};
SerializableColumnIndex::Multivalued(Box::new(move || stacked_multivalued_index.boxed_iter()))
SerializableColumnIndex::Multivalued(Box::new(stacked_multivalued_index))
}
}
}
@@ -50,8 +47,8 @@ struct StackedOptionalIndex<'a> {
stack_merge_order: &'a StackMergeOrder,
}
impl<'a> StackedOptionalIndex<'a> {
fn iter(&self) -> impl Iterator<Item=RowId> + 'a {
impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = RowId> + 'a> {
Box::new(
self.columns
.iter()
@@ -103,8 +100,8 @@ fn convert_column_opt_to_multivalued_index<'a>(
}
}
impl<'a> StackedMultivaluedIndex<'a> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = RowId> + 'a> {
impl<'a> Iterable<RowId> for StackedMultivaluedIndex<'a> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = RowId> + '_> {
let multivalued_indexes =
self.columns
.iter()

View File

@@ -7,10 +7,11 @@ use common::OwnedBytes;
use crate::column_values::u64_based::CodecType;
use crate::column_values::ColumnValues;
use crate::iterable::Iterable;
use crate::RowId;
pub fn serialize_multivalued_index<'a>(
multivalued_index: &'a dyn Fn() -> Box<dyn Iterator<Item=RowId> + 'a>,
pub fn serialize_multivalued_index(
multivalued_index: &dyn Iterable<RowId>,
output: &mut impl Write,
) -> io::Result<()> {
crate::column_values::u64_based::serialize_u64_based_column_values(

View File

@@ -343,13 +343,13 @@ fn serialize_optional_index_block(block_els: &[u16], out: &mut impl io::Write) -
}
pub fn serialize_optional_index<'a, W: io::Write>(
non_null_rows: &dyn Fn() -> Box<dyn Iterator<Item=RowId> + 'a>,
non_null_rows: &dyn Iterable<RowId>,
num_rows: RowId,
output: &mut W,
) -> io::Result<()> {
VInt(num_rows as u64).serialize(output)?;
let mut rows_it = non_null_rows();
let mut rows_it = non_null_rows.boxed_iter();
let mut block_metadata: Vec<SerializedBlockMeta> = Vec::new();
let mut current_block = Vec::new();

View File

@@ -3,7 +3,7 @@ use std::io::Write;
use common::{CountingWriter, OwnedBytes};
use crate::column_index::multivalued_index::{serialize_multivalued_index, self};
use crate::column_index::multivalued_index::serialize_multivalued_index;
use crate::column_index::optional_index::serialize_optional_index;
use crate::column_index::ColumnIndex;
use crate::iterable::Iterable;
@@ -12,12 +12,12 @@ use crate::{Cardinality, RowId};
pub enum SerializableColumnIndex<'a> {
Full,
Optional {
non_null_row_ids: Box<dyn Fn() -> Box<dyn Iterator<Item=RowId> + 'a> + 'a>,
non_null_row_ids: Box<dyn Iterable<RowId> + 'a>,
num_rows: RowId,
},
// TODO remove the Arc<dyn> apart from serialization this is not
// dynamic at all.
Multivalued(&'a dyn Fn() -> Box<dyn Iterator<Item=RowId> + 'a>),
Multivalued(Box<dyn Iterable<RowId> + 'a>),
}
impl<'a> SerializableColumnIndex<'a> {
@@ -30,8 +30,8 @@ impl<'a> SerializableColumnIndex<'a> {
}
}
pub fn serialize_column_index<'a>(
column_index: SerializableColumnIndex<'a>,
pub fn serialize_column_index(
column_index: SerializableColumnIndex,
output: &mut impl Write,
) -> io::Result<u32> {
let mut output = CountingWriter::wrap(output);
@@ -44,8 +44,7 @@ pub fn serialize_column_index<'a>(
num_rows,
} => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?,
SerializableColumnIndex::Multivalued(multivalued_index) => {
let multivalued_index_ref: &'a dyn Fn() -> Box<dyn Iterator<Item=RowId> + 'a> = multivalued_index.as_ref();
serialize_multivalued_index(multivalued_index_ref, &mut output)?
serialize_multivalued_index(&*multivalued_index, &mut output)?
}
}
let column_index_num_bytes = output.written_bytes() as u32;

View File

@@ -80,6 +80,12 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
}
}
impl<'a, T: PartialOrd> Iterable<T> for &'a [Arc<dyn ColumnValues<T>>] {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
Box::new(self.iter().flat_map(|column_value| column_value.iter()))
}
}
impl<T: Copy + PartialOrd + Debug> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
#[inline(always)]
fn get_val(&self, idx: u32) -> T {

View File

@@ -1,21 +1,12 @@
use std::fmt::Debug;
use std::io;
use std::num::NonZeroU64;
use common::{BinarySerializable, VInt};
use log::warn;
use super::monotonic_mapping::{
StrictlyMonotonicFn, StrictlyMonotonicMappingToInternal,
StrictlyMonotonicMappingToInternalGCDBaseval,
};
use super::{
monotonic_map_column, u64_based, ColumnValues, MonotonicallyMappableToU64,
U128FastFieldCodecType,
};
use crate::column_values::compact_space::CompactSpaceCompressor;
use crate::column_values::u64_based::CodecType;
use crate::column_values::U128FastFieldCodecType;
use crate::iterable::Iterable;
use crate::MonotonicallyMappableToU128;
/// The normalized header gives some parameters after applying the following
/// normalization of the vector:
@@ -53,19 +44,9 @@ impl BinarySerializable for U128Header {
}
}
fn normalize_column<C: ColumnValues>(
from_column: C,
min_value: u64,
gcd: Option<NonZeroU64>,
) -> impl ColumnValues {
let gcd = gcd.map(|gcd| gcd.get()).unwrap_or(1);
let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd, min_value);
monotonic_map_column(from_column, mapping)
}
/// Serializes u128 values with the compact space codec.
pub fn serialize_column_values_u128<I: Iterator<Item = u128>>(
iterable: &dyn Fn() -> I,
pub fn serialize_column_values_u128<T: MonotonicallyMappableToU128>(
iterable: &dyn Iterable<T>,
num_vals: u32,
output: &mut impl io::Write,
) -> io::Result<()> {
@@ -74,9 +55,18 @@ pub fn serialize_column_values_u128<I: Iterator<Item = u128>>(
codec_type: U128FastFieldCodecType::CompactSpace,
};
header.serialize(output)?;
let compressor = CompactSpaceCompressor::train_from(iterable(), num_vals);
compressor.compress_into(iterable(), output)?;
let compressor = CompactSpaceCompressor::train_from(
iterable
.boxed_iter()
.map(MonotonicallyMappableToU128::to_u128),
num_vals,
);
compressor.compress_into(
iterable
.boxed_iter()
.map(MonotonicallyMappableToU128::to_u128),
output,
)?;
Ok(())
}
@@ -87,6 +77,7 @@ pub mod tests {
self, serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
ALL_U64_CODEC_TYPES,
};
use crate::column_values::CodecType;
#[test]
fn test_serialize_deserialize_u128_header() {
@@ -113,8 +104,8 @@ pub mod tests {
#[test]
fn test_fastfield_bool_size_bitwidth_1() {
let mut buffer = Vec::new();
serialize_u64_based_column_values(
|| [false, true].into_iter(),
serialize_u64_based_column_values::<bool>(
&&[false, true][..],
&ALL_U64_CODEC_TYPES,
&mut buffer,
)
@@ -127,8 +118,8 @@ pub mod tests {
#[test]
fn test_fastfield_bool_bit_size_bitwidth_0() {
let mut buffer = Vec::new();
serialize_u64_based_column_values(
|| [false, true].into_iter(),
serialize_u64_based_column_values::<bool>(
&&[false, true][..],
&ALL_U64_CODEC_TYPES,
&mut buffer,
)
@@ -141,12 +132,8 @@ pub mod tests {
fn test_fastfield_gcd() {
let mut buffer = Vec::new();
let vals: Vec<u64> = (0..80).map(|val| (val % 7) * 1_000u64).collect();
serialize_u64_based_column_values(
|| vals.iter().cloned(),
&[CodecType::Bitpacked],
&mut buffer,
)
.unwrap();
serialize_u64_based_column_values(&&vals[..], &[CodecType::Bitpacked], &mut buffer)
.unwrap();
// Values are stored over 3 bits.
assert_eq!(buffer.len(), 6 + (3 * 80 / 8));
}

View File

@@ -125,7 +125,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
*buffer_val = gcd_divider.divide(*buffer_val - stats.min_value);
}
let mut line = Line::train(&VecColumn::from(&buffer));
let line = Line::train(&VecColumn::from(&buffer));
assert!(!buffer.is_empty());

View File

@@ -115,22 +115,18 @@ impl CodecType {
}
}
pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64, F, I>(
vals: F,
pub fn serialize_u64_based_column_values<'a, T: MonotonicallyMappableToU64>(
vals: &dyn Iterable<T>,
codec_types: &[CodecType],
wrt: &mut dyn Write,
) -> io::Result<()>
where
I: Iterator<Item = T>,
F: Fn() -> I,
{
) -> io::Result<()> {
let mut stats_collector = StatsCollector::default();
let mut estimators: Vec<(CodecType, Box<dyn ColumnCodecEstimator>)> =
Vec::with_capacity(codec_types.len());
for &codec_type in codec_types {
estimators.push((codec_type, codec_type.estimator()));
}
for val in vals() {
for val in vals.boxed_iter() {
let val_u64 = val.to_u64();
stats_collector.collect(val_u64);
for (_, estimator) in &mut estimators {
@@ -154,7 +150,7 @@ where
best_codec.to_code().serialize(wrt)?;
best_codec_estimator.serialize(
&stats,
&mut vals().map(MonotonicallyMappableToU64::to_u64),
&mut vals.boxed_iter().map(MonotonicallyMappableToU64::to_u64),
wrt,
)?;
Ok(())
@@ -178,7 +174,7 @@ pub fn serialize_and_load_u64_based_column_values<T: MonotonicallyMappableToU64>
codec_types: &[CodecType],
) -> Arc<dyn ColumnValues<T>> {
let mut buffer = Vec::new();
serialize_u64_based_column_values(|| vals.boxed_iter(), codec_types, &mut buffer).unwrap();
serialize_u64_based_column_values(vals, codec_types, &mut buffer).unwrap();
load_u64_based_column_values::<T>(OwnedBytes::new(buffer)).unwrap()
}

View File

@@ -7,7 +7,7 @@ fn test_serialize_and_load_simple() {
let mut buffer = Vec::new();
let vals = &[1u64, 2u64, 5u64];
serialize_u64_based_column_values(
|| vals.iter().cloned(),
&&vals[..],
&[CodecType::Bitpacked, CodecType::BlockwiseLinear],
&mut buffer,
)
@@ -67,9 +67,7 @@ pub(crate) fn create_and_validate<TColumnCodec: ColumnCodec>(
);
assert_eq!(expected_positions, positions);
}
dbg!(estimation);
dbg!(actual_compression);
if actual_compression > 20 {
if actual_compression > 1000 {
assert!(relative_difference(estimation, actual_compression) < 0.10f32);
}
Some((
@@ -101,12 +99,21 @@ proptest! {
create_and_validate::<LinearCodec>(&data, "proptest linearinterpol");
}
#[test]
fn test_proptest_small_blockwise_linear(data in proptest::collection::vec(num_strategy(), 1..10)) {
create_and_validate::<BlockwiseLinearCodec>(&data, "proptest multilinearinterpol");
}
}
#[test]
fn test_small_blockwise_linear_example() {
create_and_validate::<BlockwiseLinearCodec>(
&[9223372036854775808, 9223370937344622593],
"proptest multilinearinterpol",
);
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(10))]
@@ -245,7 +252,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) ->
let mut vals: Vec<i64> = (-4..=(num_vals as i64) - 5).map(|val| val * 1000).collect();
let mut buffer: Vec<u8> = Vec::new();
crate::column_values::serialize_u64_based_column_values(
|| vals.iter().cloned(),
&&vals[..],
&[codec_type],
&mut buffer,
)?;
@@ -262,7 +269,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) ->
vals.pop();
vals.push(1001i64);
crate::column_values::serialize_u64_based_column_values(
|| vals.iter().cloned(),
&&vals[..],
&[codec_type],
&mut buffer_without_gcd,
)?;
@@ -288,7 +295,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) ->
let mut vals: Vec<u64> = (1..=num_vals).map(|i| i as u64 * 1000u64).collect();
let mut buffer: Vec<u8> = Vec::new();
crate::column_values::serialize_u64_based_column_values(
|| vals.iter().cloned(),
&&vals[..],
&[codec_type],
&mut buffer,
)?;
@@ -305,7 +312,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) ->
vals.pop();
vals.push(1001u64);
crate::column_values::serialize_u64_based_column_values(
|| vals.iter().cloned(),
&&vals[..],
&[codec_type],
&mut buffer_without_gcd,
)?;

View File

@@ -1,12 +1,12 @@
use std::io::{self, Write};
use common::CountingWriter;
use itertools::Itertools;
use sstable::{SSTable, TermOrdinal};
use super::term_merger::TermMerger;
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
use crate::column_values::{serialize_u64_based_column_values, CodecType};
use crate::column::serialize_column_mappable_to_u64;
use crate::column_index::SerializableColumnIndex;
use crate::iterable::Iterable;
use crate::BytesColumn;
// Serialize [Dictionary, Column, dictionary num bytes U32::LE]
@@ -21,45 +21,38 @@ pub fn merge_bytes_or_str_column(
let term_ord_mapping = serialize_merged_dict(bytes_columns, &mut output)?;
let dictionary_num_bytes: u32 = output.written_bytes() as u32;
let output = output.finish();
serialize_bytes_or_str_column(column_index, bytes_columns, &term_ord_mapping, output)?;
let remapped_term_ordinals_values = RemappedTermOrdinalsValues {
bytes_columns,
term_ord_mapping: &term_ord_mapping,
};
serialize_column_mappable_to_u64(column_index, &remapped_term_ordinals_values, output)?;
// serialize_bytes_or_str_column(column_index, bytes_columns, &term_ord_mapping, output)?;
output.write_all(&dictionary_num_bytes.to_le_bytes())?;
Ok(())
}
fn serialize_bytes_or_str_column(
column_index: SerializableColumnIndex<'_>,
bytes_columns: &[BytesColumn],
term_ord_mapping: &TermOrdinalMapping,
output: &mut impl Write,
) -> io::Result<()> {
let column_index_num_bytes = serialize_column_index(column_index, output)?;
struct RemappedTermOrdinalsValues<'a> {
bytes_columns: &'a [BytesColumn],
term_ord_mapping: &'a TermOrdinalMapping,
}
let column_values = move || {
let iter = bytes_columns
impl<'a> Iterable for RemappedTermOrdinalsValues<'a> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
let iter = self
.bytes_columns
.iter()
.enumerate()
.flat_map(|(segment_ord, byte_column)| {
let segment_ord = term_ord_mapping.get_segment(segment_ord);
let segment_ord = self.term_ord_mapping.get_segment(segment_ord);
byte_column
.ords()
.values
.iter()
.map(move |term_ord| segment_ord[term_ord as usize])
});
iter
};
serialize_u64_based_column_values(
column_values,
&[CodecType::Bitpacked, CodecType::BlockwiseLinear],
output,
)?;
output.write_all(&column_index_num_bytes.to_le_bytes())?;
Ok(())
// TODO see if we can better decompose the mapping / and the stacking
Box::new(iter)
}
}
fn serialize_merged_dict(
@@ -89,7 +82,6 @@ fn serialize_merged_dict(
current_term_ord += 1;
}
sstable_builder.finish()?;
Ok(term_ord_mapping)
}

View File

@@ -9,7 +9,7 @@ pub struct StackMergeOrder {
}
impl StackMergeOrder {
pub fn from_columnars(columnars: &[&ColumnarReader]) -> StackMergeOrder {
pub fn stack(columnars: &[&ColumnarReader]) -> StackMergeOrder {
let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len());
let mut cumulated_row_id = 0;
for columnar in columnars {

View File

@@ -13,13 +13,14 @@ pub use merge_mapping::{MergeRowOrder, StackMergeOrder};
use super::writer::ColumnarSerializer;
use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
use crate::column_index::stack_column_index;
use crate::columnar::column_type::ColumnTypeCategory;
use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
use crate::columnar::writer::CompatibleNumericalTypes;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;
use crate::{
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues,
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, MonotonicallyMappableToU128,
NumericalType, NumericalValue,
};
@@ -78,10 +79,7 @@ pub fn merge_column(
}
let merged_column_index =
crate::column_index::stack_column_index(&column_indexes[..], merge_row_order);
let stacked_columns_iterable = || column_values
.iter()
.flat_map(|column| column.iter());
serialize_column_mappable_to_u64(merged_column_index, &stacked_columns_iterable, wrt)?;
serialize_column_mappable_to_u64(merged_column_index, &&column_values[..], wrt)?;
}
ColumnType::IpAddr => {
let mut column_indexes: Vec<Option<ColumnIndex>> = Vec::with_capacity(columns.len());
@@ -101,11 +99,7 @@ pub fn merge_column(
crate::column_index::stack_column_index(&column_indexes[..], merge_row_order);
serialize_column_mappable_to_u128(
merged_column_index,
&|| {
column_values
.iter()
.flat_map(|column_value| column_value.iter())
},
&&column_values[..],
num_values,
wrt,
)?;

View File

@@ -142,7 +142,7 @@ fn test_merge_columnar_numbers() {
)]);
let mut buffer = Vec::new();
let columnars = &[&columnar1, &columnar2];
let stack_merge_order = StackMergeOrder::from_columnars(columnars);
let stack_merge_order = StackMergeOrder::stack(columnars);
crate::columnar::merge_columnar(
columnars,
MergeRowOrder::Stack(stack_merge_order),
@@ -167,7 +167,7 @@ fn test_merge_columnar_texts() {
let columnar2 = make_text_columnar_multiple_columns(&[("texts", &[&[], &["b"]])]);
let mut buffer = Vec::new();
let columnars = &[&columnar1, &columnar2];
let stack_merge_order = StackMergeOrder::from_columnars(columnars);
let stack_merge_order = StackMergeOrder::stack(columnars);
crate::columnar::merge_columnar(
columnars,
MergeRowOrder::Stack(stack_merge_order),
@@ -211,7 +211,7 @@ fn test_merge_columnar_byte() {
let columnar2 = make_byte_columnar_multiple_columns(&[("bytes", &[&[], &[b"a"]])]);
let mut buffer = Vec::new();
let columnars = &[&columnar1, &columnar2];
let stack_merge_order = StackMergeOrder::from_columnars(columnars);
let stack_merge_order = StackMergeOrder::stack(columnars);
crate::columnar::merge_columnar(
columnars,
MergeRowOrder::Stack(stack_merge_order),

View File

@@ -20,7 +20,7 @@ use crate::columnar::column_type::{ColumnType, ColumnTypeCategory};
use crate::columnar::writer::column_writers::{
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
};
use crate::columnar::writer::value_index::{IndexBuilder, PreallocatedIndexBuilders, OptionalIndexBuilder};
use crate::columnar::writer::value_index::{IndexBuilder, PreallocatedIndexBuilders};
use crate::dictionary::{DictionaryBuilder, TermIdMapping, UnorderedId};
use crate::value::{Coerce, NumericalType, NumericalValue};
use crate::{Cardinality, RowId};
@@ -572,22 +572,22 @@ where
Cardinality::Optional => {
let optional_index_builder = value_index_builders.borrow_optional_index_builder();
consume_operation_iterator(op_iterator, optional_index_builder, values);
let non_null_rows: &[u32] = optional_index_builder.finish(num_rows);
let optional_index = optional_index_builder.finish(num_rows);
SerializableColumnIndex::Optional {
num_rows,
non_null_row_ids: Box::new(|| Box::new(non_null_rows.iter().copied())),
non_null_row_ids: Box::new(optional_index),
}
}
Cardinality::Multivalued => {
let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder();
consume_operation_iterator(op_iterator, multivalued_index_builder, values);
let multivalued_index = multivalued_index_builder.finish(num_rows);
SerializableColumnIndex::Multivalued(Box::new(|| Box::new(multivalued_index.iter().copied())))
SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
}
};
crate::column::serialize_column_mappable_to_u128(
serializable_column_index,
&|| values.iter().copied(),
&&values[..],
values.len() as u32,
&mut wrt,
)?;
@@ -603,17 +603,17 @@ fn sort_values_within_row_in_place(multivalued_index: &[RowId], values: &mut Vec
}
}
fn send_to_serialize_column_mappable_to_u64<'a>(
fn send_to_serialize_column_mappable_to_u64(
op_iterator: impl Iterator<Item = ColumnOperation<u64>>,
cardinality: Cardinality,
num_rows: RowId,
sort_values_within_row: bool,
value_index_builders: &'a mut PreallocatedIndexBuilders,
value_index_builders: &mut PreallocatedIndexBuilders,
values: &mut Vec<u64>,
mut wrt: impl io::Write,
) -> io::Result<()>
where
for<'b> VecColumn<'b, u64>: ColumnValues<u64>,
for<'a> VecColumn<'a, u64>: ColumnValues<u64>,
{
values.clear();
let serializable_column_index = match cardinality {
@@ -626,11 +626,11 @@ where
SerializableColumnIndex::Full
}
Cardinality::Optional => {
let optional_index_builder: &'a mut OptionalIndexBuilder = value_index_builders.borrow_optional_index_builder();
let optional_index_builder = value_index_builders.borrow_optional_index_builder();
consume_operation_iterator(op_iterator, optional_index_builder, values);
let optional_index: &'a [u32] = optional_index_builder.finish(num_rows);
let optional_index = optional_index_builder.finish(num_rows);
SerializableColumnIndex::Optional {
non_null_row_ids: Box::new(move || Box::new(optional_index.iter().copied())),
non_null_row_ids: Box::new(optional_index),
num_rows,
}
}
@@ -641,12 +641,12 @@ where
if sort_values_within_row {
sort_values_within_row_in_place(multivalued_index, values);
}
SerializableColumnIndex::Multivalued(Box::new(|| Box::new(multivalued_index.iter().copied())))
SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
}
};
crate::column::serialize_column_mappable_to_u64(
serializable_column_index,
&|| values.iter().copied(),
&&values[..],
&mut wrt,
)?;
Ok(())

View File

@@ -1,3 +1,4 @@
use crate::iterable::Iterable;
use crate::RowId;
/// The `IndexBuilder` interprets a sequence of
@@ -28,7 +29,7 @@ pub struct OptionalIndexBuilder {
}
impl OptionalIndexBuilder {
pub fn finish<'a>(&'a mut self, num_rows: RowId) -> &'a [RowId] {
pub fn finish<'a>(&'a mut self, num_rows: RowId) -> impl Iterable<RowId> + 'a {
debug_assert!(self
.docs
.last()
@@ -122,14 +123,20 @@ mod tests {
opt_value_index_builder.record_row(0u32);
opt_value_index_builder.record_value();
assert_eq!(
&opt_value_index_builder.finish(1u32),
&opt_value_index_builder
.finish(1u32)
.boxed_iter()
.collect::<Vec<u32>>(),
&[0]
);
opt_value_index_builder.reset();
opt_value_index_builder.record_row(1u32);
opt_value_index_builder.record_value();
assert_eq!(
&opt_value_index_builder.finish(2u32),
&opt_value_index_builder
.finish(2u32)
.boxed_iter()
.collect::<Vec<u32>>(),
&[1]
);
}

View File

@@ -1,51 +1,9 @@
use std::marker::PhantomData;
use std::ops::Range;
pub trait Iterable<T = u64> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
}
struct Mapped<U, Original, Transform> {
original_iterable: Original,
transform: Transform,
input_type: PhantomData<U>,
}
impl<U, V, Original, Transform> Iterable<V> for Mapped<U, Original, Transform>
where
Original: Iterable<U>,
Transform: Fn(U) -> V,
{
fn boxed_iter(&self) -> Box<dyn Iterator<Item = V> + '_> {
Box::new(self.original_iterable.boxed_iter().map(&self.transform))
}
}
impl<U> Iterable<U> for &dyn Iterable<U> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = U> + '_> {
(*self).boxed_iter()
}
}
impl<F, T> Iterable<T> for F
where F: Fn() -> Box<dyn Iterator<Item = T>>
{
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
self()
}
}
pub fn map_iterable<U, V, F, I>(
original_iterable: impl Fn() -> I,
transform: F,
) -> impl Fn() -> std::iter::Map<I, F>
where
F: Fn(U) -> V + Clone,
I: Iterator<Item = U>,
{
move || original_iterable().map(transform.clone())
}
impl<'a, T: Copy> Iterable<T> for &'a [T] {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
Box::new(self.iter().copied())

View File

@@ -26,7 +26,6 @@ pub use columnar::{
merge_columnar, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
MergeRowOrder, StackMergeOrder,
};
pub(crate) use iterable::{map_iterable, Iterable};
use sstable::VoidSSTable;
pub use value::{NumericalType, NumericalValue};

View File

@@ -37,6 +37,11 @@ impl SegmentDocIdMapping {
/// This flags means the segments are simply stacked in the order of their ordinal.
/// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)]
///
/// The different segment may present some deletes, in which case it is expressed by skipping a
/// `DocId`. [(0, 1), (0, 3)] <--- here doc_id=0 and doc_id=1 have been deleted
///
/// Being trivial is equivalent to having the `new_doc_id_to_old_doc_addr` array sorted.
///
/// This allows for some optimization.
pub(crate) fn is_trivial(&self) -> bool {
self.is_trivial

View File

@@ -86,29 +86,6 @@ pub struct IndexMerger {
max_doc: u32,
}
struct TermOrdinalMapping {
per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
}
impl TermOrdinalMapping {
fn new(max_term_ords: Vec<TermOrdinal>) -> TermOrdinalMapping {
TermOrdinalMapping {
per_segment_new_term_ordinals: max_term_ords
.into_iter()
.map(|max_term_ord| vec![TermOrdinal::default(); max_term_ord as usize])
.collect(),
}
}
fn register_from_to(&mut self, segment_ord: usize, from_ord: TermOrdinal, to_ord: TermOrdinal) {
self.per_segment_new_term_ordinals[segment_ord][from_ord as usize] = to_ord;
}
fn get_segment(&self, segment_ord: usize) -> &[TermOrdinal] {
&(self.per_segment_new_term_ordinals[segment_ord])[..]
}
}
struct DeltaComputer {
buffer: Vec<u32>,
}
@@ -245,7 +222,6 @@ impl IndexMerger {
fn write_fast_fields(
&self,
fast_field_wrt: &mut WritePtr,
mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write-fast-fields");
@@ -257,59 +233,8 @@ impl IndexMerger {
if !doc_id_mapping.is_trivial() {
todo!()
}
let merge_row_order = MergeRowOrder::Stack(StackMergeOrder::from_columnars(&columnars[..]));
let merge_row_order = MergeRowOrder::Stack(StackMergeOrder::stack(&columnars[..]));
columnar::merge_columnar(&columnars[..], merge_row_order, fast_field_wrt)?;
// for (field, field_entry) in self.schema.fields() {
// let field_type = field_entry.field_type();
// match field_type {
// FieldType::Facet(_) | FieldType::Str(_) if field_type.is_fast() => {
// let term_ordinal_mapping = term_ord_mappings.remove(&field).expect(
// "Logic Error in Tantivy (Please report). Facet field should have required \
// a`term_ordinal_mapping`.",
// );
// self.write_term_id_fast_field(
// field,
// &term_ordinal_mapping,
// fast_field_serializer,
// doc_id_mapping,
// )?;
// }
// FieldType::U64(ref options)
// | FieldType::I64(ref options)
// | FieldType::F64(ref options)
// | FieldType::Bool(ref options) => {
// todo!()
// }
// FieldType::Date(ref options) => {
// if options.is_fast() {
// todo!();
// }
// Some(Cardinality::SingleValue) => {
// self.write_single_fast_field(field, fast_field_serializer, doc_id_mapping)?;
// }
// Some(Cardinality::MultiValues) => {
// self.write_multi_fast_field(field, fast_field_serializer, doc_id_mapping)?;
// }
// None => {}
// },
// FieldType::Bytes(byte_options) => {
// if byte_options.is_fast() {
// self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?;
// }
// }
// FieldType::IpAddr(options) => {
// if options.is_fast() {
// todo!();
// }
// },
//
// FieldType::JsonObject(_) | FieldType::Facet(_) | FieldType::Str(_) => {
// We don't handle json fast field for the moment
// They can be implemented using what is done
// for facets in the future
// }
// }
// }
Ok(())
}
@@ -374,7 +299,7 @@ impl IndexMerger {
/// doc_id.
/// ReaderWithOrdinal will include the ordinal position of the
/// reader in self.readers.
pub(crate) fn generate_doc_id_mapping(
pub(crate) fn generate_doc_id_mapping_with_sort_by_field(
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<SegmentDocIdMapping> {
@@ -454,7 +379,7 @@ impl IndexMerger {
serializer: &mut InvertedIndexSerializer,
fieldnorm_reader: Option<FieldNormReader>,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<Option<TermOrdinalMapping>> {
) -> crate::Result<()> {
debug_time!("write-postings-for-field");
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
@@ -474,14 +399,6 @@ impl IndexMerger {
max_term_ords.push(terms.num_terms() as u64);
}
let mut term_ord_mapping_opt = match field_type {
FieldType::Facet(_) => Some(TermOrdinalMapping::new(max_term_ords)),
FieldType::Str(options) if options.is_fast() => {
Some(TermOrdinalMapping::new(max_term_ords))
}
_ => None,
};
let mut merged_terms = TermMerger::new(field_term_streams);
// map from segment doc ids to the resulting merged segment doc id.
@@ -566,12 +483,6 @@ impl IndexMerger {
let to_term_ord = field_serializer.new_term(term_bytes, total_doc_freq)?;
if let Some(ref mut term_ord_mapping) = term_ord_mapping_opt {
for (segment_ord, from_term_ord) in merged_terms.matching_segments() {
term_ord_mapping.register_from_to(segment_ord, from_term_ord, to_term_ord);
}
}
// We can now serialize this postings, by pushing each document to the
// postings serializer.
for (segment_ord, mut segment_postings) in
@@ -622,7 +533,7 @@ impl IndexMerger {
field_serializer.close_term()?;
}
field_serializer.close()?;
Ok(term_ord_mapping_opt)
Ok(())
}
fn write_postings(
@@ -630,23 +541,20 @@ impl IndexMerger {
serializer: &mut InvertedIndexSerializer,
fieldnorm_readers: FieldNormReaders,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<HashMap<Field, TermOrdinalMapping>> {
let mut term_ordinal_mappings = HashMap::new();
) -> crate::Result<()> {
for (field, field_entry) in self.schema.fields() {
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
if field_entry.is_indexed() {
if let Some(term_ordinal_mapping) = self.write_postings_for_field(
self.write_postings_for_field(
field,
field_entry.field_type(),
serializer,
fieldnorm_reader,
doc_id_mapping,
)? {
term_ordinal_mappings.insert(field, term_ordinal_mapping);
}
);
}
}
Ok(term_ordinal_mappings)
Ok(())
}
fn write_storable_fields(
@@ -731,7 +639,7 @@ impl IndexMerger {
if self.is_disjunct_and_sorted_on_sort_property(sort_by_field)? {
self.get_doc_id_from_concatenated_data()?
} else {
self.generate_doc_id_mapping(sort_by_field)?
self.generate_doc_id_mapping_with_sort_by_field(sort_by_field)?
}
} else {
self.get_doc_id_from_concatenated_data()?
@@ -745,17 +653,13 @@ impl IndexMerger {
.segment()
.open_read(SegmentComponent::FieldNorms)?;
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
let term_ord_mappings = self.write_postings(
self.write_postings(
serializer.get_postings_serializer(),
fieldnorm_readers,
&doc_id_mapping,
)?;
debug!("write-fastfields");
self.write_fast_fields(
serializer.get_fast_field_write(),
term_ord_mappings,
&doc_id_mapping,
)?;
self.write_fast_fields(serializer.get_fast_field_write(), &doc_id_mapping)?;
debug!("write-storagefields");
self.write_storable_fields(serializer.get_store_writer(), &doc_id_mapping)?;
debug!("close-serializer");