support multiple codecs

support multiple codes
prepend codec id to all fast fields
add new api to create fastfields with access to all data
use new fastfield creation api in initial creation and merge
remove unused collect of data in doc_id_mapping
This commit is contained in:
Pascal Seitz
2021-06-02 11:12:45 +02:00
parent 5209238c1b
commit 6bf4fee1ba
10 changed files with 510 additions and 304 deletions

View File

@@ -34,7 +34,9 @@ pub use self::reader::DynamicFastFieldReader;
pub use self::reader::FastFieldReader;
pub use self::readers::FastFieldReaders;
pub use self::serializer::CompositeFastFieldSerializer;
pub use self::serializer::FastFieldDataAccess;
pub use self::serializer::FastFieldSerializer;
pub use self::serializer::FastFieldStats;
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
use crate::schema::Cardinality;
use crate::schema::FieldType;
@@ -239,7 +241,7 @@ mod tests {
#[test]
pub fn test_fastfield() {
let test_fastfield = BitpackedFastFieldReader::<u64>::from(vec![100, 200, 300]);
let test_fastfield = DynamicFastFieldReader::<u64>::from(vec![100, 200, 300]);
assert_eq!(test_fastfield.get(0), 100);
assert_eq!(test_fastfield.get(1), 200);
assert_eq!(test_fastfield.get(2), 300);
@@ -268,10 +270,10 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
assert_eq!(file.len(), 36 as usize);
assert_eq!(file.len(), 37 as usize);
let composite_file = CompositeFile::open(&file)?;
let file = composite_file.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(file)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(file)?;
assert_eq!(fast_field_reader.get(0), 13u64);
assert_eq!(fast_field_reader.get(1), 14u64);
assert_eq!(fast_field_reader.get(2), 2u64);
@@ -299,11 +301,11 @@ mod tests {
serializer.close()?;
}
let file = directory.open_read(&path)?;
assert_eq!(file.len(), 61 as usize);
assert_eq!(file.len(), 62 as usize);
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
assert_eq!(fast_field_reader.get(0), 4u64);
assert_eq!(fast_field_reader.get(1), 14_082_001u64);
assert_eq!(fast_field_reader.get(2), 3_052u64);
@@ -335,11 +337,11 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
assert_eq!(file.len(), 34 as usize);
assert_eq!(file.len(), 35 as usize);
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
for doc in 0..10_000 {
assert_eq!(fast_field_reader.get(doc), 100_000u64);
}
@@ -367,11 +369,11 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
assert_eq!(file.len(), 80042 as usize);
assert_eq!(file.len(), 80043 as usize);
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
assert_eq!(fast_field_reader.get(0), 0u64);
for doc in 1..10_001 {
assert_eq!(
@@ -406,11 +408,11 @@ mod tests {
serializer.close().unwrap();
}
let file = directory.open_read(&path).unwrap();
assert_eq!(file.len(), 17709 as usize);
assert_eq!(file.len(), 17710 as usize);
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(i64_field).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<i64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<i64>::open(data)?;
assert_eq!(fast_field_reader.min_value(), -100i64);
assert_eq!(fast_field_reader.max_value(), 9_999i64);
@@ -450,7 +452,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(i64_field).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<i64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<i64>::open(data)?;
assert_eq!(fast_field_reader.get(0u32), 0i64);
}
Ok(())
@@ -483,7 +485,7 @@ mod tests {
{
let fast_fields_composite = CompositeFile::open(&file)?;
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = BitpackedFastFieldReader::<u64>::open(data)?;
let fast_field_reader = DynamicFastFieldReader::<u64>::open(data)?;
let mut a = 0u64;
for _ in 0..n {

View File

@@ -1,4 +1,4 @@
use crate::fastfield::serializer::DynamicFastFieldSerializer;
use crate::fastfield::serializer::BitpackedFastFieldSerializer;
use crate::fastfield::serializer::FastFieldSerializer;
use crate::fastfield::CompositeFastFieldSerializer;
use crate::postings::UnorderedTermId;
@@ -155,7 +155,7 @@ impl MultiValuedFastFieldWriter {
}
{
// writing the values themselves.
let mut value_serializer: DynamicFastFieldSerializer<'_, _>;
let mut value_serializer: BitpackedFastFieldSerializer<'_, _>;
match mapping_opt {
Some(mapping) => {
value_serializer = serializer.new_u64_fast_field_with_idx(

View File

@@ -67,8 +67,12 @@ pub enum DynamicFastFieldReader<Item: FastValue> {
impl<Item: FastValue> DynamicFastFieldReader<Item> {
/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data.
pub fn open(file: FileSlice) -> crate::Result<DynamicFastFieldReader<Item>> {
let mut bytes = file.read_bytes()?;
let (mut id_bytes, data_bytes) = bytes.split(1);
let id = u8::deserialize(&mut id_bytes)?;
Ok(DynamicFastFieldReader::Bitpacked(
BitpackedFastFieldReader::open(file)?,
BitpackedFastFieldReader::open_from_bytes(data_bytes)?,
))
}
}
@@ -113,6 +117,11 @@ impl<Item: FastValue> BitpackedFastFieldReader<Item> {
/// Opens a fast field given a file.
pub fn open(file: FileSlice) -> crate::Result<Self> {
let mut bytes = file.read_bytes()?;
let _id = u8::deserialize(&mut bytes)?;
Self::open_from_bytes(bytes)
}
/// Opens a fast field given a file.
pub fn open_from_bytes(mut bytes: OwnedBytes) -> crate::Result<Self> {
let min_value = u64::deserialize(&mut bytes)?;
let amplitude = u64::deserialize(&mut bytes)?;
let max_value = min_value + amplitude;
@@ -198,8 +207,8 @@ impl<Item: FastValue> FastFieldReader<Item> for BitpackedFastFieldReader<Item> {
}
}
impl<Item: FastValue> From<Vec<Item>> for BitpackedFastFieldReader<Item> {
fn from(vals: Vec<Item>) -> BitpackedFastFieldReader<Item> {
impl<Item: FastValue> From<Vec<Item>> for DynamicFastFieldReader<Item> {
fn from(vals: Vec<Item>) -> DynamicFastFieldReader<Item> {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_u64_field("field", FAST);
let schema = schema_builder.build();
@@ -231,6 +240,6 @@ impl<Item: FastValue> From<Vec<Item>> for BitpackedFastFieldReader<Item> {
let field_file = composite_file
.open_read(field)
.expect("File component not found");
BitpackedFastFieldReader::open(field_file).unwrap()
DynamicFastFieldReader::open(field_file).unwrap()
}
}

View File

@@ -1,256 +0,0 @@
use crate::common::BinarySerializable;
use crate::common::CompositeWrite;
use crate::common::CountingWriter;
use crate::directory::WritePtr;
use crate::schema::Field;
use std::io::{self, Write};
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;
/// `CompositeFastFieldSerializer` is in charge of serializing
/// fastfields on disk.
///
/// Fast fields have differnt encodings like bit-packing.
///
/// `FastFieldWriter`s are in charge of pushing the data to
/// the serializer.
/// The serializer expects to receive the following calls.
///
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * ...
/// * `close_field()`
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * ...
/// * `close_field()`
/// * `close()`
pub struct CompositeFastFieldSerializer {
composite_write: CompositeWrite<WritePtr>,
}
impl CompositeFastFieldSerializer {
/// Constructor
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
// just making room for the pointer to header.
let composite_write = CompositeWrite::wrap(write);
Ok(CompositeFastFieldSerializer { composite_write })
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
) -> io::Result<DynamicFastFieldSerializer<'_, CountingWriter<WritePtr>>> {
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field_with_idx(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
idx: usize,
) -> io::Result<DynamicFastFieldSerializer<'_, CountingWriter<WritePtr>>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
DynamicFastFieldSerializer::open(field_write, min_value, max_value)
}
/// Start serializing a new [u8] fast field
pub fn new_bytes_fast_field_with_idx(
&mut self,
field: Field,
idx: usize,
) -> FastBytesFieldSerializer<'_, CountingWriter<WritePtr>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
FastBytesFieldSerializer { write: field_write }
}
/// Closes the serializer
///
/// After this call the data must be persistently save on disk.
pub fn close(self) -> io::Result<()> {
self.composite_write.close()
}
}
#[derive(Debug, Clone)]
pub struct EstimationStats {
min_value: u64,
max_value: u64,
}
/// The FastFieldSerializer trait is the common interface
/// implemented by every fastfield serializer variant.
///
/// `DynamicFastFieldSerializer` is the enum wrapping all variants.
/// It is used to create an serializer instance.
pub trait FastFieldSerializer {
/// add value to serializer
fn add_val(&mut self, val: u64) -> io::Result<()>;
/// finish serializing a field.
fn close_field(self) -> io::Result<()>;
}
/// The FastFieldSerializerEstimate trait is required on all variants
/// of fast field compressions, to decide which one to choose.
pub trait FastFieldSerializerEstimate {
/// returns an estimate of the compression ratio.
fn estimate(
/*fastfield_accessor: impl FastFieldReader<u64>,*/ stats: EstimationStats,
) -> (f32, &'static str);
/// the unique name of the compressor
fn name() -> &'static str;
}
pub enum DynamicFastFieldSerializer<'a, W: Write> {
Bitpacked(BitpackedFastFieldSerializer<'a, W>),
}
impl<'a, W: Write> DynamicFastFieldSerializer<'a, W> {
/// Creates a new fast field serializer.
///
/// The serializer in fact encode the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// compute the minimum number of bits required to encode
/// values.
pub fn open(
write: &'a mut W,
min_value: u64,
max_value: u64,
) -> io::Result<DynamicFastFieldSerializer<'a, W>> {
let stats = EstimationStats {
min_value,
max_value,
};
let (_ratio, name) = (
BitpackedFastFieldSerializer::<Vec<u8>>::estimate(stats),
BitpackedFastFieldSerializer::<Vec<u8>>::name(),
);
Self::open_from_name(write, min_value, max_value, name)
}
/// Creates a new fast field serializer.
///
/// The serializer in fact encode the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// compute the minimum number of bits required to encode
/// values.
pub fn open_from_name(
write: &'a mut W,
min_value: u64,
max_value: u64,
name: &str,
) -> io::Result<DynamicFastFieldSerializer<'a, W>> {
// Weirdly the W generic on BitpackedFastFieldSerializer needs to be set,
// although name() doesn't use it
let variant = if name == BitpackedFastFieldSerializer::<Vec<u8>>::name() {
DynamicFastFieldSerializer::Bitpacked(BitpackedFastFieldSerializer::open(
write, min_value, max_value,
)?)
} else {
panic!("unknown fastfield serializer {}", name);
};
Ok(variant)
}
}
impl<'a, W: Write> FastFieldSerializer for DynamicFastFieldSerializer<'a, W> {
fn add_val(&mut self, val: u64) -> io::Result<()> {
match self {
Self::Bitpacked(serializer) => serializer.add_val(val),
}
}
fn close_field(self) -> io::Result<()> {
match self {
Self::Bitpacked(serializer) => serializer.close_field(),
}
}
}
pub struct BitpackedFastFieldSerializer<'a, W: Write> {
bit_packer: BitPacker,
write: &'a mut W,
min_value: u64,
num_bits: u8,
}
impl<'a, W: Write> BitpackedFastFieldSerializer<'a, W> {
/// Creates a new fast field serializer.
///
/// The serializer in fact encode the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// compute the minimum number of bits required to encode
/// values.
fn open(
write: &'a mut W,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedFastFieldSerializer<'a, W>> {
assert!(min_value <= max_value);
min_value.serialize(write)?;
let amplitude = max_value - min_value;
amplitude.serialize(write)?;
let num_bits = compute_num_bits(amplitude);
let bit_packer = BitPacker::new();
Ok(BitpackedFastFieldSerializer {
bit_packer,
write,
min_value,
num_bits,
})
}
}
impl<'a, W: 'a + Write> FastFieldSerializer for BitpackedFastFieldSerializer<'a, W> {
/// Pushes a new value to the currently open u64 fast field.
fn add_val(&mut self, val: u64) -> io::Result<()> {
let val_to_write: u64 = val - self.min_value;
self.bit_packer
.write(val_to_write, self.num_bits, &mut self.write)?;
Ok(())
}
fn close_field(mut self) -> io::Result<()> {
self.bit_packer.close(&mut self.write)
}
}
impl<'a, W: 'a + Write> FastFieldSerializerEstimate for BitpackedFastFieldSerializer<'a, W> {
fn estimate(
/*_fastfield_accessor: impl FastFieldReader<u64>, */ stats: EstimationStats,
) -> (f32, &'static str) {
let amplitude = stats.max_value - stats.min_value;
let num_bits = compute_num_bits(amplitude);
let num_bits_uncompressed = 64;
let ratio = num_bits as f32 / num_bits_uncompressed as f32;
let name = Self::name();
(ratio, name)
}
fn name() -> &'static str {
"Bitpacked"
}
}
pub struct FastBytesFieldSerializer<'a, W: Write> {
write: &'a mut W,
}
impl<'a, W: Write> FastBytesFieldSerializer<'a, W> {
pub fn write_all(&mut self, vals: &[u8]) -> io::Result<()> {
self.write.write_all(vals)
}
pub fn flush(&mut self) -> io::Result<()> {
self.write.flush()
}
}

View File

@@ -0,0 +1,97 @@
use super::FastFieldDataAccess;
use super::FastFieldSerializer;
use super::FastFieldSerializerEstimate;
use super::FastFieldStats;
use crate::common::BinarySerializable;
use std::io::{self, Write};
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;
pub struct BitpackedFastFieldSerializer<'a, W: 'a + Write> {
bit_packer: BitPacker,
write: &'a mut W,
min_value: u64,
num_bits: u8,
}
impl<'a, W: Write> BitpackedFastFieldSerializer<'a, W> {
/// Creates a new fast field serializer.
///
/// The serializer in fact encode the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// compute the minimum number of bits required to encode
/// values.
pub(crate) fn open(
write: &'a mut W,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedFastFieldSerializer<'a, W>> {
assert!(min_value <= max_value);
min_value.serialize(write)?;
let amplitude = max_value - min_value;
amplitude.serialize(write)?;
let num_bits = compute_num_bits(amplitude);
let bit_packer = BitPacker::new();
Ok(BitpackedFastFieldSerializer {
bit_packer,
write,
min_value,
num_bits,
})
}
/// Creates a new fast field serializer.
///
/// The serializer in fact encode the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// compute the minimum number of bits required to encode
/// values.
pub(crate) fn create(
write: &'a mut W,
fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
) -> io::Result<()> {
let mut serializer = Self::open(write, stats.min_value, stats.max_value)?;
for val in data_iter {
serializer.add_val(val)?;
}
serializer.close_field()?;
Ok(())
}
}
impl<'a, W: 'a + Write> FastFieldSerializer for BitpackedFastFieldSerializer<'a, W> {
/// Pushes a new value to the currently open u64 fast field.
fn add_val(&mut self, val: u64) -> io::Result<()> {
let val_to_write: u64 = val - self.min_value;
self.bit_packer
.write(val_to_write, self.num_bits, &mut self.write)?;
Ok(())
}
fn close_field(mut self) -> io::Result<()> {
self.bit_packer.close(&mut self.write)
}
}
impl<'a, W: 'a + Write> FastFieldSerializerEstimate for BitpackedFastFieldSerializer<'a, W> {
fn estimate(
_fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
) -> (f32, &'static str) {
let amplitude = stats.max_value - stats.min_value;
let num_bits = compute_num_bits(amplitude);
let num_bits_uncompressed = 64;
let ratio = num_bits as f32 / num_bits_uncompressed as f32;
let name = Self::codec_id().0;
(ratio, name)
}
fn codec_id() -> (&'static str, u8) {
("Bitpacked", 1)
}
}

View File

@@ -0,0 +1,83 @@
use super::FastFieldDataAccess;
use super::FastFieldSerializer;
use super::FastFieldSerializerEstimate;
use super::FastFieldStats;
use crate::common::BinarySerializable;
use std::io::{self, Write};
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;
/// Fastfield serializer, which tries to guess values by linear interpolation
/// and stores the difference.
pub struct LinearInterpolFastFieldSerializer<'a, W: 'a + Write> {
bit_packer: BitPacker,
write: &'a mut W,
min_value: u64,
num_bits: u8,
}
impl<'a, W: Write> LinearInterpolFastFieldSerializer<'a, W> {
/// Creates a new fast field serializer.
///
/// The serializer in fact encode the values by bitpacking
/// `(val - min_value)`.
///
/// It requires a `min_value` and a `max_value` to compute
/// compute the minimum number of bits required to encode
/// values.
pub(crate) fn create(
write: &'a mut W,
fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
) -> io::Result<()> {
assert!(stats.min_value <= stats.max_value);
stats.min_value.serialize(write)?;
let amplitude = stats.max_value - stats.min_value;
amplitude.serialize(write)?;
let num_bits = compute_num_bits(amplitude);
let mut serializer = LinearInterpolFastFieldSerializer {
bit_packer: BitPacker::new(),
write,
min_value: stats.min_value,
num_bits,
};
for val in data_iter {
serializer.add_val(val)?;
}
serializer.close_field()?;
Ok(())
}
}
impl<'a, W: 'a + Write> FastFieldSerializer for LinearInterpolFastFieldSerializer<'a, W> {
/// Pushes a new value to the currently open u64 fast field.
fn add_val(&mut self, val: u64) -> io::Result<()> {
let val_to_write: u64 = val - self.min_value;
self.bit_packer
.write(val_to_write, self.num_bits, &mut self.write)?;
Ok(())
}
fn close_field(mut self) -> io::Result<()> {
self.bit_packer.close(&mut self.write)
}
}
impl<'a, W: 'a + Write> FastFieldSerializerEstimate for LinearInterpolFastFieldSerializer<'a, W> {
fn estimate(
_fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
) -> (f32, &'static str) {
let amplitude = stats.max_value - stats.min_value;
let num_bits = compute_num_bits(amplitude);
let num_bits_uncompressed = 64;
let ratio = num_bits as f32 / num_bits_uncompressed as f32;
let name = Self::codec_id().0;
(ratio, name)
}
fn codec_id() -> (&'static str, u8) {
("Bitpacked", 2)
}
}

View File

@@ -0,0 +1,175 @@
mod bitpacked;
mod linearinterpol;
use crate::common::BinarySerializable;
use crate::common::CompositeWrite;
use crate::common::CountingWriter;
use crate::directory::WritePtr;
use crate::schema::Field;
use crate::DocId;
pub use bitpacked::BitpackedFastFieldSerializer;
use std::io::{self, Write};
/// FastFieldReader is the trait to access fast field data.
pub trait FastFieldDataAccess: Clone {
//type IteratorType: Iterator<Item = u64>;
/// Return the value associated to the given document.
///
/// Whenever possible use the Iterator passed to the fastfield creation instead, for performance reasons.
///
/// # Panics
///
/// May panic if `doc` is greater than the segment
fn get(&self, doc: DocId) -> u64;
}
/// `CompositeFastFieldSerializer` is in charge of serializing
/// fastfields on disk.
///
/// Fast fields have different encodings like bit-packing.
///
/// `FastFieldWriter`s are in charge of pushing the data to
/// the serializer.
/// The serializer expects to receive the following calls.
///
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * `add_val(...)`
/// * ...
/// * `close_field()`
/// * `new_u64_fast_field(...)`
/// * `add_val(...)`
/// * ...
/// * `close_field()`
/// * `close()`
pub struct CompositeFastFieldSerializer {
composite_write: CompositeWrite<WritePtr>,
}
impl CompositeFastFieldSerializer {
/// Constructor
pub fn from_write(write: WritePtr) -> io::Result<CompositeFastFieldSerializer> {
// just making room for the pointer to header.
let composite_write = CompositeWrite::wrap(write);
Ok(CompositeFastFieldSerializer { composite_write })
}
/// Serialize data into a new u64 fast field. The compression will be detected automatically.
pub fn create_auto_detect_u64_fast_field(
&mut self,
field: Field,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
data_iter_1: impl Iterator<Item = u64>,
data_iter_2: impl Iterator<Item = u64>,
) -> io::Result<()> {
let field_write = self.composite_write.for_field_with_idx(field, 0);
let (_ratio, (name, id)) = (
BitpackedFastFieldSerializer::<Vec<u8>>::estimate(&fastfield_accessor, stats.clone()),
BitpackedFastFieldSerializer::<Vec<u8>>::codec_id(),
);
id.serialize(field_write)?;
if name == BitpackedFastFieldSerializer::<Vec<u8>>::codec_id().0 {
BitpackedFastFieldSerializer::create(
field_write,
&fastfield_accessor,
stats,
data_iter_1,
)?;
} else {
panic!("unknown fastfield serializer {}", name);
};
Ok(())
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
) -> io::Result<BitpackedFastFieldSerializer<'_, CountingWriter<WritePtr>>> {
self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
}
/// Start serializing a new u64 fast field
pub fn new_u64_fast_field_with_idx(
&mut self,
field: Field,
min_value: u64,
max_value: u64,
idx: usize,
) -> io::Result<BitpackedFastFieldSerializer<'_, CountingWriter<WritePtr>>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
// Prepend codec id to field data for compatibility with DynamicFastFieldReader.
let (_name, id) = BitpackedFastFieldSerializer::<Vec<u8>>::codec_id();
id.serialize(field_write)?;
BitpackedFastFieldSerializer::open(field_write, min_value, max_value)
}
/// Start serializing a new [u8] fast field
pub fn new_bytes_fast_field_with_idx(
&mut self,
field: Field,
idx: usize,
) -> FastBytesFieldSerializer<'_, CountingWriter<WritePtr>> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
FastBytesFieldSerializer { write: field_write }
}
/// Closes the serializer
///
/// After this call the data must be persistently save on disk.
pub fn close(self) -> io::Result<()> {
self.composite_write.close()
}
}
#[derive(Debug, Clone)]
pub struct FastFieldStats {
pub min_value: u64,
pub max_value: u64,
pub num_vals: u64,
}
/// The FastFieldSerializer trait is the common interface
/// implemented by every fastfield serializer variant.
///
/// `DynamicFastFieldSerializer` is the enum wrapping all variants.
/// It is used to create an serializer instance.
pub trait FastFieldSerializer {
/// add value to serializer
fn add_val(&mut self, val: u64) -> io::Result<()>;
/// finish serializing a field.
fn close_field(self) -> io::Result<()>;
}
/// The FastFieldSerializerEstimate trait is required on all variants
/// of fast field compressions, to decide which one to choose.
pub trait FastFieldSerializerEstimate {
/// returns an estimate of the compression ratio.
fn estimate(
fastfield_accessor: &impl FastFieldDataAccess,
stats: FastFieldStats,
) -> (f32, &'static str);
/// the unique (name, id) of the compressor. Used to distinguish when de/serializing.
fn codec_id() -> (&'static str, u8);
}
pub struct FastBytesFieldSerializer<'a, W: Write> {
write: &'a mut W,
}
impl<'a, W: Write> FastBytesFieldSerializer<'a, W> {
pub fn write_all(&mut self, vals: &[u8]) -> io::Result<()> {
self.write.write_all(vals)
}
pub fn flush(&mut self) -> io::Result<()> {
self.write.flush()
}
}

View File

@@ -1,4 +1,7 @@
use super::multivalued::MultiValuedFastFieldWriter;
use super::serializer::FastFieldStats;
use super::FastFieldDataAccess;
use super::FastFieldReader;
use crate::common;
use crate::fastfield::serializer::FastFieldSerializer;
use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer};
@@ -6,6 +9,7 @@ use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::UnorderedTermId;
use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema};
use crate::termdict::TermOrdinal;
use crate::DocId;
use fnv::FnvHashMap;
use std::collections::HashMap;
use std::io;
@@ -265,9 +269,9 @@ impl IntFastFieldWriter {
self.add_val(val);
}
/// Extract the stored data
pub(crate) fn get_data(&self) -> Vec<u64> {
self.vals.iter().collect::<Vec<u64>>()
/// get iterator over the data
pub(crate) fn iter(&self) -> impl Iterator<Item = u64> + '_ {
self.vals.iter()
}
/// Push the fast fields value to the `FastFieldWriter`.
@@ -281,17 +285,58 @@ impl IntFastFieldWriter {
} else {
(self.val_min, self.val_max)
};
let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?;
if let Some(doc_id_map) = doc_id_map {
for doc_id in doc_id_map.iter_old_doc_ids() {
single_field_serializer.add_val(self.vals.get(*doc_id as usize))?;
}
} else {
for val in self.vals.iter() {
single_field_serializer.add_val(val)?;
}
let fastfield_accessor = WriterFastFieldAccessProvider {
doc_id_map,
vals: &self.vals,
};
let stats = FastFieldStats {
min_value: min,
max_value: max,
num_vals: self.val_count as u64,
};
single_field_serializer.close_field()
if let Some(doc_id_map) = doc_id_map {
let iter = doc_id_map
.iter_old_doc_ids()
.map(|doc_id| self.vals.get(*doc_id as usize));
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
iter.clone(),
iter,
)?;
} else {
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
self.vals.iter(),
self.vals.iter(),
)?;
};
Ok(())
}
}
#[derive(Clone)]
struct WriterFastFieldAccessProvider<'map, 'bitp> {
doc_id_map: Option<&'map DocIdMapping>,
vals: &'bitp BlockedBitpacker,
}
impl<'map, 'bitp> FastFieldDataAccess for WriterFastFieldAccessProvider<'map, 'bitp> {
/// Return the value associated to the given document.
///
/// This accessor should return as fast as possible.
///
/// # Panics
///
/// May panic if `doc` is greater than the segment
fn get(&self, doc: DocId) -> u64 {
if let Some(doc_id_map) = self.doc_id_map {
self.vals.get(doc_id_map.get_old_doc_id(doc) as usize) // consider extra FastFieldReader wrapper for non doc_id_map
} else {
self.vals.get(doc as usize)
}
}
}

