Compare commits

..

2 Commits

Author SHA1 Message Date
Pascal Seitz
dbd3aed24a improve log levels 2022-09-29 13:25:14 +08:00
Pascal Seitz
7707b8a6e1 add debug_time for ff serialization 2022-09-24 19:48:07 +08:00
11 changed files with 110 additions and 115 deletions

View File

@@ -17,7 +17,7 @@ rand = {version="0.8.3", optional= true}
fastdivide = "0.4"
log = "0.4"
itertools = { version = "0.10.3" }
measure_time = { version="0.8.2", optional=true}
measure_time = { version="0.8.2" }
[dev-dependencies]
more-asserts = "0.3.0"
@@ -25,7 +25,7 @@ proptest = "1.0.0"
rand = "0.8.3"
[features]
bin = ["prettytable-rs", "rand", "measure_time"]
bin = ["prettytable-rs", "rand"]
default = ["bin"]
unstable = []

View File

@@ -3,7 +3,6 @@ use std::io::{self, Write};
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column::EstimateColumn;
use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType};
@@ -76,7 +75,7 @@ impl FastFieldCodec for BitpackedCodec {
Ok(())
}
fn estimate(column: &EstimateColumn) -> Option<f32> {
fn estimate(column: &dyn Column) -> Option<f32> {
let num_bits = compute_num_bits(column.max_value());
let num_bits_uncompressed = 64;
Some(num_bits as f32 / num_bits_uncompressed as f32)

View File

@@ -5,7 +5,6 @@ use common::{BinarySerializable, CountingWriter, DeserializeFrom};
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column::EstimateColumn;
use crate::line::Line;
use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn};
@@ -72,7 +71,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
}
// Estimate first_chunk and extrapolate
fn estimate(column: &EstimateColumn) -> Option<f32> {
fn estimate(column: &dyn crate::Column) -> Option<f32> {
if column.num_vals() < 10 * CHUNK_SIZE as u64 {
return None;
}

View File

@@ -137,57 +137,6 @@ where V: AsRef<[T]> + ?Sized
}
}
// Creates a view over a Column with a limited number of vals. Stats like min max are unchanged
pub struct EstimateColumn<'a> {
column: &'a dyn Column,
num_vals: u64,
}
impl<'a> EstimateColumn<'a> {
pub(crate) fn new(column: &'a dyn Column) -> Self {
let limit_num_vals = column.num_vals().min(100_000);
Self {
column,
num_vals: limit_num_vals,
}
}
}
impl<'a> Column for EstimateColumn<'a> {
fn get_val(&self, idx: u64) -> u64 {
(*self.column).get_val(idx)
}
fn min_value(&self) -> u64 {
(*self.column).min_value()
}
fn max_value(&self) -> u64 {
(*self.column).max_value()
}
fn num_vals(&self) -> u64 {
self.num_vals
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new((*self.column).iter().take(self.num_vals as usize))
}
fn get_range(&self, start: u64, output: &mut [u64]) {
(*self.column).get_range(start, output)
}
}
impl<'a> From<&'a dyn Column> for EstimateColumn<'a> {
fn from(column: &'a dyn Column) -> Self {
let limit_num_vals = column.num_vals().min(100_000);
Self {
column,
num_vals: limit_num_vals,
}
}
}
struct MonotonicMappingColumn<C, T, Input> {
from_column: C,
monotonic_mapping: T,

View File

@@ -11,7 +11,6 @@ use std::io;
use std::io::Write;
use std::sync::Arc;
use column::EstimateColumn;
use common::BinarySerializable;
use compact_space::CompactSpaceDecompressor;
use ownedbytes::OwnedBytes;
@@ -133,7 +132,7 @@ trait FastFieldCodec: 'static {
///
/// It could make sense to also return a value representing
/// computational complexity.
fn estimate(column: &EstimateColumn) -> Option<f32>;
fn estimate(column: &dyn Column) -> Option<f32>;
}
pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [
@@ -150,7 +149,6 @@ mod tests {
use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::column::EstimateColumn;
use crate::linear::LinearCodec;
use crate::serialize::Header;
@@ -161,9 +159,7 @@ mod tests {
let col = &VecColumn::from(data);
let header = Header::compute_header(col, &[Codec::CODEC_TYPE])?;
let normalized_col = header.normalize_column(col);
let limited_column = EstimateColumn::new(&normalized_col);
let estimation = Codec::estimate(&limited_column)?;
let estimation = Codec::estimate(&normalized_col)?;
let mut out = Vec::new();
let col = VecColumn::from(data);
@@ -284,16 +280,14 @@ mod tests {
let data = (10..=20000_u64).collect::<Vec<_>>();
let data: VecColumn = data.as_slice().into();
let linear_interpol_estimation =
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
assert_le!(linear_interpol_estimation, 0.01);
let multi_linear_interpol_estimation =
BlockwiseLinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let multi_linear_interpol_estimation = BlockwiseLinearCodec::estimate(&data).unwrap();
assert_le!(multi_linear_interpol_estimation, 0.2);
assert_lt!(linear_interpol_estimation, multi_linear_interpol_estimation);
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
assert_lt!(linear_interpol_estimation, bitpacked_estimation);
}
#[test]
@@ -301,20 +295,18 @@ mod tests {
let data: &[u64] = &[200, 10, 10, 10, 10, 1000, 20];
let data: VecColumn = data.into();
let linear_interpol_estimation =
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
assert_le!(linear_interpol_estimation, 0.34);
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
assert_lt!(bitpacked_estimation, linear_interpol_estimation);
}
#[test]
fn estimation_prefer_bitpacked() {
let data = VecColumn::from(&[10, 10, 10, 10]);
let linear_interpol_estimation =
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
assert_lt!(bitpacked_estimation, linear_interpol_estimation);
}
@@ -326,11 +318,10 @@ mod tests {
// in this case the linear interpolation can't in fact not be worse than bitpacking,
// but the estimator adds some threshold, which leads to estimated worse behavior
let linear_interpol_estimation =
LinearCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
assert_le!(linear_interpol_estimation, 0.35);
let bitpacked_estimation = BitpackedCodec::estimate(&EstimateColumn::new(&data)).unwrap();
let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
assert_le!(bitpacked_estimation, 0.32);
assert_le!(bitpacked_estimation, linear_interpol_estimation);
}

View File

@@ -67,11 +67,19 @@ impl Line {
self.intercept.wrapping_add(linear_part)
}
// Same as train, but the intercept is only estimated from provided sample positions
pub fn estimate(ys: &dyn Column, sample_positions: &[u64]) -> Self {
Self::train_from(
ys,
sample_positions
.iter()
.cloned()
.map(|pos| (pos, ys.get_val(pos))),
)
}
// Intercept is only computed from provided positions
pub fn train_from(
ys: &dyn Column,
positions_and_values: impl Iterator<Item = (u64, u64)>,
) -> Self {
fn train_from(ys: &dyn Column, positions_and_values: impl Iterator<Item = (u64, u64)>) -> Self {
let num_vals = if let Some(num_vals) = NonZeroU64::new(ys.num_vals() - 1) {
num_vals
} else {

View File

@@ -4,7 +4,6 @@ use common::BinarySerializable;
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use crate::column::EstimateColumn;
use crate::line::Line;
use crate::serialize::NormalizedHeader;
use crate::{Column, FastFieldCodec, FastFieldCodecType};
@@ -122,23 +121,23 @@ impl FastFieldCodec for LinearCodec {
/// where the local maxima for the deviation of the calculated value are and
/// the offset to shift all values to >=0 is also unknown.
#[allow(clippy::question_mark)]
fn estimate(column: &EstimateColumn) -> Option<f32> {
fn estimate(column: &dyn Column) -> Option<f32> {
if column.num_vals() < 3 {
return None; // disable compressor for this case
}
// let's sample at 0%, 5%, 10% .. 95%, 100%
let num_vals = column.num_vals() as f32 / 100.0;
let sample_positions_and_values = (0..20)
let sample_positions = (0..20)
.map(|pos| (num_vals * pos as f32 * 5.0) as u64)
.map(|pos| (pos, column.get_val(pos)))
.collect::<Vec<_>>();
let line = { Line::train_from(column, sample_positions_and_values.iter().cloned()) };
let line = Line::estimate(column, &sample_positions);
let estimated_bit_width = sample_positions_and_values
let estimated_bit_width = sample_positions
.into_iter()
.map(|(pos, actual_value)| {
.map(|pos| {
let actual_value = column.get_val(pos);
let interpolated_val = line.eval(pos as u64);
actual_value.wrapping_sub(interpolated_val)
})

View File

@@ -23,12 +23,12 @@ use std::sync::Arc;
use common::{BinarySerializable, VInt};
use fastdivide::DividerU64;
use log::warn;
use log::{trace, warn};
use measure_time::trace_time;
use ownedbytes::OwnedBytes;
use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::column::EstimateColumn;
use crate::compact_space::CompactSpaceCompressor;
use crate::linear::LinearCodec;
use crate::{
@@ -126,6 +126,23 @@ impl BinarySerializable for Header {
}
}
pub fn estimate<T: MonotonicallyMappableToU64>(
typed_column: impl Column<T>,
codec_type: FastFieldCodecType,
) -> Option<f32> {
let column = monotonic_map_column(typed_column, T::to_u64);
let min_value = column.min_value();
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
match codec_type {
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&normalized_column),
FastFieldCodecType::Linear => LinearCodec::estimate(&normalized_column),
FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&normalized_column),
}
}
pub fn serialize_u128(
typed_column: impl Column<u128>,
output: &mut impl io::Write,
@@ -161,31 +178,13 @@ pub fn serialize<T: MonotonicallyMappableToU64>(
Ok(())
}
pub fn estimate<T: MonotonicallyMappableToU64>(
typed_column: impl Column<T>,
codec_type: FastFieldCodecType,
) -> Option<f32> {
let column = monotonic_map_column(typed_column, T::to_u64);
let min_value = column.min_value();
let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
.filter(|gcd| gcd.get() > 1u64);
let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
let estimate_column = EstimateColumn::new(&normalized_column);
match codec_type {
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&estimate_column),
FastFieldCodecType::Linear => LinearCodec::estimate(&estimate_column),
FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&estimate_column),
}
}
fn detect_codec(
column: impl Column<u64>,
codecs: &[FastFieldCodecType],
) -> Option<FastFieldCodecType> {
let column: EstimateColumn = EstimateColumn::new(&column);
let mut estimations = Vec::new();
for &codec in codecs {
trace_time!("estimate time for codec: {:?}", codec);
let estimation_opt = match codec {
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&column),
FastFieldCodecType::Linear => LinearCodec::estimate(&column),
@@ -205,6 +204,7 @@ fn detect_codec(
// codecs
estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
estimations.sort_by(|(score_left, _), (score_right, _)| score_left.total_cmp(score_right));
trace!("Chosen Codec {:?}", estimations.first()?.1);
Some(estimations.first()?.1)
}
@@ -213,6 +213,12 @@ fn serialize_given_codec(
codec_type: FastFieldCodecType,
output: &mut impl io::Write,
) -> io::Result<()> {
trace_time!(
"Serialize time for codec: {:?}, num_vals {}",
codec_type,
column.num_vals()
);
match codec_type {
FastFieldCodecType::Bitpacked => {
BitpackedCodec::serialize(&column, output)?;

View File

@@ -3,6 +3,7 @@ use std::sync::Mutex;
use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
use fnv::FnvHashMap;
use measure_time::{debug_time, trace_time};
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -146,6 +147,13 @@ impl MultiValuedFastFieldWriter {
{
self.doc_index.push(self.vals.len() as u64);
let col = VecColumn::from(&self.doc_index[..]);
trace_time!(
"segment-serialize-multi-fast-field-idx, num_vals {}, field_id {:?}",
col.num_vals(),
self.field()
);
if let Some(doc_id_map) = doc_id_map {
let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
serializer.create_auto_detect_u64_fast_field_with_idx(
@@ -158,6 +166,12 @@ impl MultiValuedFastFieldWriter {
}
}
{
trace_time!(
"segment-serialize-multi-fast-field-values, num_vals {}, field_id {:?}",
self.vals.len(),
self.field()
);
// Writing the values themselves.
// TODO FIXME: Use less memory.
let mut values: Vec<u64> = Vec::new();

View File

@@ -4,6 +4,7 @@ use std::io;
use common;
use fastfield_codecs::{Column, MonotonicallyMappableToU64};
use fnv::FnvHashMap;
use measure_time::{debug_time, trace_time};
use tantivy_bitpacker::BlockedBitpacker;
use super::multivalued::MultiValuedFastFieldWriter;
@@ -215,6 +216,7 @@ impl FastFieldsWriter {
mapping: &HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
debug_time!("segment-serialize-all-fast-fields",);
for field_writer in self.term_id_writers {
let field = field_writer.field();
field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
@@ -367,6 +369,11 @@ impl IntFastFieldWriter {
num_vals: self.val_count as u64,
};
trace_time!(
"segment-serialize-single-value-field, field_id {:?}",
self.field()
);
serializer.create_auto_detect_u64_fast_field(self.field, fastfield_accessor)?;
Ok(())

View File

@@ -4,7 +4,7 @@ use std::sync::Arc;
use fastfield_codecs::VecColumn;
use itertools::Itertools;
use measure_time::debug_time;
use measure_time::{debug_time, trace_time};
use crate::core::{Segment, SegmentReader};
use crate::docset::{DocSet, TERMINATED};
@@ -250,7 +250,11 @@ impl IndexMerger {
mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write-fast-fields");
debug_time!(
"merge-all-fast-fields, num_segments {}, num docs new segment {}",
self.readers.len(),
doc_id_mapping.len()
);
for (field, field_entry) in self.schema.fields() {
let field_type = field_entry.field_type();
@@ -311,6 +315,12 @@ impl IndexMerger {
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let fast_field_accessor = SortedDocIdColumn::new(&self.readers, doc_id_mapping, field);
trace_time!(
"merge-single-fast-field, num_vals {}, num_segments {}, field_id {:?}",
fast_field_accessor.num_vals(),
self.readers.len(),
field
);
fast_field_serializer.create_auto_detect_u64_fast_field(field, fast_field_accessor)?;
Ok(())
@@ -458,6 +468,12 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<Vec<u64>> {
trace_time!(
"merge-multi-fast-field-idx, num_segments {}, field_id {:?}",
self.readers.len(),
field
);
let reader_ordinal_and_field_accessors = self
.readers
.iter()
@@ -488,7 +504,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write-term-id-fast-field");
trace_time!("write-term-id-fast-field");
// Multifastfield consists of 2 fastfields.
// The first serves as an index into the second one and is strictly increasing.
@@ -571,6 +587,13 @@ impl IndexMerger {
let fastfield_accessor =
SortedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, &offsets, field);
trace_time!(
"merge-multi-fast-field-values, num_vals {}, num_segments {}, field_id {:?}",
fastfield_accessor.num_vals(),
self.readers.len(),
field
);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
field,
fastfield_accessor,
@@ -624,7 +647,7 @@ impl IndexMerger {
fieldnorm_reader: Option<FieldNormReader>,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<Option<TermOrdinalMapping>> {
debug_time!("write-postings-for-field");
debug_time!("write-postings-for-field {:?}", indexed_field);
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
@@ -827,7 +850,7 @@ impl IndexMerger {
debug!("write-storable-field");
if !doc_id_mapping.is_trivial() {
debug!("non-trivial-doc-id-mapping");
debug!("non-trivial-doc-id-mapping (index is sorted)");
let store_readers: Vec<_> = self
.readers
@@ -855,7 +878,7 @@ impl IndexMerger {
}
}
} else {
debug!("trivial-doc-id-mapping");
debug!("trivial-doc-id-mapping (index is not sorted)");
for reader in &self.readers {
let store_reader = reader.get_store_reader(1)?;
if reader.has_deletes()