From 6d8581baaed40edd7b2dce4611ce8878ceff45e6 Mon Sep 17 00:00:00 2001
From: Yohei Tamura
Date: Wed, 28 Apr 2021 15:10:59 +0900
Subject: [PATCH 01/10] Update README.md

typo
---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index ab0c48975..9d792f386 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ Your mileage WILL vary depending on the nature of queries and their load.
 # Features
 
 - Full-text search
-- Configurable tokenizer (stemming available for 17 Latin languages with third party support for Chinese ([tantivy-jieba](https://crates.io/crates/tantivy-jieba) and [cang-jie](https://crates.io/crates/cang-jie)), Japanese ([lindera](https://github.com/lindera-morphology/lindera-tantivy) and [tantivy-tokenizer-tiny-segmente](https://crates.io/crates/tantivy-tokenizer-tiny-segmenter)) and Korean ([lindera](https://github.com/lindera-morphology/lindera-tantivy) + [lindera-ko-dic-builder](https://github.com/lindera-morphology/lindera-ko-dic-builder))
+- Configurable tokenizer (stemming available for 17 Latin languages with third party support for Chinese ([tantivy-jieba](https://crates.io/crates/tantivy-jieba) and [cang-jie](https://crates.io/crates/cang-jie)), Japanese ([lindera](https://github.com/lindera-morphology/lindera-tantivy) and [tantivy-tokenizer-tiny-segmenter](https://crates.io/crates/tantivy-tokenizer-tiny-segmenter)) and Korean ([lindera](https://github.com/lindera-morphology/lindera-tantivy) + [lindera-ko-dic-builder](https://github.com/lindera-morphology/lindera-ko-dic-builder))
 - Fast (check out the :racehorse: :sparkles: [benchmark](https://tantivy-search.github.io/bench/) :sparkles: :racehorse:)
 - Tiny startup time (<10ms), perfect for command line tools
 - BM25 scoring (the same as Lucene)
From daa53522b549cced229f40b412b87fb6c63a1f44 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Thu, 29 Apr 2021 16:40:02 +0200
Subject: [PATCH 02/10] move tantivy bitpacker to crate, refactor bitpacker

remove byteorder dependency
---
 Cargo.toml                                   |  3 +-
 bitpacker/Cargo.toml                         |  8 +++
 {src/common => bitpacker/src}/bitpacker.rs   | 39 ++++++--------
 bitpacker/src/blocked_bitpacker.rs           | 55 ++++++++++++++++++++
 bitpacker/src/lib.rs                         | 12 +++++
 src/common/mod.rs                            |  1 -
 src/fastfield/reader.rs                      |  9 ++--
 src/fastfield/serializer.rs                  |  2 +-
 src/termdict/fst_termdict/term_info_store.rs |  5 +-
 9 files changed, 104 insertions(+), 30 deletions(-)
 create mode 100644 bitpacker/Cargo.toml
 rename {src/common => bitpacker/src}/bitpacker.rs (76%)
 create mode 100644 bitpacker/src/blocked_bitpacker.rs
 create mode 100644 bitpacker/src/lib.rs

diff --git a/Cargo.toml b/Cargo.toml
index 25d81a8c4..12d3a08c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,6 +35,7 @@ uuid = { version = "0.8", features = ["v4", "serde"] }
 crossbeam = "0.8"
 futures = {version = "0.3", features=["thread-pool"] }
 tantivy-query-grammar = { version="0.14.0", path="./query-grammar" }
+tantivy-bitpacker = { version="0.1", path="./bitpacker" }
 stable_deref_trait = "1"
 rust-stemmers = "1"
 downcast-rs = "1"
@@ -86,7 +87,7 @@ unstable = [] # useful for benches.
 wasm-bindgen = ["uuid/wasm-bindgen"]
 
 [workspace]
-members = ["query-grammar"]
+members = ["query-grammar", "bitpacker"]
 
 [badges]
 travis-ci = { repository = "tantivy-search/tantivy" }

diff --git a/bitpacker/Cargo.toml b/bitpacker/Cargo.toml
new file mode 100644
index 000000000..05a4c81d8
--- /dev/null
+++ b/bitpacker/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "tantivy-bitpacker"
+version = "0.1.0"
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]

diff --git a/src/common/bitpacker.rs b/bitpacker/src/bitpacker.rs
similarity index 76%
rename from src/common/bitpacker.rs
rename to bitpacker/src/bitpacker.rs
index 640d8adcf..7f936760b 100644
--- a/src/common/bitpacker.rs
+++ b/bitpacker/src/bitpacker.rs
@@ -1,9 +1,6 @@
-use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
-use std::io;
+use std::{convert::TryInto, io};
 
-use crate::directory::OwnedBytes;
-
-pub(crate) struct BitPacker {
+pub struct BitPacker {
     mini_buffer: u64,
     mini_buffer_written: usize,
 }
@@ -26,14 +23,14 @@ impl BitPacker {
         let num_bits = num_bits as usize;
         if self.mini_buffer_written + num_bits > 64 {
             self.mini_buffer |= val_u64.wrapping_shl(self.mini_buffer_written as u32);
-            output.write_u64::<LittleEndian>(self.mini_buffer)?;
+            output.write_all(self.mini_buffer.to_le_bytes().as_ref())?;
             self.mini_buffer = val_u64.wrapping_shr((64 - self.mini_buffer_written) as u32);
             self.mini_buffer_written = self.mini_buffer_written + num_bits - 64;
         } else {
             self.mini_buffer |= val_u64 << self.mini_buffer_written;
             self.mini_buffer_written += num_bits;
             if self.mini_buffer_written == 64 {
-                output.write_u64::<LittleEndian>(self.mini_buffer)?;
+                output.write_all(self.mini_buffer.to_le_bytes().as_ref())?;
                 self.mini_buffer_written = 0;
                 self.mini_buffer = 0u64;
             }
@@ -44,9 +41,8 @@ impl BitPacker {
     pub fn flush<TWrite: io::Write>(&mut self, output: &mut TWrite) -> io::Result<()> {
         if self.mini_buffer_written > 0 {
             let num_bytes = (self.mini_buffer_written + 7) / 8;
-            let mut arr: [u8; 8] = [0u8; 8];
-            LittleEndian::write_u64(&mut arr, self.mini_buffer);
-            output.write_all(&arr[..num_bytes])?;
+            let bytes = self.mini_buffer.to_le_bytes();
+            output.write_all(&bytes[..num_bytes])?;
             self.mini_buffer_written = 0;
         }
         Ok(())
@@ -64,11 +60,10 @@
 pub struct BitUnpacker {
     num_bits: u64,
     mask: u64,
-    data: OwnedBytes,
 }
 
 impl BitUnpacker {
-    pub fn new(data: OwnedBytes, num_bits: u8) -> BitUnpacker {
+    pub fn new(num_bits: u8) -> BitUnpacker {
         let mask: u64 = if num_bits == 64 {
             !0u64
         } else {
@@ -77,15 +72,13 @@ impl BitUnpacker {
         BitUnpacker {
             num_bits: u64::from(num_bits),
             mask,
-            data,
         }
     }
 
-    pub fn get(&self, idx: u64) -> u64 {
+    pub fn get(&self, idx: u64, data: &[u8]) -> u64 {
         if self.num_bits == 0 {
             return 0u64;
         }
-        let data: &[u8] = self.data.as_slice();
         let num_bits = self.num_bits;
         let mask = self.mask;
         let addr_in_bits = idx * num_bits;
@@ -95,7 +88,10 @@ impl BitUnpacker {
             addr + 8 <= data.len() as u64,
             "The fast field field should have been padded with 7 bytes."
         );
-        let val_unshifted_unmasked: u64 = LittleEndian::read_u64(&data[(addr as usize)..]);
+        let bytes: [u8; 8] = (&data[(addr as usize)..(addr as usize) + 8])
+            .try_into()
+            .unwrap();
+        let val_unshifted_unmasked: u64 = u64::from_le_bytes(bytes);
         let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
         val_shifted & mask
     }
@@ -104,9 +100,8 @@ impl BitUnpacker {
 #[cfg(test)]
 mod test {
     use super::{BitPacker, BitUnpacker};
-    use crate::directory::OwnedBytes;
 
-    fn create_fastfield_bitpacker(len: usize, num_bits: u8) -> (BitUnpacker, Vec<u64>) {
+    fn create_fastfield_bitpacker(len: usize, num_bits: u8) -> (BitUnpacker, Vec<u64>, Vec<u8>) {
         let mut data = Vec::new();
         let mut bitpacker = BitPacker::new();
         let max_val: u64 = (1u64 << num_bits as u64) - 1u64;
@@ -118,14 +113,14 @@ mod test {
         }
         bitpacker.close(&mut data).unwrap();
         assert_eq!(data.len(), ((num_bits as usize) * len + 7) / 8 + 7);
-        let bitunpacker = BitUnpacker::new(OwnedBytes::new(data), num_bits);
-        (bitunpacker, vals)
+        let bitunpacker = BitUnpacker::new(num_bits);
+        (bitunpacker, vals, data)
     }
 
     fn test_bitpacker_util(len: usize, num_bits: u8) {
-        let (bitunpacker, vals) = create_fastfield_bitpacker(len, num_bits);
+        let (bitunpacker, vals, data) = create_fastfield_bitpacker(len, num_bits);
         for (i, val) in vals.iter().enumerate() {
-            assert_eq!(bitunpacker.get(i as u64), *val);
+            assert_eq!(bitunpacker.get(i as u64, &data), *val);
         }
     }

diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs
new file mode 100644
index 000000000..a635e329e
--- /dev/null
+++ b/bitpacker/src/blocked_bitpacker.rs
@@ -0,0 +1,55 @@
+use super::{bitpacker::BitPacker, compute_num_bits};
+
+#[derive(Debug, Clone)]
+struct BlockedBitpacker {
+    compressed_blocks: Vec<u8>,
+    cache: Vec<u64>,
+    offset_and_bits: Vec<(u32, u8)>,
+    blocksize: u32,
+}
+
+impl BlockedBitpacker {
+    fn new(blocksize: u32) -> Self {
+        Self {
+            compressed_blocks: vec![],
+            cache: vec![],
+            offset_and_bits: vec![],
+            blocksize,
+        }
+    }
+
+    fn add(&mut self, el: u64) {
+        self.cache.push(el);
+        if self.cache.len() > self.blocksize as usize {
+            self.flush();
+        }
+    }
+
+    fn flush(&mut self) {
+        if self.cache.is_empty() {
+            return;
+        }
+        let mut bit_packer = BitPacker::new();
+        let num_bits_block = self
+            .cache
+            .iter()
+            .map(|el| compute_num_bits(*el))
+            .max()
+            .unwrap();
+        for val in self.cache.iter() {
+            bit_packer
+                .write(*val, num_bits_block, &mut self.compressed_blocks)
+                .unwrap(); // write to im can't fail
+        }
+        self.offset_and_bits
+            .push((self.compressed_blocks.len() as u32, num_bits_block));
+    }
+}
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    test
+}

diff --git a/bitpacker/src/lib.rs b/bitpacker/src/lib.rs
new file mode 100644
index 000000000..be227ea0b
--- /dev/null
+++ b/bitpacker/src/lib.rs
@@ -0,0 +1,12 @@
+mod bitpacker;
+
+pub use crate::bitpacker::BitPacker;
+pub use crate::bitpacker::BitUnpacker;
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn it_works() {
+        assert_eq!(2 + 2, 4);
+    }
+}

diff --git a/src/common/mod.rs b/src/common/mod.rs
index 9405067ba..5abc2fb6c 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -1,4 +1,3 @@
-pub mod bitpacker;
 mod bitset;
 mod composite_file;
 mod counting_writer;

diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs
index 0e4cc2f64..5f49bcc53 100644
--- a/src/fastfield/reader.rs
+++ b/src/fastfield/reader.rs
@@ -1,9 +1,9 @@
 use super::FastValue;
-use crate::common::bitpacker::BitUnpacker;
 use crate::common::compute_num_bits;
 use crate::common::BinarySerializable;
 use crate::common::CompositeFile;
 use crate::directory::FileSlice;
+use crate::directory::OwnedBytes;
 use crate::directory::{Directory, RamDirectory, WritePtr};
 use crate::fastfield::{FastFieldSerializer, FastFieldsWriter};
 use crate::schema::Schema;
@@ -12,6 +12,7 @@ use crate::DocId;
 use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::path::Path;
+use tantivy_bitpacker::BitUnpacker;
 
 /// Trait for accessing a fastfield.
 ///
@@ -19,6 +20,7 @@
 /// fast field is required.
 #[derive(Clone)]
 pub struct FastFieldReader<Item: FastValue> {
+    bytes: OwnedBytes,
     bit_unpacker: BitUnpacker,
     min_value_u64: u64,
     max_value_u64: u64,
@@ -33,8 +35,9 @@ impl<Item: FastValue> FastFieldReader<Item> {
         let amplitude = u64::deserialize(&mut bytes)?;
         let max_value = min_value + amplitude;
         let num_bits = compute_num_bits(amplitude);
-        let bit_unpacker = BitUnpacker::new(bytes, num_bits);
+        let bit_unpacker = BitUnpacker::new(num_bits);
         Ok(FastFieldReader {
+            bytes,
             min_value_u64: min_value,
             max_value_u64: max_value,
             bit_unpacker,
@@ -55,7 +58,7 @@ impl<Item: FastValue> FastFieldReader<Item> {
     }
 
     pub(crate) fn get_u64(&self, doc: u64) -> Item {
-        Item::from_u64(self.min_value_u64 + self.bit_unpacker.get(doc))
+        Item::from_u64(self.min_value_u64 + self.bit_unpacker.get(doc, &self.bytes))
     }
 
     /// Internally `multivalued` also use SingleValue Fast fields.

diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs
index d2871d651..a06bc02e8 100644
--- a/src/fastfield/serializer.rs
+++ b/src/fastfield/serializer.rs
@@ -1,4 +1,3 @@
-use crate::common::bitpacker::BitPacker;
 use crate::common::compute_num_bits;
 use crate::common::BinarySerializable;
 use crate::common::CompositeWrite;
@@ -6,6 +5,7 @@ use crate::common::CountingWriter;
 use crate::directory::WritePtr;
 use crate::schema::Field;
 use std::io::{self, Write};
+use tantivy_bitpacker::BitPacker;
 
 /// `FastFieldSerializer` is in charge of serializing
 /// fastfields on disk.

diff --git a/src/termdict/fst_termdict/term_info_store.rs b/src/termdict/fst_termdict/term_info_store.rs
index 905c56a96..382661e2d 100644
--- a/src/termdict/fst_termdict/term_info_store.rs
+++ b/src/termdict/fst_termdict/term_info_store.rs
@@ -1,11 +1,12 @@
 use crate::common::compute_num_bits;
-use crate::common::{bitpacker::BitPacker, BinarySerializable, FixedSize};
+use crate::common::{BinarySerializable, FixedSize};
 use crate::directory::{FileSlice, OwnedBytes};
 use crate::postings::TermInfo;
 use crate::termdict::TermOrdinal;
 use byteorder::{ByteOrder, LittleEndian};
 use std::cmp;
 use std::io::{self, Read, Write};
+use tantivy_bitpacker::BitPacker;
 
 const BLOCK_LEN: usize = 256;
 
@@ -290,11 +291,11 @@ mod tests {
     use super::TermInfoBlockMeta;
     use super::{TermInfoStore, TermInfoStoreWriter};
     use crate::common;
-    use crate::common::bitpacker::BitPacker;
     use crate::common::compute_num_bits;
     use crate::common::BinarySerializable;
     use crate::directory::FileSlice;
     use crate::postings::TermInfo;
+    use tantivy_bitpacker::BitPacker;
 
     #[test]
     fn test_term_info_block() {
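A side note on the byteorder removal in this patch: the std little-endian helpers cover both the old write path and the old read path. A minimal standalone sketch of the equivalence (illustrative only, not part of the series):

    use std::convert::TryInto;

    fn main() {
        let val: u64 = 0x0123_4567_89ab_cdef;

        // Old write path: byteorder's write_u64::<LittleEndian> emitted the
        // value as 8 little-endian bytes. std can do the same:
        let bytes: [u8; 8] = val.to_le_bytes();

        // Old read path: LittleEndian::read_u64(&data[..]). The replacement
        // reborrows a &[u8] slice as a fixed-size array and decodes it:
        let slice: &[u8] = &bytes;
        let arr: [u8; 8] = slice[0..8].try_into().unwrap();
        assert_eq!(u64::from_le_bytes(arr), val);
    }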
From c200d59d1e3d423212008435a02814f05a509f88 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Thu, 29 Apr 2021 19:53:54 +0200
Subject: [PATCH 03/10] add blocked bitpacker, add benches

---
 bitpacker/benches/bench.rs                   | 31 +++++++
 bitpacker/src/blocked_bitpacker.rs           | 92 +++++++++++++++++---
 bitpacker/src/lib.rs                         | 37 ++++++--
 src/common/mod.rs                            | 36 +-------
 src/fastfield/reader.rs                      |  2 +-
 src/fastfield/serializer.rs                  |  2 +-
 src/fastfield/writer.rs                      | 13 ++-
 src/termdict/fst_termdict/term_info_store.rs |  4 +-
 8 files changed, 153 insertions(+), 64 deletions(-)
 create mode 100644 bitpacker/benches/bench.rs

diff --git a/bitpacker/benches/bench.rs b/bitpacker/benches/bench.rs
new file mode 100644
index 000000000..90148e9e2
--- /dev/null
+++ b/bitpacker/benches/bench.rs
@@ -0,0 +1,31 @@
+#![feature(test)]
+
+extern crate test;
+
+#[cfg(test)]
+mod tests {
+    use tantivy_bitpacker::BlockedBitpacker;
+    use test::Bencher;
+    #[bench]
+    fn bench_blockedbitp_read(b: &mut Bencher) {
+        let mut blocked_bitpacker = BlockedBitpacker::new();
+        for val in 0..=21500 {
+            blocked_bitpacker.add(val);
+        }
+        blocked_bitpacker.finish();
+        b.iter(|| {
+            for val in 0..=21500 {
+                blocked_bitpacker.get(val);
+            }
+        });
+    }
+    #[bench]
+    fn bench_blockbitp_create(b: &mut Bencher) {
+        b.iter(|| {
+            let mut blocked_bitpacker = BlockedBitpacker::new();
+            for val in 0..=21500 {
+                blocked_bitpacker.add(val);
+            }
+        });
+    }
+}

diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs
index a635e329e..c372d51ee 100644
--- a/bitpacker/src/blocked_bitpacker.rs
+++ b/bitpacker/src/blocked_bitpacker.rs
@@ -1,39 +1,50 @@
+use crate::BitUnpacker;
+
 use super::{bitpacker::BitPacker, compute_num_bits};
 
+const BLOCK_SIZE: usize = 128;
+
 #[derive(Debug, Clone)]
-struct BlockedBitpacker {
+pub struct BlockedBitpacker {
+    // bitpacked blocks
     compressed_blocks: Vec<u8>,
+    // uncompressed data, collected until blocksize
     cache: Vec<u64>,
     offset_and_bits: Vec<(u32, u8)>,
-    blocksize: u32,
+    num_elem_last_block: usize,
 }
 
 impl BlockedBitpacker {
-    fn new(blocksize: u32) -> Self {
+    pub fn new() -> Self {
         Self {
             compressed_blocks: vec![],
             cache: vec![],
             offset_and_bits: vec![],
-            blocksize,
+            num_elem_last_block: 0,
         }
     }
 
-    fn add(&mut self, el: u64) {
-        self.cache.push(el);
-        if self.cache.len() > self.blocksize as usize {
+    pub fn get_memory_usage(&self) -> usize {
+        self.compressed_blocks.capacity() + self.offset_and_bits.capacity() + self.cache.capacity()
+    }
+
+    pub fn add(&mut self, val: u64) {
+        self.cache.push(val);
+        if self.cache.len() == BLOCK_SIZE as usize {
             self.flush();
         }
     }
 
-    fn flush(&mut self) {
+    pub fn flush(&mut self) {
         if self.cache.is_empty() {
             return;
         }
         let mut bit_packer = BitPacker::new();
+        let offset = self.compressed_blocks.len() as u32;
         let num_bits_block = self
             .cache
             .iter()
-            .map(|el| compute_num_bits(*el))
+            .map(|val| compute_num_bits(*val))
             .max()
             .unwrap();
         for val in self.cache.iter() {
@@ -41,15 +52,68 @@ impl BlockedBitpacker {
                 .write(*val, num_bits_block, &mut self.compressed_blocks)
                 .unwrap(); // write to im can't fail
         }
-        self.offset_and_bits
-            .push((self.compressed_blocks.len() as u32, num_bits_block));
+        bit_packer.flush(&mut self.compressed_blocks).unwrap();
+        self.offset_and_bits.push((offset, num_bits_block));
+
+        self.cache.clear();
+    }
+    pub fn finish(&mut self) {
+        self.num_elem_last_block = self.cache.len();
+        self.flush();
+        //add padding
+        self.compressed_blocks
+            .resize(self.compressed_blocks.len() + 8, 0);
+    }
+    pub fn get(&self, idx: usize) -> u64 {
+        let metadata_pos = idx / BLOCK_SIZE as usize;
+        let pos_in_block = idx % BLOCK_SIZE as usize;
+        let (block_pos, num_bits) = self.offset_and_bits[metadata_pos];
+        let unpacked = BitUnpacker::new(num_bits).get(
+            pos_in_block as u64,
+            &self.compressed_blocks[block_pos as usize..],
+        );
+        unpacked
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
+        let num_elems = if self.offset_and_bits.is_empty() {
+            0
+        } else {
+            (self.offset_and_bits.len() - 1) * 128 + self.num_elem_last_block
+        };
+        let iter = (0..num_elems).map(move |idx| self.get(idx));
+        iter
     }
 }
 
-
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    test
+    #[test]
+    fn blocked_bitpacker_empty() {
+        let mut blocked_bitpacker = BlockedBitpacker::new();
+        blocked_bitpacker.finish();
+        assert_eq!(blocked_bitpacker.iter().collect::<Vec<u64>>(), vec![]);
+    }
+    #[test]
+    fn blocked_bitpacker_one() {
+        let mut blocked_bitpacker = BlockedBitpacker::new();
+        blocked_bitpacker.add(50000);
+        blocked_bitpacker.finish();
+        assert_eq!(blocked_bitpacker.get(0), 50000);
+        assert_eq!(blocked_bitpacker.iter().collect::<Vec<u64>>(), vec![50000]);
+    }
+    #[test]
+    fn blocked_bitpacker_test() {
+        let mut blocked_bitpacker = BlockedBitpacker::new();
+        for val in 0..21500 {
+            blocked_bitpacker.add(val);
+        }
+        blocked_bitpacker.finish();
+        for val in 0..21500 {
+            assert_eq!(blocked_bitpacker.get(val as usize), val);
+        }
+        assert_eq!(blocked_bitpacker.iter().count(), 21500);
+        assert_eq!(blocked_bitpacker.iter().last().unwrap(), 21499);
+    }
 }

diff --git a/bitpacker/src/lib.rs b/bitpacker/src/lib.rs
index be227ea0b..cfc13c0f1 100644
--- a/bitpacker/src/lib.rs
+++ b/bitpacker/src/lib.rs
@@ -1,12 +1,39 @@
 mod bitpacker;
+mod blocked_bitpacker;
 
 pub use crate::bitpacker::BitPacker;
 pub use crate::bitpacker::BitUnpacker;
+pub use crate::blocked_bitpacker::BlockedBitpacker;
 
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn it_works() {
-        assert_eq!(2 + 2, 4);
+/// Computes the number of bits that will be used for bitpacking.
+///
+/// In general the target is the minimum number of bits
+/// required to express the amplitude given in argument.
+///
+/// e.g. If the amplitude is 10, we can store all ints on simply 4bits.
+///
+/// The logic is slightly more convoluted here as for optimization
+/// reasons, we want to ensure that a value spawns over at most 8 bytes
+/// of aligns bytes.
+///
+/// Spanning over 9 bytes is possible for instance, if we do
+/// bitpacking with an amplitude of 63 bits.
+/// In this case, the second int will start on bit
+/// 63 (which belongs to byte 7) and ends at byte 15;
+/// Hence 9 bytes (from byte 7 to byte 15 included).
+///
+/// To avoid this, we force the number of bits to 64bits
+/// when the result is greater than `64-8 = 56 bits`.
+///
+/// Note that this only affects rare use cases spawning over
+/// a very large range of values. Even in this case, it results
+/// in an extra cost of at most 12% compared to the optimal
+/// number of bits.
+pub fn compute_num_bits(n: u64) -> u8 {
+    let amplitude = (64u32 - n.leading_zeros()) as u8;
+    if amplitude <= 64 - 8 {
+        amplitude
+    } else {
+        64
     }
 }

diff --git a/src/common/mod.rs b/src/common/mod.rs
index 5abc2fb6c..43ea7be51 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -32,39 +32,6 @@ where
     None
 }
 
-/// Computes the number of bits that will be used for bitpacking.
-///
-/// In general the target is the minimum number of bits
-/// required to express the amplitude given in argument.
-///
-/// e.g. If the amplitude is 10, we can store all ints on simply 4bits.
-///
-/// The logic is slightly more convoluted here as for optimization
-/// reasons, we want to ensure that a value spawns over at most 8 bytes
-/// of aligns bytes.
-///
-/// Spanning over 9 bytes is possible for instance, if we do
-/// bitpacking with an amplitude of 63 bits.
-/// In this case, the second int will start on bit
-/// 63 (which belongs to byte 7) and ends at byte 15;
-/// Hence 9 bytes (from byte 7 to byte 15 included).
-///
-/// To avoid this, we force the number of bits to 64bits
-/// when the result is greater than `64-8 = 56 bits`.
-///
-/// Note that this only affects rare use cases spawning over
-/// a very large range of values. Even in this case, it results
-/// in an extra cost of at most 12% compared to the optimal
-/// number of bits.
-pub(crate) fn compute_num_bits(n: u64) -> u8 {
-    let amplitude = (64u32 - n.leading_zeros()) as u8;
-    if amplitude <= 64 - 8 {
-        amplitude
-    } else {
-        64
-    }
-}
-
 /// Has length trait
 pub trait HasLen {
     /// Return length
@@ -151,9 +118,10 @@ pub(crate) mod test {
 
     pub use super::minmax;
     pub use super::serialize::test::fixed_size_test;
-    use super::{compute_num_bits, f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
+    use super::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
     use proptest::prelude::*;
     use std::f64;
+    use tantivy_bitpacker::compute_num_bits;
 
     fn test_i64_converter_helper(val: i64) {
         assert_eq!(u64_to_i64(i64_to_u64(val)), val);

diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs
index 5f49bcc53..18cc77dbe 100644
--- a/src/fastfield/reader.rs
+++ b/src/fastfield/reader.rs
@@ -1,5 +1,4 @@
 use super::FastValue;
-use crate::common::compute_num_bits;
 use crate::common::BinarySerializable;
 use crate::common::CompositeFile;
 use crate::directory::FileSlice;
@@ -12,6 +11,7 @@ use crate::DocId;
 use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::path::Path;
+use tantivy_bitpacker::compute_num_bits;
 use tantivy_bitpacker::BitUnpacker;
 
 /// Trait for accessing a fastfield.

diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs
index a06bc02e8..fc03f7d21 100644
--- a/src/fastfield/serializer.rs
+++ b/src/fastfield/serializer.rs
@@ -1,10 +1,10 @@
-use crate::common::compute_num_bits;
 use crate::common::BinarySerializable;
 use crate::common::CompositeWrite;
 use crate::common::CountingWriter;
 use crate::directory::WritePtr;
 use crate::schema::Field;
 use std::io::{self, Write};
+use tantivy_bitpacker::compute_num_bits;
 use tantivy_bitpacker::BitPacker;
 
 /// `FastFieldSerializer` is in charge of serializing

diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs
index 3eb3b42e2..e6aa45e5f 100644
--- a/src/fastfield/writer.rs
+++ b/src/fastfield/writer.rs
@@ -9,6 +9,7 @@ use crate::termdict::TermOrdinal;
 use fnv::FnvHashMap;
 use std::collections::HashMap;
 use std::io;
+use tantivy_bitpacker::BlockedBitpacker;
 
 /// The fastfieldswriter regroup all of the fast field writers.
 pub struct FastFieldsWriter {
@@ -157,7 +158,8 @@ impl FastFieldsWriter {
 /// using `common::i64_to_u64` and `common::f64_to_u64`.
 pub struct IntFastFieldWriter {
     field: Field,
-    vals: Vec<u8>,
+    //vals: Vec<u8>,
+    vals: BlockedBitpacker,
     val_count: usize,
     val_if_missing: u64,
     val_min: u64,
@@ -169,7 +171,7 @@ impl IntFastFieldWriter {
     pub fn new(field: Field) -> IntFastFieldWriter {
         IntFastFieldWriter {
             field,
-            vals: Vec::new(),
+            vals: BlockedBitpacker::new(),
             val_count: 0,
             val_if_missing: 0u64,
             val_min: u64::max_value(),
@@ -196,9 +198,7 @@ impl IntFastFieldWriter {
     /// associated to the document with the `DocId` n.
     /// (Well, `n-1` actually because of 0-indexing)
     pub fn add_val(&mut self, val: u64) {
-        VInt(val)
-            .serialize(&mut self.vals)
-            .expect("unable to serialize VInt to Vec<u8>");
+        self.vals.add(val);
 
         if val > self.val_max {
             self.val_max = val;
@@ -244,8 +244,7 @@ impl IntFastFieldWriter {
         let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?;
 
-        let mut cursor = self.vals.as_slice();
-        while let Ok(VInt(val)) = VInt::deserialize(&mut cursor) {
+        for val in self.vals.iter() {
             single_field_serializer.add_val(val)?;
         }

diff --git a/src/termdict/fst_termdict/term_info_store.rs b/src/termdict/fst_termdict/term_info_store.rs
index 382661e2d..e78d7f2cd 100644
--- a/src/termdict/fst_termdict/term_info_store.rs
+++ b/src/termdict/fst_termdict/term_info_store.rs
@@ -1,4 +1,3 @@
-use crate::common::compute_num_bits;
 use crate::common::{BinarySerializable, FixedSize};
 use crate::directory::{FileSlice, OwnedBytes};
 use crate::postings::TermInfo;
@@ -6,6 +5,7 @@ use crate::termdict::TermOrdinal;
 use byteorder::{ByteOrder, LittleEndian};
 use std::cmp;
 use std::io::{self, Read, Write};
+use tantivy_bitpacker::compute_num_bits;
 use tantivy_bitpacker::BitPacker;
 
 const BLOCK_LEN: usize = 256;
@@ -291,10 +291,10 @@ mod tests {
     use super::TermInfoBlockMeta;
     use super::{TermInfoStore, TermInfoStoreWriter};
     use crate::common;
-    use crate::common::compute_num_bits;
     use crate::common::BinarySerializable;
     use crate::directory::FileSlice;
     use crate::postings::TermInfo;
+    use tantivy_bitpacker::compute_num_bits;
     use tantivy_bitpacker::BitPacker;
 
     #[test]
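A quick standalone check of the `compute_num_bits` contract documented above, including the clamp to 64 bits once a value could straddle 9 aligned bytes; the function body is copied from the patch for illustration:

    // Copied from bitpacker/src/lib.rs above; standalone for illustration.
    fn compute_num_bits(amplitude: u64) -> u8 {
        let num_bits = (64u32 - amplitude.leading_zeros()) as u8;
        if num_bits <= 64 - 8 {
            num_bits
        } else {
            64
        }
    }

    fn main() {
        assert_eq!(compute_num_bits(0), 0);
        assert_eq!(compute_num_bits(10), 4); // amplitude 10 fits in 4 bits
        assert_eq!(compute_num_bits((1u64 << 56) - 1), 56); // still at most 8 aligned bytes
        assert_eq!(compute_num_bits(1u64 << 56), 64); // would span 9 bytes, round up to 64
    }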
From a04e0bdaf19e618567f876bffbc6010213b5b42b Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Thu, 29 Apr 2021 19:57:17 +0200
Subject: [PATCH 04/10] use flushfree blocked bitpacker (10% slower)

---
 bitpacker/benches/bench.rs         |  1 -
 bitpacker/src/blocked_bitpacker.rs | 52 ++++++++++++++----------------
 2 files changed, 24 insertions(+), 29 deletions(-)

diff --git a/bitpacker/benches/bench.rs b/bitpacker/benches/bench.rs
index 90148e9e2..fc3c88d95 100644
--- a/bitpacker/benches/bench.rs
+++ b/bitpacker/benches/bench.rs
@@ -12,7 +12,6 @@ mod tests {
         for val in 0..=21500 {
             blocked_bitpacker.add(val);
         }
-        blocked_bitpacker.finish();
         b.iter(|| {
             for val in 0..=21500 {
                 blocked_bitpacker.get(val);

diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs
index c372d51ee..eb5a9ee0a 100644
--- a/bitpacker/src/blocked_bitpacker.rs
+++ b/bitpacker/src/blocked_bitpacker.rs
@@ -8,19 +8,19 @@
 pub struct BlockedBitpacker {
     // bitpacked blocks
     compressed_blocks: Vec<u8>,
-    // uncompressed data, collected until blocksize
+    // uncompressed data, collected until BLOCK_SIZE
     cache: Vec<u64>,
     offset_and_bits: Vec<(u32, u8)>,
-    num_elem_last_block: usize,
 }
 
 impl BlockedBitpacker {
     pub fn new() -> Self {
+        let mut compressed_blocks = vec![];
+        compressed_blocks.resize(8, 0);
         Self {
-            compressed_blocks: vec![],
+            compressed_blocks,
             cache: vec![],
             offset_and_bits: vec![],
-            num_elem_last_block: 0,
         }
     }
@@ -40,48 +40,47 @@ impl BlockedBitpacker {
             return;
         }
         let mut bit_packer = BitPacker::new();
-        let offset = self.compressed_blocks.len() as u32;
         let num_bits_block = self
             .cache
             .iter()
             .map(|val| compute_num_bits(*val))
             .max()
             .unwrap();
+        self.compressed_blocks
+            .resize(self.compressed_blocks.len() - 8, 0); // remove padding for bitpacker
+        let offset = self.compressed_blocks.len() as u32;
         for val in self.cache.iter() {
             bit_packer
                 .write(*val, num_bits_block, &mut self.compressed_blocks)
-                .unwrap(); // write to im can't fail
+                .expect("cannot write bitpacking to output"); // write to im can't fail
         }
         bit_packer.flush(&mut self.compressed_blocks).unwrap();
         self.offset_and_bits.push((offset, num_bits_block));
 
         self.cache.clear();
-    }
-    pub fn finish(&mut self) {
-        self.num_elem_last_block = self.cache.len();
-        self.flush();
-        //add padding
         self.compressed_blocks
-            .resize(self.compressed_blocks.len() + 8, 0);
+            .resize(self.compressed_blocks.len() + 8, 0); // add padding for bitpacker
     }
     pub fn get(&self, idx: usize) -> u64 {
         let metadata_pos = idx / BLOCK_SIZE as usize;
         let pos_in_block = idx % BLOCK_SIZE as usize;
-        let (block_pos, num_bits) = self.offset_and_bits[metadata_pos];
-        let unpacked = BitUnpacker::new(num_bits).get(
-            pos_in_block as u64,
-            &self.compressed_blocks[block_pos as usize..],
-        );
-        unpacked
+        if let Some((block_pos, num_bits)) = self.offset_and_bits.get(metadata_pos).cloned() {
+            //let (block_pos, num_bits) = self.offset_and_bits[metadata_pos];
+            let unpacked = BitUnpacker::new(num_bits).get(
+                pos_in_block as u64,
+                &self.compressed_blocks[block_pos as usize..],
+            );
+            unpacked
+        } else {
+            self.cache[pos_in_block]
+        }
     }
 
     pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
-        let num_elems = if self.offset_and_bits.is_empty() {
-            0
-        } else {
-            (self.offset_and_bits.len() - 1) * 128 + self.num_elem_last_block
-        };
-        let iter = (0..num_elems).map(move |idx| self.get(idx));
+        let bitpacked_elems = self.offset_and_bits.len() * BLOCK_SIZE;
+        let iter = (0..bitpacked_elems)
+            .map(move |idx| self.get(idx))
+            .chain(self.cache.iter().cloned());
         iter
     }
 }
@@ -91,15 +90,13 @@ mod tests {
     use super::*;
     #[test]
     fn blocked_bitpacker_empty() {
-        let mut blocked_bitpacker = BlockedBitpacker::new();
-        blocked_bitpacker.finish();
+        let blocked_bitpacker = BlockedBitpacker::new();
         assert_eq!(blocked_bitpacker.iter().collect::<Vec<u64>>(), vec![]);
     }
     #[test]
     fn blocked_bitpacker_one() {
         let mut blocked_bitpacker = BlockedBitpacker::new();
         blocked_bitpacker.add(50000);
-        blocked_bitpacker.finish();
         assert_eq!(blocked_bitpacker.get(0), 50000);
         assert_eq!(blocked_bitpacker.iter().collect::<Vec<u64>>(), vec![50000]);
     }
@@ -109,7 +106,6 @@ mod tests {
         for val in 0..21500 {
             blocked_bitpacker.add(val);
         }
-        blocked_bitpacker.finish();
         for val in 0..21500 {
             assert_eq!(blocked_bitpacker.get(val as usize), val);
         }
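The flush-free design above means a lookup either hits an already-bitpacked block or falls through to the in-memory tail. An illustrative sketch of that index arithmetic (the `locate` helper is hypothetical, not part of the crate):

    const BLOCK_SIZE: usize = 128;

    // An index either falls into an already-packed block (consult its
    // metadata) or into the still-uncompressed cache at the end.
    fn locate(idx: usize, num_packed_blocks: usize) -> String {
        let metadata_pos = idx / BLOCK_SIZE; // which block's metadata to consult
        let pos_in_block = idx % BLOCK_SIZE; // offset of the value inside that block
        if metadata_pos < num_packed_blocks {
            format!("block {} at offset {}", metadata_pos, pos_in_block)
        } else {
            format!("cache at offset {}", pos_in_block)
        }
    }

    fn main() {
        // With two packed blocks (256 values), value 300 is still in the cache.
        assert_eq!(locate(5, 2), "block 0 at offset 5");
        assert_eq!(locate(130, 2), "block 1 at offset 2");
        assert_eq!(locate(300, 2), "cache at offset 44");
    }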
From 83cf638a2e0be6989a2e85fcd4cdc4210ca06510 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Fri, 30 Apr 2021 07:23:44 +0200
Subject: [PATCH 05/10] use 64bit encoded metadata

fix memory_usage calculation
---
 bitpacker/src/blocked_bitpacker.rs | 47 +++++++++++++++++++++++++-----
 src/fastfield/writer.rs            |  1 -
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs
index eb5a9ee0a..396c4e99a 100644
--- a/bitpacker/src/blocked_bitpacker.rs
+++ b/bitpacker/src/blocked_bitpacker.rs
@@ -4,13 +4,40 @@ use super::{bitpacker::BitPacker, compute_num_bits};
 
 const BLOCK_SIZE: usize = 128;
 
+/// BlockedBitpacker compresses data in blocks of
+/// 128 elements, while keeping an index on it
+///
 #[derive(Debug, Clone)]
 pub struct BlockedBitpacker {
     // bitpacked blocks
     compressed_blocks: Vec<u8>,
     // uncompressed data, collected until BLOCK_SIZE
     cache: Vec<u64>,
-    offset_and_bits: Vec<(u32, u8)>,
+    offset_and_bits: Vec<BlockedBitpackerEntryMetaData>,
+}
+
+#[derive(Debug, Clone, Default)]
+struct BlockedBitpackerEntryMetaData {
+    encoded: u64,
+}
+
+impl BlockedBitpackerEntryMetaData {
+    fn new(offset: u64, num_bits: u8) -> Self {
+        let encoded = offset | (num_bits as u64) << 56;
+        Self { encoded }
+    }
+    fn offset(&self) -> u64 {
+        (self.encoded << 8) >> 8
+    }
+    fn num_bits(&self) -> u8 {
+        (self.encoded >> 56) as u8
+    }
+}
+#[test]
+fn metadata_test() {
+    let meta = BlockedBitpackerEntryMetaData::new(50000, 6);
+    assert_eq!(meta.offset(), 50000);
+    assert_eq!(meta.num_bits(), 6);
 }
 
 impl BlockedBitpacker {
@@ -25,7 +52,11 @@ impl BlockedBitpacker {
     }
 
     pub fn get_memory_usage(&self) -> usize {
-        self.compressed_blocks.capacity() + self.offset_and_bits.capacity() + self.cache.capacity()
+        self.compressed_blocks.capacity()
+            + self.offset_and_bits.capacity()
+                * std::mem::size_of_val(&self.offset_and_bits.get(0).cloned().unwrap_or_default())
+            + self.cache.capacity()
+                * std::mem::size_of_val(&self.cache.get(0).cloned().unwrap_or_default())
     }
 
     pub fn add(&mut self, val: u64) {
@@ -48,14 +79,15 @@ impl BlockedBitpacker {
             .unwrap();
         self.compressed_blocks
             .resize(self.compressed_blocks.len() - 8, 0); // remove padding for bitpacker
-        let offset = self.compressed_blocks.len() as u32;
+        let offset = self.compressed_blocks.len() as u64;
         for val in self.cache.iter() {
             bit_packer
                 .write(*val, num_bits_block, &mut self.compressed_blocks)
                 .expect("cannot write bitpacking to output"); // write to im can't fail
         }
         bit_packer.flush(&mut self.compressed_blocks).unwrap();
-        self.offset_and_bits.push((offset, num_bits_block));
+        self.offset_and_bits
+            .push(BlockedBitpackerEntryMetaData::new(offset, num_bits_block));
 
         self.cache.clear();
         self.compressed_blocks
@@ -64,11 +96,10 @@ impl BlockedBitpacker {
     pub fn get(&self, idx: usize) -> u64 {
         let metadata_pos = idx / BLOCK_SIZE as usize;
         let pos_in_block = idx % BLOCK_SIZE as usize;
-        if let Some((block_pos, num_bits)) = self.offset_and_bits.get(metadata_pos).cloned() {
-            //let (block_pos, num_bits) = self.offset_and_bits[metadata_pos];
-            let unpacked = BitUnpacker::new(num_bits).get(
+        if let Some(metadata) = self.offset_and_bits.get(metadata_pos) {
+            let unpacked = BitUnpacker::new(metadata.num_bits()).get(
                 pos_in_block as u64,
-                &self.compressed_blocks[block_pos as usize..],
+                &self.compressed_blocks[metadata.offset() as usize..],
             );
             unpacked
         } else {

diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs
index e6aa45e5f..68702e3c1 100644
--- a/src/fastfield/writer.rs
+++ b/src/fastfield/writer.rs
@@ -158,7 +158,6 @@ impl FastFieldsWriter {
 /// using `common::i64_to_u64` and `common::f64_to_u64`.
 pub struct IntFastFieldWriter {
     field: Field,
-    //vals: Vec<u8>,
     vals: BlockedBitpacker,
     val_count: usize,
     val_if_missing: u64,
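The metadata encoding in this patch packs a 56-bit byte offset and an 8-bit width into a single u64. A standalone round-trip sketch mirroring `BlockedBitpackerEntryMetaData` (illustrative, not part of the series):

    // Mirrors the encode/decode logic from the patch, standalone.
    fn encode(offset: u64, num_bits: u8) -> u64 {
        offset | (num_bits as u64) << 56
    }

    fn decode(encoded: u64) -> (u64, u8) {
        let offset = (encoded << 8) >> 8; // clear the top byte
        let num_bits = (encoded >> 56) as u8; // recover the top byte
        (offset, num_bits)
    }

    fn main() {
        let encoded = encode(50_000, 6);
        assert_eq!(decode(encoded), (50_000, 6));
        // Any offset below 2^56 survives the round trip.
        let encoded = encode((1u64 << 56) - 1, 255);
        assert_eq!(decode(encoded), ((1u64 << 56) - 1, 255));
    }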
From 25b94299291d92a26843b3e0b33c9ca2efd7a54d Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Fri, 30 Apr 2021 14:16:39 +0200
Subject: [PATCH 06/10] calc mem_usage of more structs

calc mem_usage of more structs in index creation
add some comments
---
 bitpacker/benches/bench.rs          | 11 +++++++----
 bitpacker/src/blocked_bitpacker.rs  | 22 ++++++++++++++++++----
 bitpacker/src/lib.rs                |  2 +-
 src/fastfield/bytes/writer.rs       |  4 ++++
 src/fastfield/multivalued/writer.rs |  6 ++++++
 src/fastfield/writer.rs             | 25 +++++++++++++++++++++++--
 src/fieldnorm/writer.rs             |  7 +++++++
 src/indexer/segment_writer.rs       |  2 ++
 src/store/writer.rs                 |  5 +++++
 9 files changed, 73 insertions(+), 11 deletions(-)

diff --git a/bitpacker/benches/bench.rs b/bitpacker/benches/bench.rs
index fc3c88d95..c0f8e44df 100644
--- a/bitpacker/benches/bench.rs
+++ b/bitpacker/benches/bench.rs
@@ -10,21 +10,24 @@ mod tests {
     fn bench_blockedbitp_read(b: &mut Bencher) {
         let mut blocked_bitpacker = BlockedBitpacker::new();
         for val in 0..=21500 {
-            blocked_bitpacker.add(val);
+            blocked_bitpacker.add(val * val);
         }
         b.iter(|| {
+            let mut out = 0;
             for val in 0..=21500 {
-                blocked_bitpacker.get(val);
+                out = blocked_bitpacker.get(val);
             }
+            out
         });
     }
     #[bench]
-    fn bench_blockbitp_create(b: &mut Bencher) {
+    fn bench_blockedbitp_create(b: &mut Bencher) {
         b.iter(|| {
             let mut blocked_bitpacker = BlockedBitpacker::new();
             for val in 0..=21500 {
-                blocked_bitpacker.add(val);
+                blocked_bitpacker.add(val * val);
             }
+            blocked_bitpacker
         });
     }
 }

diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs
index 396c4e99a..ea76bae97 100644
--- a/bitpacker/src/blocked_bitpacker.rs
+++ b/bitpacker/src/blocked_bitpacker.rs
@@ -4,7 +4,7 @@ use super::{bitpacker::BitPacker, compute_num_bits};
 
 const BLOCK_SIZE: usize = 128;
 
-/// BlockedBitpacker compresses data in blocks of
+/// `BlockedBitpacker` compresses data in blocks of
 /// 128 elements, while keeping an index on it
 ///
 #[derive(Debug, Clone)]
@@ -16,6 +16,12 @@ pub struct BlockedBitpacker {
     offset_and_bits: Vec<BlockedBitpackerEntryMetaData>,
 }
 
+/// `BlockedBitpackerEntryMetaData` encodes the
+/// offset and bit_width into a u64 bit field
+///
+/// This saves some space, since 7byte is more
+/// than enough and also keeps the access fast
+/// because of alignment
 #[derive(Debug, Clone, Default)]
 struct BlockedBitpackerEntryMetaData {
     encoded: u64,
@@ -23,7 +29,7 @@ struct BlockedBitpackerEntryMetaData {
 
 impl BlockedBitpackerEntryMetaData {
     fn new(offset: u64, num_bits: u8) -> Self {
-        let encoded = offset | (num_bits as u64) << 56;
+        let encoded = offset | (num_bits as u64) << (64 - 8);
         Self { encoded }
     }
     fn offset(&self) -> u64 {
@@ -33,6 +39,7 @@ impl BlockedBitpackerEntryMetaData {
         (self.encoded >> 56) as u8
     }
 }
+
 #[test]
 fn metadata_test() {
     let meta = BlockedBitpackerEntryMetaData::new(50000, 6);
@@ -51,8 +58,10 @@ impl BlockedBitpacker {
         }
     }
 
-    pub fn get_memory_usage(&self) -> usize {
-        self.compressed_blocks.capacity()
+    /// The memory used (inclusive childs)
+    pub fn mem_usage(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.compressed_blocks.capacity()
             + self.offset_and_bits.capacity()
                 * std::mem::size_of_val(&self.offset_and_bits.get(0).cloned().unwrap_or_default())
             + self.cache.capacity()
@@ -80,6 +89,10 @@ impl BlockedBitpacker {
         self.compressed_blocks
             .resize(self.compressed_blocks.len() - 8, 0); // remove padding for bitpacker
         let offset = self.compressed_blocks.len() as u64;
+        // todo performance: for some bit_width we
+        // can encode multiple vals into the
+        // mini_buffer before checking to flush
+        // (to be done in BitPacker)
         for val in self.cache.iter() {
             bit_packer
                 .write(*val, num_bits_block, &mut self.compressed_blocks)
@@ -108,6 +121,7 @@ impl BlockedBitpacker {
     }
 
     pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
+        // todo performance: we could decompress the whole block and cache it instead
         let bitpacked_elems = self.offset_and_bits.len() * BLOCK_SIZE;
         let iter = (0..bitpacked_elems)
             .map(move |idx| self.get(idx))

diff --git a/bitpacker/src/lib.rs b/bitpacker/src/lib.rs
index cfc13c0f1..cb566769c 100644
--- a/bitpacker/src/lib.rs
+++ b/bitpacker/src/lib.rs
@@ -14,7 +14,7 @@ pub use crate::blocked_bitpacker::BlockedBitpacker;
 ///
 /// The logic is slightly more convoluted here as for optimization
 /// reasons, we want to ensure that a value spawns over at most 8 bytes
-/// of aligns bytes.
+/// of aligned bytes.
 ///
 /// Spanning over 9 bytes is possible for instance, if we do
 /// bitpacking with an amplitude of 63 bits.

diff --git a/src/fastfield/bytes/writer.rs b/src/fastfield/bytes/writer.rs
index e98d089c1..44a835a42 100644
--- a/src/fastfield/bytes/writer.rs
+++ b/src/fastfield/bytes/writer.rs
@@ -35,6 +35,10 @@ impl BytesFastFieldWriter {
         }
     }
 
+    /// The memory used (inclusive childs)
+    pub fn mem_usage(&self) -> usize {
+        self.vals.capacity() + self.doc_index.capacity() * std::mem::size_of::<u64>()
+    }
     /// Access the field associated to the `BytesFastFieldWriter`
     pub fn field(&self) -> Field {
         self.field

diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs
index 2dedcf4cc..e7c2bc409 100644
--- a/src/fastfield/multivalued/writer.rs
+++ b/src/fastfield/multivalued/writer.rs
@@ -48,6 +48,12 @@ impl MultiValuedFastFieldWriter {
         }
     }
 
+    /// The memory used (inclusive childs)
+    pub fn mem_usage(&self) -> usize {
+        self.vals.capacity() * std::mem::size_of::<UnorderedTermId>()
+            + self.doc_index.capacity() * std::mem::size_of::<u64>()
+    }
+
     /// Access the field associated to the `MultiValuedFastFieldWriter`
     pub fn field(&self) -> Field {
         self.field

diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs
index 68702e3c1..0d5f31ced 100644
--- a/src/fastfield/writer.rs
+++ b/src/fastfield/writer.rs
@@ -1,7 +1,5 @@
 use super::multivalued::MultiValuedFastFieldWriter;
 use crate::common;
-use crate::common::BinarySerializable;
-use crate::common::VInt;
 use crate::fastfield::{BytesFastFieldWriter, FastFieldSerializer};
 use crate::postings::UnorderedTermId;
 use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema};
@@ -73,6 +71,24 @@ impl FastFieldsWriter {
         }
     }
 
+    /// The memory used (inclusive childs)
+    pub fn mem_usage(&self) -> usize {
+        self.single_value_writers
+            .iter()
+            .map(|w| w.mem_usage())
+            .sum::<usize>()
+            + self
+                .multi_values_writers
+                .iter()
+                .map(|w| w.mem_usage())
+                .sum::<usize>()
+            + self
+                .bytes_value_writers
+                .iter()
+                .map(|w| w.mem_usage())
+                .sum::<usize>()
+    }
+
     /// Get the `FastFieldWriter` associated to a field.
     pub fn get_field_writer(&mut self, field: Field) -> Option<&mut IntFastFieldWriter> {
         // TODO optimize
@@ -178,6 +194,11 @@ impl IntFastFieldWriter {
         }
     }
 
+    /// The memory used (inclusive childs)
+    pub fn mem_usage(&self) -> usize {
+        self.vals.mem_usage()
+    }
+
     /// Returns the field that this writer is targetting.
     pub fn field(&self) -> Field {
         self.field

diff --git a/src/fieldnorm/writer.rs b/src/fieldnorm/writer.rs
index 061522e5c..5b99a9a92 100644
--- a/src/fieldnorm/writer.rs
+++ b/src/fieldnorm/writer.rs
@@ -50,6 +50,13 @@ impl FieldNormsWriter {
         }
     }
 
+    /// The memory used inclusive childs
+    pub fn mem_usage(&self) -> usize {
+        self.fieldnorms_buffer
+            .iter()
+            .map(|buf| buf.capacity())
+            .sum()
+    }
     /// Ensure that all documents in 0..max_doc have a byte associated with them
     /// in each of the fieldnorm vectors.
     ///

diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index 569dc6ce0..e4064b106 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -111,6 +111,8 @@ impl SegmentWriter {
 
     pub fn mem_usage(&self) -> usize {
         self.multifield_postings.mem_usage()
+            + self.fieldnorms_writer.mem_usage()
+            + self.fast_field_writers.mem_usage()
     }
 
     /// Indexes a new document

diff --git a/src/store/writer.rs b/src/store/writer.rs
index 1728636e1..b9e36b8bf 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -45,6 +45,11 @@ impl StoreWriter {
         }
     }
 
+    /// The memory used (inclusive childs)
+    pub fn mem_usage(&self) -> usize {
+        self.intermediary_buffer.capacity() + self.current_block.capacity()
+    }
+
     /// Store a new document.
     ///
     /// The document id is implicitely the number of times
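Note that the mem_usage() methods added in this patch sum `capacity()` rather than `len()`: the allocator holds the full allocation regardless of how much of it is in use. A tiny standalone sketch of the difference being measured:

    fn main() {
        // Charge for allocated capacity, not just the length in use.
        let mut vals: Vec<u64> = Vec::with_capacity(1024);
        vals.push(42);
        let used = vals.len() * std::mem::size_of::<u64>();
        let held = vals.capacity() * std::mem::size_of::<u64>();
        assert_eq!(used, 8); // one value in use
        assert!(held >= 8 * 1024); // but at least 1024 slots are allocated
    }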
From f38daab7f7e8b6cca1884d093fa862777bd58043 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Fri, 30 Apr 2021 14:47:58 +0200
Subject: [PATCH 07/10] add base value to blocked bitpacker

---
 bitpacker/src/blocked_bitpacker.rs | 32 ++++++++++++++++++++++--------
 src/indexer/segment_serializer.rs  |  5 +++++
 src/indexer/segment_writer.rs      |  1 +
 3 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs
index ea76bae97..97c124b1d 100644
--- a/bitpacker/src/blocked_bitpacker.rs
+++ b/bitpacker/src/blocked_bitpacker.rs
@@ -25,12 +25,16 @@ pub struct BlockedBitpacker {
 #[derive(Debug, Clone, Default)]
 struct BlockedBitpackerEntryMetaData {
     encoded: u64,
+    base_value: u64,
 }
 
 impl BlockedBitpackerEntryMetaData {
-    fn new(offset: u64, num_bits: u8) -> Self {
+    fn new(offset: u64, num_bits: u8, base_value: u64) -> Self {
         let encoded = offset | (num_bits as u64) << (64 - 8);
-        Self { encoded }
+        Self {
+            encoded,
+            base_value,
+        }
     }
     fn offset(&self) -> u64 {
         (self.encoded << 8) >> 8
@@ -38,11 +42,14 @@ impl BlockedBitpackerEntryMetaData {
     fn num_bits(&self) -> u8 {
         (self.encoded >> 56) as u8
     }
+    fn base_value(&self) -> u64 {
+        self.base_value
+    }
 }
 
 #[test]
 fn metadata_test() {
-    let meta = BlockedBitpackerEntryMetaData::new(50000, 6);
+    let meta = BlockedBitpackerEntryMetaData::new(50000, 6, 40000);
     assert_eq!(meta.offset(), 50000);
     assert_eq!(meta.num_bits(), 6);
 }
@@ -80,10 +87,11 @@ impl BlockedBitpacker {
             return;
         }
         let mut bit_packer = BitPacker::new();
+        let base_value = self.cache.iter().min().unwrap();
         let num_bits_block = self
             .cache
             .iter()
-            .map(|val| compute_num_bits(*val))
+            .map(|val| compute_num_bits(*val - base_value))
             .max()
             .unwrap();
         self.compressed_blocks
@@ -95,12 +103,20 @@ impl BlockedBitpacker {
         // (to be done in BitPacker)
         for val in self.cache.iter() {
             bit_packer
-                .write(*val, num_bits_block, &mut self.compressed_blocks)
+                .write(
+                    *val - base_value,
+                    num_bits_block,
+                    &mut self.compressed_blocks,
+                )
                 .expect("cannot write bitpacking to output"); // write to im can't fail
         }
         bit_packer.flush(&mut self.compressed_blocks).unwrap();
         self.offset_and_bits
-            .push(BlockedBitpackerEntryMetaData::new(offset, num_bits_block));
+            .push(BlockedBitpackerEntryMetaData::new(
+                offset,
+                num_bits_block,
+                *base_value,
+            ));
 
         self.cache.clear();
         self.compressed_blocks
@@ -114,14 +130,14 @@ impl BlockedBitpacker {
                 pos_in_block as u64,
                 &self.compressed_blocks[metadata.offset() as usize..],
             );
-            unpacked
+            unpacked + metadata.base_value()
         } else {
             self.cache[pos_in_block]
         }
     }
 
     pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
-        // todo performance: we could decompress the whole block and cache it instead
+        // todo performance: we could decompress a whole block and cache it instead
         let bitpacked_elems = self.offset_and_bits.len() * BLOCK_SIZE;
         let iter = (0..bitpacked_elems)
             .map(move |idx| self.get(idx))

diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs
index 9b8babde9..97c0196b8 100644
--- a/src/indexer/segment_serializer.rs
+++ b/src/indexer/segment_serializer.rs
@@ -36,6 +36,11 @@ impl SegmentSerializer {
         })
     }
 
+    /// The memory used (inclusive childs)
+    pub fn mem_usage(&self) -> usize {
+        self.store_writer.mem_usage()
+    }
+
     pub fn segment(&self) -> &Segment {
         &self.segment
     }

diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index e4064b106..586dfbd25 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -113,6 +113,7 @@ impl SegmentWriter {
         self.multifield_postings.mem_usage()
             + self.fieldnorms_writer.mem_usage()
             + self.fast_field_writers.mem_usage()
+            + self.segment_serializer.mem_usage()
     }
 
     /// Indexes a new document
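The base value turns each block into frame-of-reference coding: only the deltas above the block minimum are bitpacked, and the minimum is stored once in the metadata. An illustrative before/after on a block of large but close-together values (the helper is copied from the crate for the sketch):

    fn compute_num_bits(amplitude: u64) -> u8 {
        let num_bits = (64u32 - amplitude.leading_zeros()) as u8;
        if num_bits <= 64 - 8 { num_bits } else { 64 }
    }

    fn main() {
        // e.g. timestamps: large magnitudes, small spread.
        let block = [1_600_000_000u64, 1_600_000_007, 1_600_000_042];

        // Without a base value, the width is driven by the absolute magnitude.
        let naive_bits = block.iter().map(|&v| compute_num_bits(v)).max().unwrap();

        // With the block minimum as base value, only deltas are packed.
        let min = *block.iter().min().unwrap();
        let delta_bits = block.iter().map(|&v| compute_num_bits(v - min)).max().unwrap();

        assert_eq!(naive_bits, 31); // 1_600_000_042 needs 31 bits
        assert_eq!(delta_bits, 6);  // max delta is 42, which fits in 6 bits
    }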
From fde9d27482980d6fb6f664530284faa6d0271c84 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Fri, 30 Apr 2021 16:29:02 +0200
Subject: [PATCH 08/10] refactor

---
 bitpacker/src/blocked_bitpacker.rs | 90 +++++++++++++++---------------
 1 file changed, 46 insertions(+), 44 deletions(-)

diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs
index 97c124b1d..271ab7f67 100644
--- a/bitpacker/src/blocked_bitpacker.rs
+++ b/bitpacker/src/blocked_bitpacker.rs
@@ -12,7 +12,7 @@ pub struct BlockedBitpacker {
     // bitpacked blocks
     compressed_blocks: Vec<u8>,
     // uncompressed data, collected until BLOCK_SIZE
-    cache: Vec<u64>,
+    buffer: Vec<u64>,
     offset_and_bits: Vec<BlockedBitpackerEntryMetaData>,
 }
@@ -60,7 +60,7 @@ impl BlockedBitpacker {
         compressed_blocks.resize(8, 0);
         Self {
             compressed_blocks,
-            cache: vec![],
+            buffer: vec![],
             offset_and_bits: vec![],
         }
     }
@@ -71,56 +71,58 @@ impl BlockedBitpacker {
             + self.compressed_blocks.capacity()
             + self.offset_and_bits.capacity()
                 * std::mem::size_of_val(&self.offset_and_bits.get(0).cloned().unwrap_or_default())
-            + self.cache.capacity()
-                * std::mem::size_of_val(&self.cache.get(0).cloned().unwrap_or_default())
+            + self.buffer.capacity()
+                * std::mem::size_of_val(&self.buffer.get(0).cloned().unwrap_or_default())
     }
 
     pub fn add(&mut self, val: u64) {
-        self.cache.push(val);
-        if self.cache.len() == BLOCK_SIZE as usize {
+        self.buffer.push(val);
+        if self.buffer.len() == BLOCK_SIZE as usize {
             self.flush();
         }
     }
 
     pub fn flush(&mut self) {
-        if self.cache.is_empty() {
+        if let Some(min_value) = self.buffer.iter().min() {
+            let mut bit_packer = BitPacker::new();
+            let num_bits_block = self
+                .buffer
+                .iter()
+                .map(|val| compute_num_bits(*val - min_value))
+                .max()
+                .unwrap();
+            // todo performance: the padding handling could be done better, e.g. use a slice and
+            // return num_bytes written from bitpacker
+            self.compressed_blocks
+                .resize(self.compressed_blocks.len() - 8, 0); // remove padding for bitpacker
+            let offset = self.compressed_blocks.len() as u64;
+            // todo performance: for some bit_width we
+            // can encode multiple vals into the
+            // mini_buffer before checking to flush
+            // (to be done in BitPacker)
+            for val in self.buffer.iter() {
+                bit_packer
+                    .write(
+                        *val - min_value,
+                        num_bits_block,
+                        &mut self.compressed_blocks,
+                    )
+                    .expect("cannot write bitpacking to output"); // write to in memory can't fail
+            }
+            bit_packer.flush(&mut self.compressed_blocks).unwrap();
+            self.offset_and_bits
+                .push(BlockedBitpackerEntryMetaData::new(
+                    offset,
+                    num_bits_block,
+                    *min_value,
+                ));
+
+            self.buffer.clear();
+            self.compressed_blocks
+                .resize(self.compressed_blocks.len() + 8, 0); // add padding for bitpacker
+        } else {
             return;
         }
-        let mut bit_packer = BitPacker::new();
-        let base_value = self.cache.iter().min().unwrap();
-        let num_bits_block = self
-            .cache
-            .iter()
-            .map(|val| compute_num_bits(*val - base_value))
-            .max()
-            .unwrap();
-        self.compressed_blocks
-            .resize(self.compressed_blocks.len() - 8, 0); // remove padding for bitpacker
-        let offset = self.compressed_blocks.len() as u64;
-        // todo performance: for some bit_width we
-        // can encode multiple vals into the
-        // mini_buffer before checking to flush
-        // (to be done in BitPacker)
-        for val in self.cache.iter() {
-            bit_packer
-                .write(
-                    *val - base_value,
-                    num_bits_block,
-                    &mut self.compressed_blocks,
-                )
-                .expect("cannot write bitpacking to output"); // write to im can't fail
-        }
-        bit_packer.flush(&mut self.compressed_blocks).unwrap();
-        self.offset_and_bits
-            .push(BlockedBitpackerEntryMetaData::new(
-                offset,
-                num_bits_block,
-                *base_value,
-            ));
-
-        self.cache.clear();
-        self.compressed_blocks
-            .resize(self.compressed_blocks.len() + 8, 0); // add padding for bitpacker
     }
@@ -132,7 +134,7 @@ impl BlockedBitpacker {
             );
             unpacked + metadata.base_value()
         } else {
-            self.cache[pos_in_block]
+            self.buffer[pos_in_block]
         }
     }
 
@@ -141,7 +143,7 @@ impl BlockedBitpacker {
         let bitpacked_elems = self.offset_and_bits.len() * BLOCK_SIZE;
         let iter = (0..bitpacked_elems)
             .map(move |idx| self.get(idx))
-            .chain(self.cache.iter().cloned());
+            .chain(self.buffer.iter().cloned());
         iter
     }
 }
From 478571ebb448021b9530d423e92ce0a8ff393934 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Fri, 30 Apr 2021 17:07:30 +0200
Subject: [PATCH 09/10] move minmax to bitpacker

move minmax to bitpacker
use minmax in blocked bitpacker
---
 bitpacker/src/blocked_bitpacker.rs  | 11 +++--------
 bitpacker/src/lib.rs                | 13 +++++++++++++
 src/common/mod.rs                   | 15 +--------------
 src/fastfield/multivalued/writer.rs |  3 ++-
 src/indexer/merger.rs               |  4 +++-
 5 files changed, 22 insertions(+), 24 deletions(-)

diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs
index 271ab7f67..027886dfb 100644
--- a/bitpacker/src/blocked_bitpacker.rs
+++ b/bitpacker/src/blocked_bitpacker.rs
@@ -1,4 +1,4 @@
-use crate::BitUnpacker;
+use crate::{minmax, BitUnpacker};
 
 use super::{bitpacker::BitPacker, compute_num_bits};
 
@@ -83,14 +83,9 @@ impl BlockedBitpacker {
     }
 
     pub fn flush(&mut self) {
-        if let Some(min_value) = self.buffer.iter().min() {
+        if let Some((min_value, max_value)) = minmax(self.buffer.iter()) {
             let mut bit_packer = BitPacker::new();
-            let num_bits_block = self
-                .buffer
-                .iter()
-                .map(|val| compute_num_bits(*val - min_value))
-                .max()
-                .unwrap();
+            let num_bits_block = compute_num_bits(*max_value - min_value);
             // todo performance: the padding handling could be done better, e.g. use a slice and
             // return num_bytes written from bitpacker
             self.compressed_blocks

diff --git a/bitpacker/src/lib.rs b/bitpacker/src/lib.rs
index cb566769c..1697a8488 100644
--- a/bitpacker/src/lib.rs
+++ b/bitpacker/src/lib.rs
@@ -37,3 +37,16 @@ pub fn compute_num_bits(n: u64) -> u8 {
         64
     }
 }
+
+pub fn minmax<I, T>(mut vals: I) -> Option<(T, T)>
+where
+    I: Iterator<Item = T>,
+    T: Copy + Ord,
+{
+    if let Some(first_el) = vals.next() {
+        return Some(vals.fold((first_el, first_el), |(min_val, max_val), el| {
+            (min_val.min(el), max_val.max(el))
+        }));
+    }
+    None
+}

diff --git a/src/common/mod.rs b/src/common/mod.rs
index 43ea7be51..bfa3b1262 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -19,19 +19,6 @@ pub use byteorder::LittleEndian as Endianness;
 /// We do not allow segments with more than
 pub const MAX_DOC_LIMIT: u32 = 1 << 31;
 
-pub fn minmax<I, T>(mut vals: I) -> Option<(T, T)>
-where
-    I: Iterator<Item = T>,
-    T: Copy + Ord,
-{
-    if let Some(first_el) = vals.next() {
-        return Some(vals.fold((first_el, first_el), |(min_val, max_val), el| {
-            (min_val.min(el), max_val.max(el))
-        }));
-    }
-    None
-}
-
 /// Has length trait
 pub trait HasLen {
     /// Return length
@@ -116,12 +103,12 @@ pub fn u64_to_f64(val: u64) -> f64 {
 #[cfg(test)]
 pub(crate) mod test {
 
-    pub use super::minmax;
     pub use super::serialize::test::fixed_size_test;
     use super::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
     use proptest::prelude::*;
     use std::f64;
     use tantivy_bitpacker::compute_num_bits;
+    pub use tantivy_bitpacker::minmax;
 
     fn test_i64_converter_helper(val: i64) {
         assert_eq!(u64_to_i64(i64_to_u64(val)), val);

diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs
index e7c2bc409..b25888e78 100644
--- a/src/fastfield/multivalued/writer.rs
+++ b/src/fastfield/multivalued/writer.rs
@@ -8,6 +8,7 @@ use crate::DocId;
 use fnv::FnvHashMap;
 use std::io;
 use std::iter::once;
+use tantivy_bitpacker::minmax;
 
 /// Writer for multi-valued (as in, more than one value per document)
 /// int fast field.
@@ -154,7 +155,7 @@ impl MultiValuedFastFieldWriter {
                 }
             }
             None => {
-                let val_min_max = crate::common::minmax(self.vals.iter().cloned());
+                let val_min_max = minmax(self.vals.iter().cloned());
                 let (val_min, val_max) = val_min_max.unwrap_or((0u64, 0u64));
                 value_serializer =
                     serializer.new_u64_fast_field_with_idx(self.field, val_min, val_max, 1)?;

diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 93099b50c..6004d2984 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1,3 +1,5 @@
+use tantivy_bitpacker::minmax;
+
 use crate::common::MAX_DOC_LIMIT;
 use crate::core::Segment;
 use crate::core::SegmentReader;
@@ -70,7 +72,7 @@ fn compute_min_max_val(
         Some(delete_bitset) => {
             // some deleted documents,
             // we need to recompute the max / min
-            crate::common::minmax(
+            minmax(
                 (0..max_doc)
                     .filter(|doc_id| delete_bitset.is_alive(*doc_id))
                     .map(|doc_id| u64_reader.get(doc_id)),
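Usage sketch for the relocated `minmax` helper (body copied from the patch): a single pass over the iterator yields both bounds, which flush() above then feeds straight into `compute_num_bits(max - min)`.

    // Mirrors the minmax helper moved into the bitpacker crate above.
    fn minmax<I, T>(mut vals: I) -> Option<(T, T)>
    where
        I: Iterator<Item = T>,
        T: Copy + Ord,
    {
        vals.next().map(|first| {
            vals.fold((first, first), |(min_val, max_val), el| {
                (min_val.min(el), max_val.max(el))
            })
        })
    }

    fn main() {
        // One pass over the data yields both bounds, unlike calling
        // .min() and .max() separately on two passes.
        assert_eq!(minmax([3u64, 1, 4, 1, 5].iter().copied()), Some((1, 5)));
        assert_eq!(minmax(std::iter::empty::<u64>()), None);
    }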
From 537021e12d38bcc3d2e5f33ee1d54ced66d057ed Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Mon, 3 May 2021 09:09:42 +0200
Subject: [PATCH 10/10] update CHANGELOG

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 164da9cb8..4af344faf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ Tantivy 0.15.0
 - Added lz4-flex as the default compression scheme in tantivy (@PSeitz) #1009
 - Renamed a lot of symbols to avoid all uppercasing on acronyms, as per new clippy recommendation. For instance, RAMDireotory -> RamDirectory. (@pmasurel)
 - Simplified positions index format (@fulmicoton) #1022
+- Moved bitpacking to bitpacker subcrate and add BlockedBitpacker, which bitpacks blocks of 128 elements (@PSeitz) #1030
 
 Tantivy 0.14.0
 =========================
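Taken together, the series leaves `tantivy-bitpacker` with the API sketched below. Illustrative usage only, assuming a dependency on the crate as of PATCH 10 (`mem_usage` from PATCH 06, flush-free reads from PATCH 04):

    use tantivy_bitpacker::BlockedBitpacker;

    fn main() {
        let mut packer = BlockedBitpacker::new();
        for val in 0..1000u64 {
            packer.add(val * 3);
        }
        // No finish()/flush() needed before reading since PATCH 04:
        // values past the last full 128-element block are served from
        // the in-memory buffer.
        assert_eq!(packer.get(500), 1500);
        assert_eq!(packer.iter().count(), 1000);
        println!("memory used: {} bytes", packer.mem_usage());
    }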