mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-05 01:50:42 +00:00
use iter api
This commit is contained in:
@@ -100,7 +100,8 @@ mod tests {
|
||||
|
||||
fn get_u128_column_from_data(data: &[u128]) -> Arc<dyn Column<u128>> {
|
||||
let mut out = vec![];
|
||||
serialize_u128(VecColumn::from(&data), &mut out).unwrap();
|
||||
let iter = || data.iter().cloned();
|
||||
serialize_u128(iter, data.len() as u64, &mut out).unwrap();
|
||||
let out = OwnedBytes::new(out);
|
||||
open_u128::<u128>(out).unwrap()
|
||||
}
|
||||
|
||||
@@ -171,10 +171,10 @@ pub struct IPCodecParams {
|
||||
|
||||
impl CompactSpaceCompressor {
|
||||
/// Collecting all the vals into a sorted set may cost a lot of memory. The set is used to sort the vals.
|
||||
pub fn train_from(column: &impl Column<u128>) -> Self {
|
||||
pub fn train_from(iter: impl Iterator<Item = u128>, num_vals: u64) -> Self {
|
||||
let mut values_sorted = BTreeSet::new();
|
||||
values_sorted.extend(column.iter());
|
||||
let total_num_values = column.num_vals();
|
||||
values_sorted.extend(iter);
|
||||
let total_num_values = num_vals;
|
||||
|
||||
let compact_space =
|
||||
get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
|
||||
@@ -443,7 +443,7 @@ impl CompactSpaceDecompressor {
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::{open_u128, serialize_u128, VecColumn};
|
||||
use crate::{open_u128, serialize_u128};
|
||||
|
||||
#[test]
|
||||
fn compact_space_test() {
|
||||
@@ -513,7 +513,12 @@ mod tests {
|
||||
|
||||
fn test_aux_vals(u128_vals: &[u128]) -> OwnedBytes {
|
||||
let mut out = Vec::new();
|
||||
serialize_u128(VecColumn::from(u128_vals), &mut out).unwrap();
|
||||
serialize_u128(
|
||||
|| u128_vals.iter().cloned(),
|
||||
u128_vals.len() as u64,
|
||||
&mut out,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let data = OwnedBytes::new(out);
|
||||
test_all(data.clone(), u128_vals);
|
||||
@@ -603,7 +608,7 @@ mod tests {
|
||||
5_000_000_000,
|
||||
];
|
||||
let mut out = Vec::new();
|
||||
serialize_u128(VecColumn::from(vals), &mut out).unwrap();
|
||||
serialize_u128(|| vals.iter().cloned(), vals.len() as u64, &mut out).unwrap();
|
||||
let decomp = open_u128::<u128>(OwnedBytes::new(out)).unwrap();
|
||||
|
||||
assert_eq!(decomp.get_between_vals(199..=200), vec![0]);
|
||||
|
||||
@@ -90,7 +90,7 @@ fn bench_ip() {
|
||||
{
|
||||
let mut data = vec![];
|
||||
for dataset in dataset.chunks(500_000) {
|
||||
serialize_u128(VecColumn::from(dataset), &mut data).unwrap();
|
||||
serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap();
|
||||
}
|
||||
let compression = data.len() as f64 / (dataset.len() * 16) as f64;
|
||||
println!("Compression 50_000 chunks {:.4}", compression);
|
||||
@@ -101,7 +101,10 @@ fn bench_ip() {
|
||||
}
|
||||
|
||||
let mut data = vec![];
|
||||
serialize_u128(VecColumn::from(&dataset), &mut data).unwrap();
|
||||
{
|
||||
print_time!("creation");
|
||||
serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap();
|
||||
}
|
||||
|
||||
let compression = data.len() as f64 / (dataset.len() * 16) as f64;
|
||||
println!("Compression {:.2}", compression);
|
||||
|
||||
@@ -142,15 +142,14 @@ pub fn estimate<T: MonotonicallyMappableToU64>(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize_u128(
|
||||
typed_column: impl Column<u128>,
|
||||
pub fn serialize_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
|
||||
iter_gen: F,
|
||||
num_vals: u64,
|
||||
output: &mut impl io::Write,
|
||||
) -> io::Result<()> {
|
||||
// TODO write header, to later support more codecs
|
||||
let compressor = CompactSpaceCompressor::train_from(&typed_column);
|
||||
compressor
|
||||
.compress_into(typed_column.iter(), output)
|
||||
.unwrap();
|
||||
let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
|
||||
compressor.compress_into(iter_gen(), output).unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -361,15 +361,8 @@ impl MultiValueU128FastFieldWriter {
|
||||
{
|
||||
let field_write = serializer.get_field_writer(self.field, 1);
|
||||
|
||||
let mut values = Vec::with_capacity(self.vals.len());
|
||||
for vals in self.get_ordered_values(doc_id_map) {
|
||||
for &val in vals {
|
||||
values.push(val);
|
||||
}
|
||||
}
|
||||
let col = VecColumn::from(&values[..]);
|
||||
|
||||
serialize_u128(col, field_write)?;
|
||||
let iter = || self.get_ordered_values(doc_id_map).flatten().cloned();
|
||||
serialize_u128(iter, self.vals.len() as u64, field_write)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -364,65 +364,31 @@ impl U128FastFieldWriter {
|
||||
}
|
||||
}
|
||||
|
||||
struct RemappedFFWriter<'a> {
|
||||
doc_id_map: Option<&'a DocIdMapping>,
|
||||
null_values: &'a RoaringBitmap,
|
||||
vals: &'a [u128],
|
||||
idx_to_val_idx: Vec<u32>,
|
||||
val_count: u32,
|
||||
}
|
||||
impl<'a> Column<u128> for RemappedFFWriter<'a> {
|
||||
fn get_val(&self, _idx: u64) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.val_count as u64
|
||||
}
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = u128> + '_> {
|
||||
if let Some(doc_id_map) = self.doc_id_map {
|
||||
let iter = doc_id_map.iter_old_doc_ids().map(|idx| {
|
||||
if self.null_values.contains(idx as u32) {
|
||||
0 // TODO properly handle nulls
|
||||
} else {
|
||||
self.vals[self.idx_to_val_idx[idx as usize] as usize]
|
||||
}
|
||||
});
|
||||
Box::new(iter)
|
||||
} else {
|
||||
let iter = (0..self.val_count).map(|idx| {
|
||||
if self.null_values.contains(idx as u32) {
|
||||
0 // TODO properly handle nulls
|
||||
} else {
|
||||
self.vals[self.idx_to_val_idx[idx as usize] as usize]
|
||||
}
|
||||
});
|
||||
Box::new(iter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let column = RemappedFFWriter {
|
||||
doc_id_map,
|
||||
null_values: &self.null_values,
|
||||
vals: &self.vals,
|
||||
idx_to_val_idx,
|
||||
val_count: self.val_count,
|
||||
};
|
||||
|
||||
let field_write = serializer.get_field_writer(self.field, 0);
|
||||
serialize_u128(column, field_write)?;
|
||||
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
let iter = || {
|
||||
doc_id_map.iter_old_doc_ids().map(|idx| {
|
||||
if self.null_values.contains(idx as u32) {
|
||||
0 // TODO properly handle nulls
|
||||
} else {
|
||||
self.vals[idx_to_val_idx[idx as usize] as usize]
|
||||
}
|
||||
})
|
||||
};
|
||||
serialize_u128(iter, self.val_count as u64, field_write)?;
|
||||
} else {
|
||||
let iter = || {
|
||||
(0..self.val_count).map(|idx| {
|
||||
if self.null_values.contains(idx as u32) {
|
||||
0 // TODO properly handle nulls
|
||||
} else {
|
||||
self.vals[idx_to_val_idx[idx as usize] as usize]
|
||||
}
|
||||
})
|
||||
};
|
||||
serialize_u128(iter, self.val_count as u64, field_write)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -355,49 +355,16 @@ impl IndexMerger {
|
||||
.map(|(_, ff_reader)| ff_reader)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
struct RemappedFFReader<'a> {
|
||||
doc_id_mapping: &'a SegmentDocIdMapping,
|
||||
fast_field_readers: Vec<MultiValuedU128FastFieldReader<u128>>,
|
||||
}
|
||||
impl<'a> Column<u128> for RemappedFFReader<'a> {
|
||||
fn get_val(&self, _idx: u64) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.doc_id_mapping.len() as u64
|
||||
}
|
||||
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u128> + 'b> {
|
||||
Box::new(
|
||||
self.doc_id_mapping
|
||||
.iter_old_doc_addrs()
|
||||
.flat_map(|doc_addr| {
|
||||
let fast_field_reader =
|
||||
&self.fast_field_readers[doc_addr.segment_ord as usize];
|
||||
let mut out = vec![];
|
||||
fast_field_reader.get_vals(doc_addr.doc_id, &mut out);
|
||||
out.into_iter()
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
let column = RemappedFFReader {
|
||||
doc_id_mapping,
|
||||
fast_field_readers,
|
||||
let iter = || {
|
||||
doc_id_mapping.iter_old_doc_addrs().flat_map(|doc_addr| {
|
||||
let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize];
|
||||
let mut out = vec![];
|
||||
fast_field_reader.get_vals(doc_addr.doc_id, &mut out);
|
||||
out.into_iter()
|
||||
})
|
||||
};
|
||||
let field_write = fast_field_serializer.get_field_writer(field, 1);
|
||||
serialize_u128(column, field_write)?;
|
||||
serialize_u128(iter, doc_id_mapping.len() as u64, field_write)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -421,42 +388,14 @@ impl IndexMerger {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
struct RemappedFFReader<'a> {
|
||||
doc_id_mapping: &'a SegmentDocIdMapping,
|
||||
fast_field_readers: Vec<Arc<dyn Column<u128>>>,
|
||||
}
|
||||
impl<'a> Column<u128> for RemappedFFReader<'a> {
|
||||
fn get_val(&self, _idx: u64) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn min_value(&self) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn max_value(&self) -> u128 {
|
||||
// unused by codec
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn num_vals(&self) -> u64 {
|
||||
self.doc_id_mapping.len() as u64
|
||||
}
|
||||
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u128> + 'b> {
|
||||
Box::new(self.doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
|
||||
let fast_field_reader = &self.fast_field_readers[doc_addr.segment_ord as usize];
|
||||
fast_field_reader.get_val(doc_addr.doc_id as u64)
|
||||
}))
|
||||
}
|
||||
}
|
||||
let column = RemappedFFReader {
|
||||
doc_id_mapping,
|
||||
fast_field_readers,
|
||||
};
|
||||
let field_write = fast_field_serializer.get_field_writer(field, 0);
|
||||
serialize_u128(column, field_write)?;
|
||||
let iter = || {
|
||||
doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
|
||||
let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize];
|
||||
fast_field_reader.get_val(doc_addr.doc_id as u64)
|
||||
})
|
||||
};
|
||||
fastfield_codecs::serialize_u128(iter, doc_id_mapping.len() as u64, field_write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::mem;
|
||||
use std::net::IpAddr;
|
||||
|
||||
use common::{BinarySerializable, VInt};
|
||||
|
||||
@@ -97,6 +98,11 @@ impl Document {
|
||||
self.add_field_value(field, value);
|
||||
}
|
||||
|
||||
/// Add an IP address field
|
||||
pub fn add_ip(&mut self, field: Field, value: IpAddr) {
|
||||
self.add_field_value(field, value);
|
||||
}
|
||||
|
||||
/// Add a i64 field
|
||||
pub fn add_i64(&mut self, field: Field, value: i64) {
|
||||
self.add_field_value(field, value);
|
||||
|
||||
Reference in New Issue
Block a user