mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-11 11:32:54 +00:00)

Compare commits: columnar-m ... columnar-m (2 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | e9af721e60 |  |
|  | 88ed3d8b48 |  |
```diff
@@ -11,20 +11,16 @@ use crate::column_index::{serialize_column_index, SerializableColumnIndex};
 use crate::column_values::serialize::serialize_column_values_u128;
 use crate::column_values::u64_based::{serialize_u64_based_column_values, CodecType};
 use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
-use crate::iterable::{map_iterable, Iterable};
+use crate::iterable::Iterable;
 
-pub fn serialize_column_mappable_to_u128<I, T: MonotonicallyMappableToU128>(
+pub fn serialize_column_mappable_to_u128<T: MonotonicallyMappableToU128>(
     column_index: SerializableColumnIndex<'_>,
-    iterable: &dyn Fn() -> I,
+    iterable: &dyn Iterable<T>,
     num_vals: u32,
     output: &mut impl Write,
-) -> io::Result<()>
-where
-    I: Iterator<Item = T>,
-{
+) -> io::Result<()> {
     let column_index_num_bytes = serialize_column_index(column_index, output)?;
-    let u128_iterable = map_iterable(iterable, MonotonicallyMappableToU128::to_u128);
-    serialize_column_values_u128(&u128_iterable, num_vals, output)?;
+    serialize_column_values_u128(iterable, num_vals, output)?;
     output.write_all(&column_index_num_bytes.to_le_bytes())?;
     Ok(())
 }
```
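This signature change is the shape of the whole branch: serializers stop taking an iterator factory (`&dyn Fn() -> I`), which forces a generic `I` onto every caller, and take a `&dyn Iterable<T>` trait object that can hand out a fresh boxed iterator on demand. A minimal, self-contained sketch of the two shapes (the `consume_*` functions are made up for illustration; the trait matches the one in this branch):

```rust
// Toy contrast between the two calling conventions.
pub trait Iterable<T = u64> {
    // A fresh iterator on every call, so a callee may iterate several times.
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
}

impl<'a, T: Copy> Iterable<T> for &'a [T] {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
        Box::new(self.iter().copied())
    }
}

// Old shape: the concrete iterator type `I` leaks into every caller's signature.
fn consume_closure<I: Iterator<Item = u64>>(vals: &dyn Fn() -> I) -> u64 {
    vals().sum::<u64>() + vals().count() as u64 // two passes
}

// New shape: no extra type parameter; the value source is type-erased.
fn consume_iterable(vals: &dyn Iterable<u64>) -> u64 {
    vals.boxed_iter().sum::<u64>() + vals.boxed_iter().count() as u64
}

fn main() {
    let vals = vec![1u64, 2, 3];
    assert_eq!(consume_closure(&|| vals.iter().copied()), 9);
    assert_eq!(consume_iterable(&&vals[..]), 9);
}
```

Both conventions allow multiple passes over the values; the trait-object version just keeps `serialize_column_mappable_to_u128` (and everything it calls) free of the `I` parameter.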
```diff
@@ -36,7 +32,7 @@ pub fn serialize_column_mappable_to_u64<T: MonotonicallyMappableToU64 + Debug>(
 ) -> io::Result<()> {
     let column_index_num_bytes = serialize_column_index(column_index, output)?;
     serialize_u64_based_column_values(
-        || column_values.boxed_iter(),
+        column_values,
         &[CodecType::Bitpacked, CodecType::BlockwiseLinear],
         output,
     )?;

@@ -15,7 +15,7 @@ pub fn serialize_multivalued_index(
     output: &mut impl Write,
 ) -> io::Result<()> {
     crate::column_values::u64_based::serialize_u64_based_column_values(
-        || multivalued_index.boxed_iter(),
+        multivalued_index,
         &[CodecType::Bitpacked, CodecType::Linear],
         output,
     )?;

@@ -80,7 +80,7 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
     }
 }
 
-impl<'a, T: Ord> Iterable<T> for &'a [Arc<dyn ColumnValues<T>>] {
+impl<'a, T: PartialOrd> Iterable<T> for &'a [Arc<dyn ColumnValues<T>>] {
     fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
         Box::new(self.iter().flat_map(|column_value| column_value.iter()))
     }
```
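Note the relaxed bound on that last blanket impl: `ColumnValues` itself is declared over `T: PartialOrd`, so requiring `T: Ord` on the `Iterable` impl was stricter than the trait it iterates over. Presumably this matters for float columns, since `f64` implements `PartialOrd` but not `Ord`. A two-line illustration of the distinction (not crate code):

```rust
fn needs_partial_ord<T: PartialOrd>(_: &[T]) {}
fn needs_ord<T: Ord>(_: &[T]) {}

fn main() {
    needs_partial_ord(&[1.5f64, 2.5]); // fine: f64 is PartialOrd
    // needs_ord(&[1.5f64, 2.5]);      // would not compile: NaN breaks total ordering
}
```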
```diff
@@ -1,21 +1,12 @@
-use std::fmt::Debug;
 use std::io;
-use std::num::NonZeroU64;
 
 use common::{BinarySerializable, VInt};
-use log::warn;
-
-use super::monotonic_mapping::{
-    StrictlyMonotonicFn, StrictlyMonotonicMappingToInternal,
-    StrictlyMonotonicMappingToInternalGCDBaseval,
-};
-use super::{
-    monotonic_map_column, u64_based, ColumnValues, MonotonicallyMappableToU64,
-    U128FastFieldCodecType,
-};
 use crate::column_values::compact_space::CompactSpaceCompressor;
+use crate::column_values::u64_based::CodecType;
+use crate::column_values::U128FastFieldCodecType;
+use crate::iterable::Iterable;
 use crate::MonotonicallyMappableToU128;
 
 /// The normalized header gives some parameters after applying the following
 /// normalization of the vector:

@@ -53,19 +44,9 @@ impl BinarySerializable for U128Header {
     }
 }
 
-fn normalize_column<C: ColumnValues>(
-    from_column: C,
-    min_value: u64,
-    gcd: Option<NonZeroU64>,
-) -> impl ColumnValues {
-    let gcd = gcd.map(|gcd| gcd.get()).unwrap_or(1);
-    let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd, min_value);
-    monotonic_map_column(from_column, mapping)
-}
-
 /// Serializes u128 values with the compact space codec.
-pub fn serialize_column_values_u128<I: Iterator<Item = u128>>(
-    iterable: &dyn Fn() -> I,
+pub fn serialize_column_values_u128<T: MonotonicallyMappableToU128>(
+    iterable: &dyn Iterable<T>,
     num_vals: u32,
     output: &mut impl io::Write,
 ) -> io::Result<()> {

@@ -74,9 +55,18 @@ pub fn serialize_column_values_u128<I: Iterator<Item = u128>>(
         codec_type: U128FastFieldCodecType::CompactSpace,
     };
     header.serialize(output)?;
-    let compressor = CompactSpaceCompressor::train_from(iterable(), num_vals);
-    compressor.compress_into(iterable(), output)?;
+
+    let compressor = CompactSpaceCompressor::train_from(
+        iterable
+            .boxed_iter()
+            .map(MonotonicallyMappableToU128::to_u128),
+        num_vals,
+    );
+    compressor.compress_into(
+        iterable
+            .boxed_iter()
+            .map(MonotonicallyMappableToU128::to_u128),
+        output,
+    )?;
     Ok(())
 }
```
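The rewritten body calls `iterable.boxed_iter()` twice: once to train the compact-space compressor and once to actually compress. That two-pass pattern is exactly why the serializer needs something it can re-iterate (an `Iterable`, or previously a closure) rather than a one-shot `Iterator`. A toy two-pass encoder, not `CompactSpaceCompressor`'s real algorithm, to show the shape:

```rust
// Toy two-pass codec: pass 1 "trains" a parameter, pass 2 encodes with it.
// The real compressor is far more elaborate but has the same need for two
// full iterations over the values.
fn two_pass_encode(vals: &[u128], out: &mut Vec<u128>) {
    // Pass 1: train -- here, just find the minimum value.
    let min = vals.iter().copied().min().unwrap_or(0);
    // Pass 2: encode each value relative to the trained parameter.
    out.extend(vals.iter().map(|&v| v - min));
}

fn main() {
    let mut out = Vec::new();
    two_pass_encode(&[100, 103, 107], &mut out);
    assert_eq!(out, vec![0, 3, 7]);
}
```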
```diff
@@ -87,6 +77,7 @@ pub mod tests {
         self, serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
         ALL_U64_CODEC_TYPES,
     };
+    use crate::column_values::CodecType;
 
     #[test]
     fn test_serialize_deserialize_u128_header() {

@@ -113,8 +104,8 @@ pub mod tests {
     #[test]
     fn test_fastfield_bool_size_bitwidth_1() {
         let mut buffer = Vec::new();
-        serialize_u64_based_column_values(
-            || [false, true].into_iter(),
+        serialize_u64_based_column_values::<bool>(
+            &&[false, true][..],
             &ALL_U64_CODEC_TYPES,
             &mut buffer,
         )

@@ -127,8 +118,8 @@ pub mod tests {
     #[test]
     fn test_fastfield_bool_bit_size_bitwidth_0() {
         let mut buffer = Vec::new();
-        serialize_u64_based_column_values(
-            || [false, true].into_iter(),
+        serialize_u64_based_column_values::<bool>(
+            &&[false, true][..],
             &ALL_U64_CODEC_TYPES,
             &mut buffer,
         )

@@ -141,12 +132,8 @@ pub mod tests {
     fn test_fastfield_gcd() {
         let mut buffer = Vec::new();
         let vals: Vec<u64> = (0..80).map(|val| (val % 7) * 1_000u64).collect();
-        serialize_u64_based_column_values(
-            || vals.iter().cloned(),
-            &[CodecType::Bitpacked],
-            &mut buffer,
-        )
-        .unwrap();
+        serialize_u64_based_column_values(&&vals[..], &[CodecType::Bitpacked], &mut buffer)
+            .unwrap();
         // Values are stored over 3 bits.
         assert_eq!(buffer.len(), 6 + (3 * 80 / 8));
     }
```
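The tests now pass `&&vals[..]` instead of a closure, and the double reference is not a typo: the branch implements `Iterable<T>` for the slice reference type `&[T]`, so the trait object has to borrow that reference, and `&&[T]` unsize-coerces to `&dyn Iterable<T>`. A sketch of the coercion:

```rust
pub trait Iterable<T = u64> {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
}

// The impl lives on `&[T]` (the slice reference), not on `[T]` itself...
impl<'a, T: Copy> Iterable<T> for &'a [T] {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
        Box::new(self.iter().copied())
    }
}

fn main() {
    let vals = vec![1u64, 2, 3];
    let slice: &[u64] = &vals[..];
    // ...so the trait object is a reference *to* that slice reference.
    let obj: &dyn Iterable<u64> = &slice; // same as writing `&&vals[..]`
    assert_eq!(obj.boxed_iter().sum::<u64>(), 6);
}
```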
```diff
@@ -125,7 +125,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
             *buffer_val = gcd_divider.divide(*buffer_val - stats.min_value);
         }
 
-        let mut line = Line::train(&VecColumn::from(&buffer));
+        let line = Line::train(&VecColumn::from(&buffer));
 
         assert!(!buffer.is_empty());
 

@@ -115,22 +115,18 @@ impl CodecType {
     }
 }
 
-pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64, F, I>(
-    vals: F,
+pub fn serialize_u64_based_column_values<'a, T: MonotonicallyMappableToU64>(
+    vals: &dyn Iterable<T>,
     codec_types: &[CodecType],
     wrt: &mut dyn Write,
-) -> io::Result<()>
-where
-    I: Iterator<Item = T>,
-    F: Fn() -> I,
-{
+) -> io::Result<()> {
     let mut stats_collector = StatsCollector::default();
     let mut estimators: Vec<(CodecType, Box<dyn ColumnCodecEstimator>)> =
         Vec::with_capacity(codec_types.len());
     for &codec_type in codec_types {
         estimators.push((codec_type, codec_type.estimator()));
     }
-    for val in vals() {
+    for val in vals.boxed_iter() {
         let val_u64 = val.to_u64();
         stats_collector.collect(val_u64);
         for (_, estimator) in &mut estimators {

@@ -154,7 +150,7 @@ where
     best_codec.to_code().serialize(wrt)?;
     best_codec_estimator.serialize(
         &stats,
-        &mut vals().map(MonotonicallyMappableToU64::to_u64),
+        &mut vals.boxed_iter().map(MonotonicallyMappableToU64::to_u64),
         wrt,
     )?;
     Ok(())

@@ -178,7 +174,7 @@ pub fn serialize_and_load_u64_based_column_values<T: MonotonicallyMappableToU64>
     codec_types: &[CodecType],
 ) -> Arc<dyn ColumnValues<T>> {
     let mut buffer = Vec::new();
-    serialize_u64_based_column_values(|| vals.boxed_iter(), codec_types, &mut buffer).unwrap();
+    serialize_u64_based_column_values(vals, codec_types, &mut buffer).unwrap();
     load_u64_based_column_values::<T>(OwnedBytes::new(buffer)).unwrap()
 }

@@ -7,7 +7,7 @@ fn test_serialize_and_load_simple() {
     let mut buffer = Vec::new();
     let vals = &[1u64, 2u64, 5u64];
     serialize_u64_based_column_values(
-        || vals.iter().cloned(),
+        &&vals[..],
        &[CodecType::Bitpacked, CodecType::BlockwiseLinear],
        &mut buffer,
    )

@@ -67,9 +67,7 @@ pub(crate) fn create_and_validate<TColumnCodec: ColumnCodec>(
         );
         assert_eq!(expected_positions, positions);
     }
-    dbg!(estimation);
-    dbg!(actual_compression);
-    if actual_compression > 20 {
+    if actual_compression > 1000 {
         assert!(relative_difference(estimation, actual_compression) < 0.10f32);
     }
     Some((

@@ -101,12 +99,21 @@ proptest! {
         create_and_validate::<LinearCodec>(&data, "proptest linearinterpol");
     }
 
     #[test]
     fn test_proptest_small_blockwise_linear(data in proptest::collection::vec(num_strategy(), 1..10)) {
         create_and_validate::<BlockwiseLinearCodec>(&data, "proptest multilinearinterpol");
     }
 }
 
+#[test]
+fn test_small_blockwise_linear_example() {
+    create_and_validate::<BlockwiseLinearCodec>(
+        &[9223372036854775808, 9223370937344622593],
+        "proptest multilinearinterpol",
+    );
+}
+
 proptest! {
     #![proptest_config(ProptestConfig::with_cases(10))]

@@ -245,7 +252,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) ->
     let mut vals: Vec<i64> = (-4..=(num_vals as i64) - 5).map(|val| val * 1000).collect();
     let mut buffer: Vec<u8> = Vec::new();
     crate::column_values::serialize_u64_based_column_values(
-        || vals.iter().cloned(),
+        &&vals[..],
         &[codec_type],
         &mut buffer,
     )?;

@@ -262,7 +269,7 @@ fn test_fastfield_gcd_i64_with_codec(codec_type: CodecType, num_vals: usize) ->
     vals.pop();
     vals.push(1001i64);
     crate::column_values::serialize_u64_based_column_values(
-        || vals.iter().cloned(),
+        &&vals[..],
         &[codec_type],
         &mut buffer_without_gcd,
     )?;

@@ -288,7 +295,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) ->
     let mut vals: Vec<u64> = (1..=num_vals).map(|i| i as u64 * 1000u64).collect();
     let mut buffer: Vec<u8> = Vec::new();
     crate::column_values::serialize_u64_based_column_values(
-        || vals.iter().cloned(),
+        &&vals[..],
         &[codec_type],
         &mut buffer,
     )?;

@@ -305,7 +312,7 @@ fn test_fastfield_gcd_u64_with_codec(codec_type: CodecType, num_vals: usize) ->
     vals.pop();
     vals.push(1001u64);
     crate::column_values::serialize_u64_based_column_values(
-        || vals.iter().cloned(),
+        &&vals[..],
         &[codec_type],
         &mut buffer_without_gcd,
     )?;

@@ -1,12 +1,12 @@
 use std::io::{self, Write};
 
 use common::CountingWriter;
-use itertools::Itertools;
 use sstable::{SSTable, TermOrdinal};
 
 use super::term_merger::TermMerger;
-use crate::column_index::{serialize_column_index, SerializableColumnIndex};
-use crate::column_values::{serialize_u64_based_column_values, CodecType};
+use crate::column::serialize_column_mappable_to_u64;
+use crate::column_index::SerializableColumnIndex;
+use crate::iterable::Iterable;
 use crate::BytesColumn;
 
 // Serialize [Dictionary, Column, dictionary num bytes U32::LE]
```
```diff
@@ -21,45 +21,38 @@ pub fn merge_bytes_or_str_column(
     let term_ord_mapping = serialize_merged_dict(bytes_columns, &mut output)?;
     let dictionary_num_bytes: u32 = output.written_bytes() as u32;
     let output = output.finish();
 
-    serialize_bytes_or_str_column(column_index, bytes_columns, &term_ord_mapping, output)?;
-
+    let remapped_term_ordinals_values = RemappedTermOrdinalsValues {
+        bytes_columns,
+        term_ord_mapping: &term_ord_mapping,
+    };
+    serialize_column_mappable_to_u64(column_index, &remapped_term_ordinals_values, output)?;
+    // serialize_bytes_or_str_column(column_index, bytes_columns, &term_ord_mapping, output)?;
     output.write_all(&dictionary_num_bytes.to_le_bytes())?;
     Ok(())
 }
 
-fn serialize_bytes_or_str_column(
-    column_index: SerializableColumnIndex<'_>,
-    bytes_columns: &[BytesColumn],
-    term_ord_mapping: &TermOrdinalMapping,
-    output: &mut impl Write,
-) -> io::Result<()> {
-    let column_index_num_bytes = serialize_column_index(column_index, output)?;
+struct RemappedTermOrdinalsValues<'a> {
+    bytes_columns: &'a [BytesColumn],
+    term_ord_mapping: &'a TermOrdinalMapping,
+}
 
-    let column_values = move || {
-        let iter = bytes_columns
+impl<'a> Iterable for RemappedTermOrdinalsValues<'a> {
+    fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
+        let iter = self
+            .bytes_columns
             .iter()
             .enumerate()
             .flat_map(|(segment_ord, byte_column)| {
-                let segment_ord = term_ord_mapping.get_segment(segment_ord);
+                let segment_ord = self.term_ord_mapping.get_segment(segment_ord);
                 byte_column
                     .ords()
                     .values
                     .iter()
                     .map(move |term_ord| segment_ord[term_ord as usize])
             });
-        iter
-    };
-
-    serialize_u64_based_column_values(
-        column_values,
-        &[CodecType::Bitpacked, CodecType::BlockwiseLinear],
-        output,
-    )?;
-
-    output.write_all(&column_index_num_bytes.to_le_bytes())?;
-
-    Ok(())
+        // TODO see if we can better decompose the mapping / and the stacking
+        Box::new(iter)
+    }
 }
 
 fn serialize_merged_dict(
```
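Where the old code built a capturing closure (`let column_values = move || { ... }`), the new code gives the value stream a name, `RemappedTermOrdinalsValues`, and implements `Iterable` for it. A stripped-down version of the same pattern, with made-up types standing in for `BytesColumn` and `TermOrdinalMapping`:

```rust
pub trait Iterable<T = u64> {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
}

// Stand-in for TermOrdinalMapping: old ordinal -> merged ordinal.
struct Mapping {
    new_ordinals: Vec<u64>,
}

// Stand-in for RemappedTermOrdinalsValues: borrows raw values plus the mapping.
struct RemappedValues<'a> {
    raw_values: &'a [u64],
    mapping: &'a Mapping,
}

impl<'a> Iterable<u64> for RemappedValues<'a> {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
        // Every call is a fresh lazy pass that remaps ordinals on the fly.
        Box::new(
            self.raw_values
                .iter()
                .map(move |&ord| self.mapping.new_ordinals[ord as usize]),
        )
    }
}

fn main() {
    let mapping = Mapping { new_ordinals: vec![2, 0, 1] };
    let remapped = RemappedValues { raw_values: &[0, 1, 2, 1], mapping: &mapping };
    assert_eq!(remapped.boxed_iter().collect::<Vec<_>>(), vec![2, 0, 1, 0]);
}
```

Unlike the closure, the named struct can be handed to `serialize_column_mappable_to_u64` as a plain `&dyn Iterable` and re-iterated for each encoding pass.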
```diff
@@ -89,7 +82,6 @@ fn serialize_merged_dict(
         current_term_ord += 1;
     }
     sstable_builder.finish()?;
-
     Ok(term_ord_mapping)
 }

@@ -9,7 +9,7 @@ pub struct StackMergeOrder {
 }
 
 impl StackMergeOrder {
-    pub fn from_columnars(columnars: &[&ColumnarReader]) -> StackMergeOrder {
+    pub fn stack(columnars: &[&ColumnarReader]) -> StackMergeOrder {
         let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len());
         let mut cumulated_row_id = 0;
         for columnar in columnars {

@@ -13,6 +13,7 @@ pub use merge_mapping::{MergeRowOrder, StackMergeOrder};
 
 use super::writer::ColumnarSerializer;
+use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
 use crate::column_index::stack_column_index;
 use crate::columnar::column_type::ColumnTypeCategory;
 use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
 use crate::columnar::writer::CompatibleNumericalTypes;

@@ -98,11 +99,7 @@ pub fn merge_column(
         crate::column_index::stack_column_index(&column_indexes[..], merge_row_order);
     serialize_column_mappable_to_u128(
         merged_column_index,
-        &|| {
-            column_values
-                .iter()
-                .flat_map(|column_value| column_value.iter())
-        },
+        &&column_values[..],
         num_values,
         wrt,
     )?;

@@ -142,7 +142,7 @@ fn test_merge_columnar_numbers() {
     )]);
     let mut buffer = Vec::new();
     let columnars = &[&columnar1, &columnar2];
-    let stack_merge_order = StackMergeOrder::from_columnars(columnars);
+    let stack_merge_order = StackMergeOrder::stack(columnars);
     crate::columnar::merge_columnar(
         columnars,
         MergeRowOrder::Stack(stack_merge_order),

@@ -167,7 +167,7 @@ fn test_merge_columnar_texts() {
     let columnar2 = make_text_columnar_multiple_columns(&[("texts", &[&[], &["b"]])]);
     let mut buffer = Vec::new();
     let columnars = &[&columnar1, &columnar2];
-    let stack_merge_order = StackMergeOrder::from_columnars(columnars);
+    let stack_merge_order = StackMergeOrder::stack(columnars);
     crate::columnar::merge_columnar(
         columnars,
         MergeRowOrder::Stack(stack_merge_order),

@@ -211,7 +211,7 @@ fn test_merge_columnar_byte() {
     let columnar2 = make_byte_columnar_multiple_columns(&[("bytes", &[&[], &[b"a"]])]);
     let mut buffer = Vec::new();
     let columnars = &[&columnar1, &columnar2];
-    let stack_merge_order = StackMergeOrder::from_columnars(columnars);
+    let stack_merge_order = StackMergeOrder::stack(columnars);
     crate::columnar::merge_columnar(
         columnars,
         MergeRowOrder::Stack(stack_merge_order),

@@ -587,7 +587,7 @@ where
     };
     crate::column::serialize_column_mappable_to_u128(
         serializable_column_index,
-        &|| values.iter().copied(),
+        &&values[..],
         values.len() as u32,
         &mut wrt,
     )?;
```
```diff
@@ -1,61 +1,9 @@
-use std::iter::Map;
-use std::marker::PhantomData;
-use std::ops::Range;
-
 pub trait Iterable<T = u64> {
     fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
 }
 
-struct Mapped<U, Original, Transform> {
-    original_iterable: Original,
-    transform: Transform,
-    input_type: PhantomData<U>,
-}
-
-impl<U, V, Original, Transform> Iterable<V> for Mapped<U, Original, Transform>
-where
-    Original: Iterable<U>,
-    Transform: Fn(U) -> V,
-{
-    fn boxed_iter(&self) -> Box<dyn Iterator<Item = V> + '_> {
-        Box::new(self.original_iterable.boxed_iter().map(&self.transform))
-    }
-}
-
-impl<U> Iterable<U> for &dyn Iterable<U> {
-    fn boxed_iter(&self) -> Box<dyn Iterator<Item = U> + '_> {
-        (*self).boxed_iter()
-    }
-}
-
-impl<F, T> Iterable<T> for F
-where F: Fn() -> Box<dyn Iterator<Item = T>>
-{
-    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
-        self()
-    }
-}
-
-// impl<F, I, T> Iterable<T> for F
-// where
-//     I: Iterator<Item = T>,
-//     F: Fn() -> I,
-//{
-//    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
-//        Box::new(self())
-//}
-
-pub fn map_iterable<U, V, F, I>(
-    original_iterable: impl Fn() -> I,
-    transform: F,
-) -> impl Fn() -> std::iter::Map<I, F>
-where
-    F: Fn(U) -> V + Clone,
-    I: Iterator<Item = U>,
-{
-    move || original_iterable().map(transform.clone())
-}
-
 impl<'a, T: Copy> Iterable<T> for &'a [T] {
     fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
         Box::new(self.iter().copied())
```
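After this cleanup, `iterable.rs` is reduced to the trait plus the slice impl; `Mapped`, `map_iterable`, and the closure impls are all gone. One thing the trait-object design buys (a sketch; the `Repeated` type is invented for the example) is that heterogeneous value sources can sit behind a single `&dyn Iterable<T>` type, which a `Fn() -> I` bound cannot offer without `I` escaping into every signature:

```rust
pub trait Iterable<T = u64> {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
}

impl<'a, T: Copy> Iterable<T> for &'a [T] {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
        Box::new(self.iter().copied())
    }
}

// A second, hypothetical source type: a run of identical values.
struct Repeated {
    value: u64,
    count: usize,
}

impl Iterable<u64> for Repeated {
    fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
        Box::new(std::iter::repeat(self.value).take(self.count))
    }
}

fn main() {
    let slice_vals = vec![1u64, 2, 3];
    let slice_ref: &[u64] = &slice_vals[..];
    let repeated = Repeated { value: 7, count: 2 };
    // Both sources behind the same trait-object type:
    let sources: Vec<&dyn Iterable<u64>> = vec![&slice_ref, &repeated];
    let total: u64 = sources.iter().flat_map(|s| s.boxed_iter()).sum();
    assert_eq!(total, 1 + 2 + 3 + 7 + 7);
}
```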
```diff
@@ -26,7 +26,6 @@ pub use columnar::{
     merge_columnar, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
     MergeRowOrder, StackMergeOrder,
 };
-pub(crate) use iterable::{map_iterable, Iterable};
 use sstable::VoidSSTable;
 pub use value::{NumericalType, NumericalValue};
 

@@ -37,6 +37,11 @@ impl SegmentDocIdMapping {
     /// This flags means the segments are simply stacked in the order of their ordinal.
     /// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)]
     ///
+    /// The different segment may present some deletes, in which case it is expressed by skipping a
+    /// `DocId`. [(0, 1), (0, 3)] <--- here doc_id=0 and doc_id=1 have been deleted
+    ///
+    /// Being trivial is equivalent to having the `new_doc_id_to_old_doc_addr` array sorted.
+    ///
     /// This allows for some optimization.
     pub(crate) fn is_trivial(&self) -> bool {
         self.is_trivial

@@ -86,29 +86,6 @@ pub struct IndexMerger {
     max_doc: u32,
 }
 
-struct TermOrdinalMapping {
-    per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
-}
-
-impl TermOrdinalMapping {
-    fn new(max_term_ords: Vec<TermOrdinal>) -> TermOrdinalMapping {
-        TermOrdinalMapping {
-            per_segment_new_term_ordinals: max_term_ords
-                .into_iter()
-                .map(|max_term_ord| vec![TermOrdinal::default(); max_term_ord as usize])
-                .collect(),
-        }
-    }
-
-    fn register_from_to(&mut self, segment_ord: usize, from_ord: TermOrdinal, to_ord: TermOrdinal) {
-        self.per_segment_new_term_ordinals[segment_ord][from_ord as usize] = to_ord;
-    }
-
-    fn get_segment(&self, segment_ord: usize) -> &[TermOrdinal] {
-        &(self.per_segment_new_term_ordinals[segment_ord])[..]
-    }
-}
-
 struct DeltaComputer {
     buffer: Vec<u32>,
 }

@@ -245,7 +222,6 @@ impl IndexMerger {
     fn write_fast_fields(
         &self,
         fast_field_wrt: &mut WritePtr,
-        mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
         doc_id_mapping: &SegmentDocIdMapping,
     ) -> crate::Result<()> {
         debug_time!("write-fast-fields");

@@ -257,59 +233,8 @@ impl IndexMerger {
         if !doc_id_mapping.is_trivial() {
             todo!()
         }
-        let merge_row_order = MergeRowOrder::Stack(StackMergeOrder::from_columnars(&columnars[..]));
+        let merge_row_order = MergeRowOrder::Stack(StackMergeOrder::stack(&columnars[..]));
         columnar::merge_columnar(&columnars[..], merge_row_order, fast_field_wrt)?;
-        // for (field, field_entry) in self.schema.fields() {
-        //     let field_type = field_entry.field_type();
-        //     match field_type {
-        //         FieldType::Facet(_) | FieldType::Str(_) if field_type.is_fast() => {
-        //             let term_ordinal_mapping = term_ord_mappings.remove(&field).expect(
-        //                 "Logic Error in Tantivy (Please report). Facet field should have required \
-        //                  a`term_ordinal_mapping`.",
-        //             );
-        //             self.write_term_id_fast_field(
-        //                 field,
-        //                 &term_ordinal_mapping,
-        //                 fast_field_serializer,
-        //                 doc_id_mapping,
-        //             )?;
-        //         }
-        //         FieldType::U64(ref options)
-        //         | FieldType::I64(ref options)
-        //         | FieldType::F64(ref options)
-        //         | FieldType::Bool(ref options) => {
-        //             todo!()
-        //         }
-        //         FieldType::Date(ref options) => {
-        //             if options.is_fast() {
-        //                 todo!();
-        //             }
-        //             Some(Cardinality::SingleValue) => {
-        //                 self.write_single_fast_field(field, fast_field_serializer, doc_id_mapping)?;
-        //             }
-        //             Some(Cardinality::MultiValues) => {
-        //                 self.write_multi_fast_field(field, fast_field_serializer, doc_id_mapping)?;
-        //             }
-        //             None => {}
-        //         },
-        //         FieldType::Bytes(byte_options) => {
-        //             if byte_options.is_fast() {
-        //                 self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?;
-        //             }
-        //         }
-        //         FieldType::IpAddr(options) => {
-        //             if options.is_fast() {
-        //                 todo!();
-        //             }
-        //         },
-        //
-        //         FieldType::JsonObject(_) | FieldType::Facet(_) | FieldType::Str(_) => {
-        //             We don't handle json fast field for the moment
-        //             They can be implemented using what is done
-        //             for facets in the future
-        //         }
-        //     }
-        // }
         Ok(())
     }

@@ -374,7 +299,7 @@ impl IndexMerger {
     /// doc_id.
     /// ReaderWithOrdinal will include the ordinal position of the
     /// reader in self.readers.
-    pub(crate) fn generate_doc_id_mapping(
+    pub(crate) fn generate_doc_id_mapping_with_sort_by_field(
         &self,
         sort_by_field: &IndexSortByField,
     ) -> crate::Result<SegmentDocIdMapping> {

@@ -454,7 +379,7 @@ impl IndexMerger {
         serializer: &mut InvertedIndexSerializer,
         fieldnorm_reader: Option<FieldNormReader>,
         doc_id_mapping: &SegmentDocIdMapping,
-    ) -> crate::Result<Option<TermOrdinalMapping>> {
+    ) -> crate::Result<()> {
         debug_time!("write-postings-for-field");
         let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
         let mut delta_computer = DeltaComputer::new();

@@ -474,14 +399,6 @@ impl IndexMerger {
             max_term_ords.push(terms.num_terms() as u64);
         }
 
-        let mut term_ord_mapping_opt = match field_type {
-            FieldType::Facet(_) => Some(TermOrdinalMapping::new(max_term_ords)),
-            FieldType::Str(options) if options.is_fast() => {
-                Some(TermOrdinalMapping::new(max_term_ords))
-            }
-            _ => None,
-        };
-
         let mut merged_terms = TermMerger::new(field_term_streams);
 
         // map from segment doc ids to the resulting merged segment doc id.

@@ -566,12 +483,6 @@ impl IndexMerger {
 
             let to_term_ord = field_serializer.new_term(term_bytes, total_doc_freq)?;
 
-            if let Some(ref mut term_ord_mapping) = term_ord_mapping_opt {
-                for (segment_ord, from_term_ord) in merged_terms.matching_segments() {
-                    term_ord_mapping.register_from_to(segment_ord, from_term_ord, to_term_ord);
-                }
-            }
-
             // We can now serialize this postings, by pushing each document to the
             // postings serializer.
             for (segment_ord, mut segment_postings) in

@@ -622,7 +533,7 @@ impl IndexMerger {
             field_serializer.close_term()?;
         }
         field_serializer.close()?;
-        Ok(term_ord_mapping_opt)
+        Ok(())
     }
 
     fn write_postings(

@@ -630,23 +541,20 @@ impl IndexMerger {
         serializer: &mut InvertedIndexSerializer,
         fieldnorm_readers: FieldNormReaders,
         doc_id_mapping: &SegmentDocIdMapping,
-    ) -> crate::Result<HashMap<Field, TermOrdinalMapping>> {
-        let mut term_ordinal_mappings = HashMap::new();
+    ) -> crate::Result<()> {
         for (field, field_entry) in self.schema.fields() {
             let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
             if field_entry.is_indexed() {
-                if let Some(term_ordinal_mapping) = self.write_postings_for_field(
+                self.write_postings_for_field(
                     field,
                     field_entry.field_type(),
                     serializer,
                     fieldnorm_reader,
                     doc_id_mapping,
-                )? {
-                    term_ordinal_mappings.insert(field, term_ordinal_mapping);
-                }
+                );
             }
         }
-        Ok(term_ordinal_mappings)
+        Ok(())
     }
 
     fn write_storable_fields(

@@ -731,7 +639,7 @@ impl IndexMerger {
         if self.is_disjunct_and_sorted_on_sort_property(sort_by_field)? {
             self.get_doc_id_from_concatenated_data()?
         } else {
-            self.generate_doc_id_mapping(sort_by_field)?
+            self.generate_doc_id_mapping_with_sort_by_field(sort_by_field)?
         }
     } else {
         self.get_doc_id_from_concatenated_data()?

@@ -745,17 +653,13 @@ impl IndexMerger {
             .segment()
             .open_read(SegmentComponent::FieldNorms)?;
         let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
-        let term_ord_mappings = self.write_postings(
+        self.write_postings(
             serializer.get_postings_serializer(),
             fieldnorm_readers,
             &doc_id_mapping,
         )?;
         debug!("write-fastfields");
-        self.write_fast_fields(
-            serializer.get_fast_field_write(),
-            term_ord_mappings,
-            &doc_id_mapping,
-        )?;
+        self.write_fast_fields(serializer.get_fast_field_write(), &doc_id_mapping)?;
         debug!("write-storagefields");
         self.write_storable_fields(serializer.get_store_writer(), &doc_id_mapping)?;
         debug!("close-serializer");
```