View File

@@ -60,9 +60,8 @@ pub(crate) fn get_doc_id_mapping_from_field(
})?;
// create new doc_id to old doc_id index (used in fast_field_writers)
let data = fast_field.get_data();
let mut doc_id_and_data = data
.into_iter()
let mut doc_id_and_data = fast_field
.iter()
.enumerate()
.map(|el| (el.0 as DocId, el.1))
.collect::<Vec<_>>();

View File

@@ -3,8 +3,10 @@ use crate::error::DataCorruption;
use crate::fastfield::CompositeFastFieldSerializer;
use crate::fastfield::DeleteBitSet;
use crate::fastfield::DynamicFastFieldReader;
use crate::fastfield::FastFieldDataAccess;
use crate::fastfield::FastFieldReader;
use crate::fastfield::FastFieldSerializer;
use crate::fastfield::FastFieldStats;
use crate::fastfield::MultiValuedFastFieldReader;
use crate::fieldnorm::FieldNormsSerializer;
use crate::fieldnorm::FieldNormsWriter;
@@ -344,21 +346,71 @@ impl IndexMerger {
})
.collect::<Vec<_>>();
if let Some(doc_id_mapping) = doc_id_mapping {
let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
(
doc_id,
&fast_field_readers[reader_with_ordinal.ordinal as usize],
)
});
// add values in order of the new doc_ids
let mut fast_single_field_serializer =
fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?;
for (doc_id, field_reader) in sorted_doc_ids {
let val = field_reader.get(*doc_id);
fast_single_field_serializer.add_val(val)?;
//struct SortedDocidAccessor {};
#[derive(Clone)]
struct SortedDocidFieldAccessProvider<'a> {
doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
fast_field_readers: &'a Vec<DynamicFastFieldReader<u64>>,
}
impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> {
//type IteratorType = ;
//type IteratorType: std::iter::Map<std::slice::Iter<'a, (u32, SegmentReaderWithOrdinal<'_>)>, _>
//type IteratorType =
//std::iter::Map<std::slice::Iter<'a, (u32, SegmentReaderWithOrdinal<'a>)>, u64>;
fn get(&self, doc: DocId) -> u64 {
let (doc_id, reader_with_ordinal) = self.doc_id_mapping[doc as usize];
self.fast_field_readers[reader_with_ordinal.ordinal as usize].get(doc_id)
}
fast_single_field_serializer.close_field()?;
//fn iter(&self) -> Self::IteratorType {
//self.doc_id_mapping
//.iter()
//.map(|(doc_id, reader_with_ordinal)| {
//let fast_field_reader =
//&self.fast_field_readers[reader_with_ordinal.ordinal as usize];
//let val = self.field_reader.get(*doc_id);
//val
//})
//}
}
let stats = FastFieldStats {
min_value,
max_value,
num_vals: doc_id_mapping.len() as u64,
};
let fastfield_accessor = SortedDocidFieldAccessProvider {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
};
let iter = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize];
let val = fast_field_reader.get(*doc_id);
val
});
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
fastfield_accessor,
iter.clone(),
iter,
)?;
//let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
//(
//doc_id,
//&fast_field_readers[reader_with_ordinal.ordinal as usize],
//)
//});
//// add values in order of the new doc_ids
//let mut fast_single_field_serializer =
//fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?;
//for (doc_id, field_reader) in sorted_doc_ids {
//let val = field_reader.get(*doc_id);
//fast_single_field_serializer.add_val(val)?;
//}
//fast_single_field_serializer.close_field()?;
Ok(())
} else {
let u64_readers = self.readers.iter()