move iter to FastFieldDataAccess

This commit is contained in:
Pascal Seitz
2022-08-22 17:00:12 +02:00
parent 67d94f5bd2
commit c39c2d79da
9 changed files with 77 additions and 135 deletions

View File

@@ -34,14 +34,7 @@ mod tests {
data: &[u64],
) {
let mut bytes = vec![];
S::serialize(
&mut bytes,
&data,
stats_from_vec(data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
S::serialize(&mut bytes, &data, stats_from_vec(data)).unwrap();
let reader = R::open_from_bytes(&bytes).unwrap();
b.iter(|| {
for pos in value_iter() {
@@ -52,14 +45,7 @@ mod tests {
fn bench_create<S: FastFieldCodecSerializer>(b: &mut Bencher, data: &[u64]) {
let mut bytes = vec![];
b.iter(|| {
S::serialize(
&mut bytes,
&data,
stats_from_vec(data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
S::serialize(&mut bytes, &data, stats_from_vec(data)).unwrap();
});
}

View File

@@ -113,13 +113,11 @@ impl FastFieldCodecSerializer for BitpackedFastFieldSerializer {
write: &mut impl Write,
_fastfield_accessor: &dyn FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
_data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
let mut serializer =
BitpackedFastFieldSerializerLegacy::open(write, stats.min_value, stats.max_value)?;
for val in data_iter {
for val in _fastfield_accessor.iter() {
serializer.add_val(val)?;
}
serializer.close_field()?;

View File

@@ -38,14 +38,13 @@ pub trait FastFieldCodecSerializer {
fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32;
/// Serializes the data using the serializer into write.
/// There are multiple iterators, in case the codec needs to read the data multiple times.
/// The iterators should be preferred over using fastfield_accessor for performance reasons.
///
/// Iterating through `fastfield_accessor.iter()` should be preferred over repeated
/// `fastfield_accessor.get_val()` calls for performance reasons.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()>;
}
@@ -60,6 +59,9 @@ pub trait FastFieldDataAccess {
///
/// May panic if `position` is greater than the index.
fn get_val(&self, position: u64) -> u64;
/// Returns an iterator over the data
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = u64> + 'a>;
}
#[derive(Debug, Clone)]
@@ -74,12 +76,19 @@ impl<'a> FastFieldDataAccess for &'a [u64] {
/// Returns the element stored at `position` in the slice.
///
/// Panics if `position` is out of bounds for the slice.
fn get_val(&self, position: u64) -> u64 {
    let idx = position as usize;
    self[idx]
}
/// Returns a boxed iterator yielding every element of the slice by value, in order.
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
    // Bind the receiver as a plain slice first so that `.iter()` below is the
    // slice's own iterator and not this trait method again.
    let values: &[u64] = self;
    Box::new(values.iter().copied())
}
}
/// Lets an owned `Vec<u64>` be used directly as a fast field data source.
impl FastFieldDataAccess for Vec<u64> {
    /// Returns the element stored at `position`.
    ///
    /// Panics if `position` is out of bounds for the vector.
    fn get_val(&self, position: u64) -> u64 {
        let idx = position as usize;
        self[idx]
    }

    /// Returns a boxed iterator yielding every element by value, in order.
    fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
        // Take the slice view explicitly so that `.iter()` is the slice iterator
        // and does not resolve back to this trait method.
        let values: &[u64] = self.as_slice();
        Box::new(values.iter().copied())
    }
}
#[cfg(test)]
@@ -99,14 +108,7 @@ mod tests {
}
let estimation = S::estimate(&data, crate::tests::stats_from_vec(data));
let mut out: Vec<u8> = Vec::new();
S::serialize(
&mut out,
&data,
crate::tests::stats_from_vec(data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
S::serialize(&mut out, &data, crate::tests::stats_from_vec(data)).unwrap();
let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0);

View File

@@ -116,8 +116,6 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
assert!(stats.min_value <= stats.max_value);
@@ -127,7 +125,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
// calculate offset to ensure all values are positive
let mut offset = 0;
let mut rel_positive_max = 0;
for (pos, actual_value) in data_iter1.enumerate() {
for (pos, actual_value) in fastfield_accessor.iter().enumerate() {
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
if calculated_value > actual_value {
// negative value we need to apply an offset
@@ -145,7 +143,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
let num_bits = compute_num_bits(relative_max_value);
let mut bit_packer = BitPacker::new();
for (pos, val) in data_iter.enumerate() {
for (pos, val) in fastfield_accessor.iter().enumerate() {
let calculated_value = get_calculated_value(first_val, pos as u64, slope);
let diff = (val + offset) - calculated_value;
bit_packer.write(diff, num_bits, write)?;

View File

@@ -100,14 +100,7 @@ pub fn serialize_with_codec<S: FastFieldCodecSerializer>(
}
let estimation = S::estimate(&data, stats_from_vec(data));
let mut out = vec![];
S::serialize(
&mut out,
&data,
stats_from_vec(data),
data.iter().cloned(),
data.iter().cloned(),
)
.unwrap();
S::serialize(&mut out, &data, stats_from_vec(data)).unwrap();
let actual_compression = out.len() as f32 / (data.len() * 8) as f32;
(true, estimation, actual_compression, S::NAME)

View File

@@ -198,8 +198,6 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
stats: FastFieldStats,
data_iter: impl Iterator<Item = u64>,
_data_iter1: impl Iterator<Item = u64>,
) -> io::Result<()> {
assert!(stats.min_value <= stats.max_value);
@@ -218,7 +216,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
// Since we potentially apply multiple passes over the data, the data is cached.
// Multiple iteration can be expensive (merge with index sorting can add lot of overhead per
// iteration)
let data = data_iter.collect::<Vec<_>>();
let data = fastfield_accessor.iter().collect::<Vec<_>>();
//// let's split this into chunks of CHUNK_SIZE
for data_pos in (0..data.len() as u64).step_by(CHUNK_SIZE as usize).skip(1) {

View File

@@ -98,24 +98,13 @@ impl CompositeFastFieldSerializer {
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field<F, I>(
pub fn create_auto_detect_u64_fast_field(
&mut self,
field: Field,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
iter_gen: F,
) -> io::Result<()>
where
F: Fn() -> I,
I: Iterator<Item = u64>,
{
self.create_auto_detect_u64_fast_field_with_idx(
field,
stats,
fastfield_accessor,
iter_gen,
0,
)
) -> io::Result<()> {
self.create_auto_detect_u64_fast_field_with_idx(field, stats, fastfield_accessor, 0)
}
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
@@ -128,20 +117,15 @@ impl CompositeFastFieldSerializer {
/// Serialize data into a new u64 fast field. The best compression codec will be chosen
/// automatically.
pub fn create_auto_detect_u64_fast_field_with_idx<F, I>(
pub fn create_auto_detect_u64_fast_field_with_idx(
&mut self,
field: Field,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
iter_gen: F,
idx: usize,
) -> io::Result<()>
where
F: Fn() -> I,
I: Iterator<Item = u64>,
{
) -> io::Result<()> {
let field_write = self.composite_write.for_field_with_idx(field, idx);
let gcd = find_gcd(iter_gen().map(|val| val - stats.min_value))
let gcd = find_gcd(fastfield_accessor.iter().map(|val| val - stats.min_value))
.map(NonZeroU64::get)
.unwrap_or(GCD_DEFAULT);
@@ -152,8 +136,6 @@ impl CompositeFastFieldSerializer {
field_write,
stats,
fastfield_accessor,
iter_gen(),
iter_gen(),
);
}
@@ -167,6 +149,13 @@ impl CompositeFastFieldSerializer {
/// Returns the wrapped accessor's value at `position`, shifted down by `min_value`
/// and then divided by `gcd`.
fn get_val(&self, position: u64) -> u64 {
    let raw = self.fastfield_accessor.get_val(position);
    (raw - self.min_value) / self.gcd
}
/// Iterates over the wrapped accessor's values, shifting each one down by `min_value`
/// and then dividing it by `gcd`.
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
    let normalized = self
        .fastfield_accessor
        .iter()
        .map(|raw| (raw - self.min_value) / self.gcd);
    Box::new(normalized)
}
}
let fastfield_accessor = GCDWrappedFFAccess {
@@ -181,16 +170,12 @@ impl CompositeFastFieldSerializer {
max_value: (stats.max_value - stats.min_value) / gcd,
num_vals: stats.num_vals,
};
let iter1 = iter_gen().map(|val| (val - min_value) / gcd);
let iter2 = iter_gen().map(|val| (val - min_value) / gcd);
Self::create_auto_detect_u64_fast_field_with_idx_gcd(
self.codec_enable_checker.clone(),
field,
field_write,
stats,
fastfield_accessor,
iter1,
iter2,
)?;
write_gcd_header(field_write, min_value, gcd)?;
Ok(())
@@ -204,8 +189,6 @@ impl CompositeFastFieldSerializer {
field_write: &mut CountingWriter<W>,
stats: FastFieldStats,
fastfield_accessor: impl FastFieldDataAccess,
iter1: impl Iterator<Item = u64>,
iter2: impl Iterator<Item = u64>,
) -> io::Result<()> {
let mut estimations = vec![];
@@ -250,21 +233,13 @@ impl CompositeFastFieldSerializer {
Self::write_header(field_write, id)?;
match name {
BitpackedFastFieldSerializer::NAME => {
BitpackedFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
iter1,
iter2,
)?;
BitpackedFastFieldSerializer::serialize(field_write, &fastfield_accessor, stats)?;
}
LinearInterpolFastFieldSerializer::NAME => {
LinearInterpolFastFieldSerializer::serialize(
field_write,
&fastfield_accessor,
stats,
iter1,
iter2,
)?;
}
MultiLinearInterpolFastFieldSerializer::NAME => {
@@ -272,8 +247,6 @@ impl CompositeFastFieldSerializer {
field_write,
&fastfield_accessor,
stats,
iter1,
iter2,
)?;
}
_ => {

View File

@@ -369,28 +369,8 @@ impl IntFastFieldWriter {
num_vals: self.val_count as u64,
};
if let Some(doc_id_map) = doc_id_map {
let iter_gen = || {
doc_id_map
.iter_old_doc_ids()
.map(|doc_id| self.vals.get(doc_id as usize))
};
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
iter_gen,
)?;
} else {
let iter_gen = || self.vals.iter();
serializer.create_auto_detect_u64_fast_field(self.field, stats, fastfield_accessor)?;
serializer.create_auto_detect_u64_fast_field(
self.field,
stats,
fastfield_accessor,
iter_gen,
)?;
};
Ok(())
}
}
@@ -419,4 +399,16 @@ impl<'map, 'bitp> FastFieldDataAccess for WriterFastFieldAccessProvider<'map, 'b
self.vals.get(doc as usize)
}
}
/// Iterates over the stored values.
///
/// When a doc id map is present, values are yielded in the order produced by
/// `iter_old_doc_ids()`, each one looked up in `vals`; otherwise `vals` is iterated
/// directly.
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = u64> + 'a> {
    match self.doc_id_map {
        Some(doc_id_map) => {
            let remapped = doc_id_map
                .iter_old_doc_ids()
                .map(|old_doc_id| self.vals.get(old_doc_id as usize));
            Box::new(remapped)
        }
        None => Box::new(self.vals.iter()),
    }
}
}

View File

@@ -134,7 +134,7 @@ impl TermOrdinalMapping {
fn max_term_ord(&self) -> TermOrdinal {
self.per_segment_new_term_ordinals
.iter()
.flat_map(|term_ordinals| term_ordinals.iter().cloned().max())
.flat_map(|term_ordinals| term_ordinals.iter().max())
.max()
.unwrap_or_default()
}
@@ -383,22 +383,27 @@ impl IndexMerger {
} = self.doc_id_mapping.get_old_doc_addr(doc as u32);
self.fast_field_readers[segment_ord as usize].get(doc_id)
}
/// Iterates over the values in the order given by `doc_id_mapping.iter_old_doc_addrs()`,
/// reading each value from the fast field reader of the address's segment.
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
    let values = self.doc_id_mapping.iter_old_doc_addrs().map(|addr| {
        let reader = &self.fast_field_readers[addr.segment_ord as usize];
        reader.get(addr.doc_id)
    });
    Box::new(values)
}
}
let fastfield_accessor = SortedDocIdFieldAccessProvider {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
};
let iter_gen = || {
doc_id_mapping.iter_old_doc_addrs().map(|old_doc_addr| {
let fast_field_reader = &fast_field_readers[old_doc_addr.segment_ord as usize];
fast_field_reader.get(old_doc_addr.doc_id)
})
};
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
fastfield_accessor,
iter_gen,
)?;
Ok(())
@@ -559,13 +564,7 @@ impl IndexMerger {
}
offsets.push(offset);
let iter_gen = || offsets.iter().cloned();
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
&offsets[..],
iter_gen,
)?;
fast_field_serializer.create_auto_detect_u64_fast_field(field, stats, &offsets[..])?;
Ok(offsets)
}
/// Returns the fastfield index (index for the data, not the data).
@@ -746,7 +745,7 @@ impl IndexMerger {
let new_doc_id: DocId =
self.offsets
.iter()
.position(|&offset| offset > pos)
.position(|offset| offset > pos)
.expect("pos is out of bounds") as DocId
- 1u32;
@@ -764,27 +763,30 @@ impl IndexMerger {
vals[pos_in_values as usize]
}
/// Iterates over all values in the order given by `doc_id_mapping.iter_old_doc_addrs()`,
/// yielding every value that the segment's reader returns for each address.
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
    let all_values = self.doc_id_mapping.iter_old_doc_addrs().flat_map(|addr| {
        let reader = &self.fast_field_readers[addr.segment_ord as usize];
        // `get_vals` takes its output buffer by mutable reference, so a fresh
        // buffer is created per address and then handed over to `flat_map`.
        let mut doc_vals = Vec::new();
        reader.get_vals(addr.doc_id, &mut doc_vals);
        doc_vals.into_iter()
    });
    Box::new(all_values)
}
}
let fastfield_accessor = SortedDocIdMultiValueAccessProvider {
doc_id_mapping,
fast_field_readers: &ff_readers,
offsets,
};
let iter_gen = || {
doc_id_mapping
.iter_old_doc_addrs()
.flat_map(|old_doc_addr| {
let ff_reader = &ff_readers[old_doc_addr.segment_ord as usize];
let mut vals = Vec::new();
ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
vals.into_iter()
})
};
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
field,
stats,
fastfield_accessor,
iter_gen,
1,
)?;