refactor, fix api

refactor
fix clippy
fix docs
remove unused code
fix bytesfield index api flaw
This commit is contained in:
Pascal Seitz
2022-09-07 18:28:17 +08:00
parent 4d634d61ff
commit 29d56111de
9 changed files with 148 additions and 163 deletions

View File

@@ -6,9 +6,6 @@ extern crate test;
mod tests {
use std::sync::Arc;
use fastfield_codecs::bitpacked::BitpackedCodec;
use fastfield_codecs::blockwise_linear::BlockwiseLinearCodec;
use fastfield_codecs::linear::LinearCodec;
use fastfield_codecs::*;
fn get_data() -> Vec<u64> {

View File

@@ -176,7 +176,7 @@ where
T: Copy + Ord + Default,
{
fn min_max(&self) -> (T, T) {
if let Some((min, max)) = self.min_max_cache.lock().unwrap().clone() {
if let Some((min, max)) = *self.min_max_cache.lock().unwrap() {
return (min, max);
}
let (min, max) =

View File

@@ -9,21 +9,28 @@ extern crate test;
use std::io;
use std::io::Write;
use std::sync::Arc;
use common::BinarySerializable;
use ownedbytes::OwnedBytes;
use serialize::Header;
mod bitpacked;
mod blockwise_linear;
pub(crate) mod line;
mod linear;
mod monotonic_mapping;
mod column;
mod gcd;
mod serialize;
pub use self::bitpacked::BitpackedCodec;
pub use self::blockwise_linear::BlockwiseLinearCodec;
pub use self::column::{monotonic_map_column, Column, VecColumn};
pub use self::serialize::{estimate, open, serialize, serialize_and_load, NormalizedHeader};
pub use self::linear::LinearCodec;
pub use self::monotonic_mapping::MonotonicallyMappableToU64;
pub use self::serialize::{estimate, serialize, serialize_and_load, NormalizedHeader};
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
#[repr(u8)]
@@ -61,70 +68,39 @@ impl FastFieldCodecType {
}
}
pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy {
/// Converts a value to u64.
///
/// Internally all fast field values are encoded as u64.
fn to_u64(self) -> u64;
/// Converts a value from u64
///
/// Internally all fast field values are encoded as u64.
/// **Note: To be used for converting encoded Term, Posting values.**
fn from_u64(val: u64) -> Self;
}
impl MonotonicallyMappableToU64 for u64 {
fn to_u64(self) -> u64 {
self
}
fn from_u64(val: u64) -> Self {
val
}
}
impl MonotonicallyMappableToU64 for i64 {
#[inline(always)]
fn to_u64(self) -> u64 {
common::i64_to_u64(self)
}
#[inline(always)]
fn from_u64(val: u64) -> Self {
common::u64_to_i64(val)
}
}
impl MonotonicallyMappableToU64 for bool {
#[inline(always)]
fn to_u64(self) -> u64 {
if self {
1
} else {
0
/// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open<T: MonotonicallyMappableToU64>(
mut bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<T>>> {
let header = Header::deserialize(&mut bytes)?;
match header.codec_type {
FastFieldCodecType::Bitpacked => open_specific_codec::<BitpackedCodec, _>(bytes, &header),
FastFieldCodecType::Linear => open_specific_codec::<LinearCodec, _>(bytes, &header),
FastFieldCodecType::BlockwiseLinear => {
open_specific_codec::<BlockwiseLinearCodec, _>(bytes, &header)
}
}
#[inline(always)]
fn from_u64(val: u64) -> Self {
val > 0
}
}
impl MonotonicallyMappableToU64 for f64 {
fn to_u64(self) -> u64 {
common::f64_to_u64(self)
}
fn from_u64(val: u64) -> Self {
common::u64_to_f64(val)
fn open_specific_codec<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
bytes: OwnedBytes,
header: &Header,
) -> io::Result<Arc<dyn Column<Item>>> {
let normalized_header = header.normalized();
let reader = C::open_from_bytes(bytes, normalized_header)?;
let min_value = header.min_value;
if let Some(gcd) = header.gcd {
let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val * gcd.get());
Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
} else {
let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val);
Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
}
}
/// The FastFieldSerializerEstimate trait is required on all variants
/// of fast field compressions, to decide which one to choose.
trait FastFieldCodec: 'static {
pub trait FastFieldCodec: 'static {
/// A codec needs to provide a unique name and id, which is
/// used for debugging and de/serialization.
const CODEC_TYPE: FastFieldCodecType;

View File

@@ -0,0 +1,60 @@
/// Conversion contract between a fast field value type and its `u64` encoding.
///
/// Internally all fast field values are encoded as u64; implementors provide
/// both directions of that conversion.
///
/// NOTE(review): the trait name implies that `to_u64` must preserve the
/// ordering of the original values. Nothing here enforces it — confirm that
/// every implementation upholds it.
pub trait MonotonicallyMappableToU64: Copy + PartialOrd + 'static {
    /// Encodes `self` as a `u64`.
    ///
    /// Internally all fast field values are encoded as u64.
    fn to_u64(self) -> u64;

    /// Decodes a value from its `u64` encoding.
    ///
    /// Internally all fast field values are encoded as u64.
    /// **Note: To be used for converting encoded Term, Posting values.**
    fn from_u64(val: u64) -> Self;
}
/// `u64` is already in the internal representation, so both directions are
/// the identity.
impl MonotonicallyMappableToU64 for u64 {
    /// Returns the value unchanged.
    // Marked `#[inline(always)]` to match the `i64` and `bool` impls in this
    // file, so this identity conversion is inlined across crates as well.
    #[inline(always)]
    fn to_u64(self) -> u64 {
        self
    }

    /// Returns the encoded value unchanged.
    #[inline(always)]
    fn from_u64(val: u64) -> Self {
        val
    }
}
/// `i64` values are converted by delegating to the helpers in `common`.
impl MonotonicallyMappableToU64 for i64 {
    /// Encodes the signed value through `common::i64_to_u64`.
    #[inline(always)]
    fn to_u64(self) -> u64 {
        common::i64_to_u64(self)
    }

    /// Decodes a signed value through `common::u64_to_i64`.
    #[inline(always)]
    fn from_u64(val: u64) -> i64 {
        common::u64_to_i64(val)
    }
}
/// Booleans are stored as `0` (`false`) and `1` (`true`).
impl MonotonicallyMappableToU64 for bool {
    /// Maps `false` to `0` and `true` to `1`.
    #[inline(always)]
    fn to_u64(self) -> u64 {
        u64::from(self)
    }

    /// Maps `0` to `false` and every other value to `true`.
    #[inline(always)]
    fn from_u64(val: u64) -> bool {
        val != 0
    }
}
/// `f64` values are converted by delegating to the helpers in `common`.
///
/// NOTE(review): presumably `common::f64_to_u64` is order-preserving, as the
/// trait name requires — confirm in `common`.
impl MonotonicallyMappableToU64 for f64 {
    /// Encodes the float through `common::f64_to_u64`.
    // Marked `#[inline(always)]` to match the `i64` and `bool` impls in this
    // file, so this thin delegation is inlined across crates as well.
    #[inline(always)]
    fn to_u64(self) -> u64 {
        common::f64_to_u64(self)
    }

    /// Decodes a float through `common::u64_to_f64`.
    #[inline(always)]
    fn from_u64(val: u64) -> Self {
        common::u64_to_f64(val)
    }
}

View File

@@ -47,11 +47,11 @@ pub struct NormalizedHeader {
#[derive(Debug, Copy, Clone)]
pub(crate) struct Header {
num_vals: u64,
min_value: u64,
max_value: u64,
gcd: Option<NonZeroU64>,
codec_type: FastFieldCodecType,
pub num_vals: u64,
pub min_value: u64,
pub max_value: u64,
pub gcd: Option<NonZeroU64>,
pub codec_type: FastFieldCodecType,
}
impl Header {
@@ -124,36 +124,6 @@ impl BinarySerializable for Header {
}
}
/// Returns the column reader matching the codec recorded in the header,
/// wrapped in an `Arc`.
///
/// The `Header` is deserialized from the front of `bytes`; the same `bytes`
/// value is then handed to the codec selected by `header.codec_type`.
///
/// # Errors
///
/// Propagates any `io::Error` raised while deserializing the header or while
/// opening the codec-specific reader.
pub fn open<T: MonotonicallyMappableToU64>(
    mut bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<T>>> {
    let header = Header::deserialize(&mut bytes)?;
    let column = match header.codec_type {
        FastFieldCodecType::Bitpacked => open_specific_codec::<BitpackedCodec, T>(bytes, &header)?,
        FastFieldCodecType::Linear => open_specific_codec::<LinearCodec, T>(bytes, &header)?,
        FastFieldCodecType::BlockwiseLinear => {
            open_specific_codec::<BlockwiseLinearCodec, T>(bytes, &header)?
        }
    };
    Ok(column)
}
/// Opens `bytes` with codec `C` and wraps the reader so that it yields `Item`
/// values instead of the stored `u64`s.
///
/// Each value produced by the codec reader is mapped back as
/// `min_value + val * gcd` when the header carries a gcd, or as
/// `min_value + val` otherwise, and then converted with `Item::from_u64`.
///
/// # Errors
///
/// Propagates the `io::Error` returned by `C::open_from_bytes`.
fn open_specific_codec<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
    bytes: OwnedBytes,
    header: &Header,
) -> io::Result<Arc<dyn Column<Item>>> {
    let reader = C::open_from_bytes(bytes, header.normalized())?;
    let min_value = header.min_value;
    // The two arms build closures of different types, so each arm creates its
    // own mapped column.
    match header.gcd {
        Some(gcd) => {
            let step = gcd.get();
            Ok(Arc::new(monotonic_map_column(reader, move |val: u64| {
                Item::from_u64(min_value + val * step)
            })))
        }
        None => Ok(Arc::new(monotonic_map_column(reader, move |val: u64| {
            Item::from_u64(min_value + val)
        }))),
    }
}
pub fn estimate<T: MonotonicallyMappableToU64>(
typed_column: impl Column<T>,
codec_type: FastFieldCodecType,
@@ -217,8 +187,7 @@ fn detect_codec(
// removing nan values for codecs with broken calculations, and max values which disables
// codecs
estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
estimations
.sort_by(|(score_left, _), (score_right, _)| score_left.partial_cmp(&score_right).unwrap());
estimations.sort_by(|(score_left, _), (score_right, _)| score_left.total_cmp(score_right));
Some(estimations.first()?.1)
}

View File

@@ -112,7 +112,6 @@ impl BytesFastFieldWriter {
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
// writing the offset index
// TODO FIXME No need to double the memory.
{
self.doc_index.push(self.vals.len() as u64);
let col = VecColumn::from(&self.doc_index[..]);
@@ -128,7 +127,7 @@ impl BytesFastFieldWriter {
}
}
// writing the values themselves
let mut value_serializer = serializer.new_bytes_fast_field_with_idx(self.field, 1);
let mut value_serializer = serializer.new_bytes_fast_field(self.field);
// the else could be removed, but this is faster (difference not benchmarked)
if let Some(doc_id_map) = doc_id_map {
for vals in self.get_ordered_values(Some(doc_id_map)) {

View File

@@ -142,7 +142,7 @@ impl MultiValuedFastFieldWriter {
pub fn serialize(
mut self,
serializer: &mut CompositeFastFieldSerializer,
mapping_opt: Option<&FnvHashMap<UnorderedTermId, TermOrdinal>>,
term_mapping_opt: Option<&FnvHashMap<UnorderedTermId, TermOrdinal>>,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
{
@@ -163,7 +163,7 @@ impl MultiValuedFastFieldWriter {
// Writing the values themselves.
// TODO FIXME: Use less memory.
let mut values: Vec<u64> = Vec::new();
if let Some(mapping) = mapping_opt {
if let Some(term_mapping) = term_mapping_opt {
if self.fast_field_type.is_facet() {
let mut doc_vals: Vec<u64> = Vec::with_capacity(100);
for vals in self.get_ordered_values(doc_id_map) {
@@ -171,7 +171,7 @@ impl MultiValuedFastFieldWriter {
doc_vals.clear();
let remapped_vals = vals
.iter()
.map(|val| *mapping.get(val).expect("Missing term ordinal"));
.map(|val| *term_mapping.get(val).expect("Missing term ordinal"));
doc_vals.extend(remapped_vals);
doc_vals.sort_unstable();
for &val in &doc_vals {
@@ -182,7 +182,7 @@ impl MultiValuedFastFieldWriter {
for vals in self.get_ordered_values(doc_id_map) {
let remapped_vals = vals
.iter()
.map(|val| *mapping.get(val).expect("Missing term ordinal"));
.map(|val| *term_mapping.get(val).expect("Missing term ordinal"));
for val in remapped_vals {
values.push(val);
}
@@ -214,6 +214,19 @@ struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
seek_head: MultivalueStartIndexIter<'a, C>,
seek_next_id: u64,
}
impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}
}
}
impl<'a, C: Column> MultivalueStartIndex<'a, C> {
pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
@@ -222,20 +235,12 @@ impl<'a, C: Column> MultivalueStartIndex<'a, C> {
column,
doc_id_map,
min_max_opt: Mutex::default(),
random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker {
seek_head: MultivalueStartIndexIter {
column,
doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
}),
random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)),
}
}
fn minmax(&self) -> (u64, u64) {
if let Some((min, max)) = self.min_max_opt.lock().unwrap().clone() {
if let Some((min, max)) = *self.min_max_opt.lock().unwrap() {
return (min, max);
}
let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64));
@@ -247,15 +252,8 @@ impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
fn get_val(&self, idx: u64) -> u64 {
let mut random_seeker_lock = self.random_seeker.lock().unwrap();
if random_seeker_lock.seek_next_id > idx {
*random_seeker_lock = MultivalueStartIndexRandomSeeker {
seek_head: MultivalueStartIndexIter {
column: self.column,
doc_id_map: self.doc_id_map,
new_doc_id: 0,
offset: 0u64,
},
seek_next_id: 0u64,
};
*random_seeker_lock =
MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map);
}
let to_skip = idx - random_seeker_lock.seek_next_id;
random_seeker_lock.seek_next_id = idx + 1;
@@ -275,12 +273,7 @@ impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new(MultivalueStartIndexIter {
column: &self.column,
doc_id_map: self.doc_id_map,
new_doc_id: 0,
offset: 0,
})
Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map))
}
}
@@ -291,6 +284,17 @@ struct MultivalueStartIndexIter<'a, C: Column> {
pub offset: u64,
}
impl<'a, C: Column> MultivalueStartIndexIter<'a, C> {
fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
Self {
column,
doc_id_map,
new_doc_id: 0,
offset: 0,
}
}
}
impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
type Item = u64;

View File

@@ -1,6 +1,6 @@
use std::io::{self, Write};
use common::{BinarySerializable, CountingWriter};
use common::CountingWriter;
pub use fastfield_codecs::{Column, FastFieldStats};
use fastfield_codecs::{FastFieldCodecType, MonotonicallyMappableToU64, ALL_CODEC_TYPES};
@@ -16,16 +16,14 @@ use crate::schema::Field;
/// the serializer.
/// The serializer expects to receive the following calls.
///
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * `create_auto_detect_u64_fast_field(...)`
/// * `create_auto_detect_u64_fast_field(...)`
/// * ...
/// * `close_field()`
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * `let bytes_fastfield = new_bytes_fast_field(...)`
/// * `bytes_fastfield.write_all(...)`
/// * `bytes_fastfield.write_all(...)`
/// * `bytes_fastfield.flush()`
/// * ...
/// * `close_field()`
/// * `close()`
pub struct CompositeFastFieldSerializer {
composite_write: CompositeWrite<WritePtr>,
@@ -33,17 +31,16 @@ pub struct CompositeFastFieldSerializer {
}
impl CompositeFastFieldSerializer {
/// Constructor
/// New fast field serializer with all codec types
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
Self::from_write_with_codec(write, &ALL_CODEC_TYPES)
}
/// Constructor
/// New fast field serializer with allowed codec types
pub fn from_write_with_codec(
write: WritePtr,
codec_types: &[FastFieldCodecType],
) -> io::Result<CompositeFastFieldSerializer> {
// just making room for the pointer to header.
let composite_write = CompositeWrite::wrap(write);
Ok(CompositeFastFieldSerializer {
composite_write,
@@ -61,16 +58,6 @@ impl CompositeFastFieldSerializer {
self.create_auto_detect_u64_fast_field_with_idx(field, fastfield_accessor, 0)
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn write_header<W: Write>(
field_write: &mut W,
codec_type: FastFieldCodecType,
) -> io::Result<()> {
codec_type.to_code().serialize(field_write)?;
Ok(())
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx<T: MonotonicallyMappableToU64>(
@@ -84,13 +71,14 @@ impl CompositeFastFieldSerializer {
Ok(())
}
/// Start serializing a new [u8] fast field
pub fn new_bytes_fast_field_with_idx(
/// Start serializing a new [u8] fast field.
///
/// The bytes will be stored as is, no compression will be applied.
pub fn new_bytes_fast_field(
&mut self,
field: Field,
idx: usize,
) -> FastBytesFieldSerializer<'_, CountingWriter<WritePtr>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
let field_write = self.composite_write.for_field_with_idx(field, 1);
FastBytesFieldSerializer { write: field_write }
}

View File

@@ -130,14 +130,6 @@ impl TermOrdinalMapping {
/// Returns the new term ordinals recorded for the segment `segment_ord`.
///
/// Indexes `per_segment_new_term_ordinals` directly, so an out-of-range
/// `segment_ord` panics.
fn get_segment(&self, segment_ord: usize) -> &[TermOrdinal] {
    let segment_ordinals = &self.per_segment_new_term_ordinals[segment_ord];
    &segment_ordinals[..]
}
/// Returns the largest term ordinal across all segments, or the default
/// `TermOrdinal` when no ordinal is stored.
fn max_term_ord(&self) -> TermOrdinal {
    self.per_segment_new_term_ordinals
        .iter()
        .flat_map(|segment_ordinals| segment_ordinals.iter())
        .max()
        .cloned()
        .unwrap_or_default()
}
}
struct DeltaComputer {
@@ -814,7 +806,7 @@ impl IndexMerger {
doc_id_mapping,
&reader_and_field_accessors,
)?;
let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1);
let mut serialize_vals = fast_field_serializer.new_bytes_fast_field(field);
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let bytes_reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;