From face361fcbe0531da079a1a3b71f41fb85c73b54 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 10 Mar 2025 12:24:08 +0800 Subject: [PATCH] feat: introduce roaring bitmap to optimize sparse value scenarios (#5603) * feat: introduce roaring bitmap to optimize sparse value scenarios Signed-off-by: Zhenchi * fix taplo Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi * polish Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 11 + src/index/Cargo.toml | 1 + src/index/src/bitmap.rs | 868 ++++++++++++++++++ src/index/src/inverted_index/create.rs | 7 +- src/index/src/inverted_index/create/sort.rs | 9 +- .../create/sort/external_sort.rs | 78 +- .../create/sort/intermediate_rw.rs | 95 +- .../create/sort/intermediate_rw/codec_v1.rs | 85 +- .../create/sort/merge_stream.rs | 56 +- .../src/inverted_index/create/sort_create.rs | 46 +- src/index/src/inverted_index/error.rs | 9 + src/index/src/inverted_index/format/reader.rs | 28 +- .../src/inverted_index/format/reader/blob.rs | 76 +- src/index/src/inverted_index/format/writer.rs | 8 +- .../src/inverted_index/format/writer/blob.rs | 85 +- .../inverted_index/format/writer/single.rs | 77 +- .../search/fst_values_mapper.rs | 86 +- .../src/inverted_index/search/index_apply.rs | 6 +- .../search/index_apply/predicates_apply.rs | 65 +- src/index/src/lib.rs | 1 + src/mito2/src/cache/index/inverted_index.rs | 80 +- .../src/sst/index/inverted_index/applier.rs | 6 +- .../src/sst/index/inverted_index/creator.rs | 4 +- 23 files changed, 1395 insertions(+), 392 deletions(-) create mode 100644 src/index/src/bitmap.rs diff --git a/Cargo.lock b/Cargo.lock index cea1229542..b3b7b3b058 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5566,6 +5566,7 @@ dependencies = [ "rand", "regex", "regex-automata 0.4.8", + "roaring", "serde", "serde_json", "snafu 0.8.5", @@ -9632,6 +9633,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "roaring" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41589aba99537475bf697f2118357cad1c31590c5a1b9f6d9fc4ad6d07503661" +dependencies = [ + "bytemuck", + "byteorder", +] + [[package]] name = "robust" version = "1.1.0" diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index f149c76565..dc6e394ef4 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -29,6 +29,7 @@ prost.workspace = true puffin.workspace = true regex.workspace = true regex-automata.workspace = true +roaring = "0.10" serde.workspace = true serde_json.workspace = true snafu.workspace = true diff --git a/src/index/src/bitmap.rs b/src/index/src/bitmap.rs new file mode 100644 index 0000000000..0b707cc242 --- /dev/null +++ b/src/index/src/bitmap.rs @@ -0,0 +1,868 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::ops::RangeInclusive; + +use common_base::BitVec; +/// `BitmapType` enumerates how bitmaps are encoded within the inverted index. +pub use greptime_proto::v1::index::BitmapType; +use roaring::RoaringBitmap; + +/// A bitmap representation supporting both BitVec and RoaringBitmap formats. +/// +/// This enum provides unified bitmap operations while allowing efficient storage +/// in different formats. The implementation automatically handles type conversions +/// when performing operations between different formats. +/// +/// # Examples +/// +/// Creating a new Roaring bitmap: +/// ``` +/// use bitmap::Bitmap; +/// let bitmap = Bitmap::new_roaring(); +/// assert!(bitmap.is_empty()); +/// ``` +/// +/// Creating a full BitVec bitmap: +/// ``` +/// use bitmap::Bitmap; +/// let bitmap = Bitmap::full_bitvec(10); +/// assert_eq!(bitmap.count_ones(), 10); +/// ``` +#[derive(Debug, Clone, PartialEq)] +pub enum Bitmap { + Roaring(RoaringBitmap), + BitVec(BitVec), +} + +impl Bitmap { + /// Creates a new empty BitVec-based bitmap. + pub fn new_bitvec() -> Self { + Bitmap::BitVec(BitVec::EMPTY) + } + + /// Creates a new empty RoaringBitmap-based bitmap. + pub fn new_roaring() -> Self { + Bitmap::Roaring(RoaringBitmap::new()) + } + + /// Creates a full BitVec-based bitmap with all bits set to 1. + /// + /// # Arguments + /// * `size` - The number of bits to allocate and set + pub fn full_bitvec(size: usize) -> Self { + Bitmap::BitVec(BitVec::repeat(true, size)) + } + + /// Creates a full RoaringBitmap-based bitmap with bits 0..size set to 1. + /// + /// # Arguments + /// * `size` - The exclusive upper bound for the bit range + pub fn full_roaring(size: usize) -> Self { + let mut roaring = RoaringBitmap::new(); + roaring.insert_range(0..size as u32); + Bitmap::Roaring(roaring) + } + + /// Returns the number of bits set to 1 in the bitmap. + pub fn count_ones(&self) -> usize { + match self { + Bitmap::BitVec(bitvec) => bitvec.count_ones(), + Bitmap::Roaring(roaring) => roaring.len() as _, + } + } + + /// Checks if the bitmap contains no set bits. + pub fn is_empty(&self) -> bool { + match self { + Bitmap::BitVec(bitvec) => bitvec.is_empty(), + Bitmap::Roaring(roaring) => roaring.is_empty(), + } + } + + /// Inserts a range of bits into the bitmap. + /// + /// # Arguments + /// * `range` - Inclusive range of bits to set + pub fn insert_range(&mut self, range: RangeInclusive) { + match self { + Bitmap::BitVec(bitvec) => { + if *range.end() >= bitvec.len() { + bitvec.resize(range.end() + 1, false); + } + for i in range { + bitvec.set(i, true); + } + } + Bitmap::Roaring(roaring) => { + let range = *range.start() as u32..=*range.end() as u32; + roaring.insert_range(range); + } + } + } + + /// Serializes the bitmap into a byte buffer using the specified format. + /// + /// # Arguments + /// * `serialize_type` - Target format for serialization + /// * `writer` - Output writer to write the serialized data + pub fn serialize_into( + &self, + serialize_type: BitmapType, + mut writer: impl io::Write, + ) -> io::Result<()> { + match (self, serialize_type) { + (Bitmap::BitVec(bitvec), BitmapType::BitVec) => { + writer.write_all(bitvec.as_raw_slice())?; + } + (Bitmap::Roaring(roaring), BitmapType::Roaring) => { + roaring.serialize_into(writer)?; + } + (Bitmap::BitVec(bitvec), BitmapType::Roaring) => { + let bitmap = Bitmap::bitvec_to_roaring(bitvec.clone()); + bitmap.serialize_into(writer)?; + } + (Bitmap::Roaring(roaring), BitmapType::BitVec) => { + let bitvec = Bitmap::roaring_to_bitvec(roaring); + writer.write_all(bitvec.as_raw_slice())?; + } + } + + Ok(()) + } + + /// Computes the size of the serialized bitmap in bytes. + /// + /// # Arguments + /// * `bitmap_type` - Format of data to be serialized + pub fn serialized_size(&self, bitmap_type: BitmapType) -> usize { + match (self, bitmap_type) { + (Bitmap::BitVec(bitvec), BitmapType::BitVec) => bitvec.as_raw_slice().len(), + (Bitmap::Roaring(roaring), BitmapType::Roaring) => roaring.serialized_size(), + (Bitmap::BitVec(bitvec), BitmapType::Roaring) => { + let bitmap = Bitmap::bitvec_to_roaring(bitvec.clone()); + bitmap.serialized_size() + } + (Bitmap::Roaring(roaring), BitmapType::BitVec) => { + let bitvec = Bitmap::roaring_to_bitvec(roaring); + bitvec.as_raw_slice().len() + } + } + } + + /// Deserializes a bitmap from a byte buffer. + /// + /// # Arguments + /// * `buf` - Input buffer containing serialized data + /// * `bitmap_type` - Format of the serialized data + pub fn deserialize_from(buf: &[u8], bitmap_type: BitmapType) -> std::io::Result { + match bitmap_type { + BitmapType::BitVec => { + let bitvec = BitVec::from_slice(buf); + Ok(Bitmap::BitVec(bitvec)) + } + BitmapType::Roaring => { + let roaring = RoaringBitmap::deserialize_from(buf)?; + Ok(Bitmap::Roaring(roaring)) + } + } + } + + /// Computes the union with another bitmap (in-place). + /// + /// If the other bitmap is a different type, it will be converted to match + /// the current bitmap's type. + pub fn union(&mut self, other: Self) { + if self.is_empty() { + *self = other; + return; + } + + match (self, other) { + (Bitmap::BitVec(bitvec1), bitmap) => { + let bitvec2 = bitmap.into_bitvec(); + if bitvec1.len() > bitvec2.len() { + *bitvec1 |= bitvec2 + } else { + *bitvec1 = bitvec2 | &*bitvec1; + } + } + (Bitmap::Roaring(roaring1), bitmap) => { + let roaring2 = bitmap.into_roaring(); + *roaring1 |= roaring2; + } + } + } + + /// Computes the intersection with another bitmap (in-place). + /// + /// If the other bitmap is a different type, it will be converted to match + /// the current bitmap's type. + pub fn intersect(&mut self, other: Self) { + match (self, other) { + (Bitmap::BitVec(bitvec1), bitmap) => { + let mut bitvec2 = bitmap.into_bitvec(); + let len = (bitvec1.len() - bitvec1.trailing_zeros()) + .min(bitvec2.len() - bitvec2.trailing_zeros()); + bitvec1.truncate(len); + bitvec2.truncate(len); + *bitvec1 &= bitvec2; + } + (Bitmap::Roaring(roaring1), bitmap) => { + let roaring2 = bitmap.into_roaring(); + *roaring1 &= roaring2; + } + } + } + + /// Returns an iterator over the indices of set bits. + pub fn iter_ones(&self) -> Box + '_> { + match self { + Bitmap::BitVec(bitvec) => Box::new(bitvec.iter_ones()), + Bitmap::Roaring(roaring) => Box::new(roaring.iter().map(|x| x as usize)), + } + } + + /// Creates a bitmap from bytes in LSB0 (least significant bit first) order. + /// + /// # Arguments + /// * `bytes` - Input bytes in LSB0 order + /// * `bitmap_type` - Type of bitmap to create + pub fn from_lsb0_bytes(bytes: &[u8], bitmap_type: BitmapType) -> Self { + match bitmap_type { + BitmapType::BitVec => { + let bitvec = BitVec::from_slice(bytes); + Bitmap::BitVec(bitvec) + } + BitmapType::Roaring => { + let roaring = RoaringBitmap::from_lsb0_bytes(0, bytes); + Bitmap::Roaring(roaring) + } + } + } + + /// Computes memory usage of the bitmap in bytes. + pub fn memory_usage(&self) -> usize { + match self { + Bitmap::BitVec(bitvec) => bitvec.capacity(), + Bitmap::Roaring(roaring) => { + let stat = roaring.statistics(); + (stat.n_bytes_array_containers + + stat.n_bytes_bitset_containers + + stat.n_bytes_run_containers) as usize + } + } + } + + fn into_bitvec(self) -> BitVec { + match self { + Bitmap::BitVec(bitvec) => bitvec, + Bitmap::Roaring(roaring) => Self::roaring_to_bitvec(&roaring), + } + } + + fn into_roaring(self) -> RoaringBitmap { + match self { + Bitmap::Roaring(roaring) => roaring, + Bitmap::BitVec(bitvec) => Self::bitvec_to_roaring(bitvec), + } + } + + fn roaring_to_bitvec(roaring: &RoaringBitmap) -> BitVec { + let max_value = roaring.max().unwrap_or(0); + let mut bitvec = BitVec::repeat(false, max_value as usize + 1); + for i in roaring { + bitvec.set(i as usize, true); + } + bitvec + } + + fn bitvec_to_roaring(mut bitvec: BitVec) -> RoaringBitmap { + bitvec.resize(bitvec.capacity(), false); + RoaringBitmap::from_lsb0_bytes(0, bitvec.as_raw_slice()) + } +} + +impl Default for Bitmap { + fn default() -> Self { + Bitmap::new_roaring() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_full_bitmaps() { + let bv = Bitmap::full_bitvec(10); + assert_eq!(bv.count_ones(), 10); + + let rb = Bitmap::full_roaring(10); + assert_eq!(rb.count_ones(), 10); + } + + #[test] + fn test_serialization_roundtrip() { + let original = Bitmap::full_roaring(100); + let mut buf = Vec::new(); + + // Serialize as Roaring + original + .serialize_into(BitmapType::Roaring, &mut buf) + .unwrap(); + let deserialized = Bitmap::deserialize_from(&buf, BitmapType::Roaring).unwrap(); + assert_eq!(original, deserialized); + + // Serialize as BitVec + buf.clear(); + original + .serialize_into(BitmapType::BitVec, &mut buf) + .unwrap(); + let deserialized = Bitmap::deserialize_from(&buf, BitmapType::BitVec).unwrap(); + assert_eq!(original.count_ones(), deserialized.count_ones()); + } + + #[test] + fn test_union_fulls() { + // Test BitVec union + let mut bv1 = Bitmap::full_bitvec(3); // 0-2: 111 + let bv2 = Bitmap::full_bitvec(5); // 0-4: 11111 + bv1.union(bv2); + assert_eq!(bv1.count_ones(), 5); + + let mut bv1 = Bitmap::full_bitvec(5); // 0-4: 11111 + let bv2 = Bitmap::full_bitvec(3); // 0-2: 111 + bv1.union(bv2); + assert_eq!(bv1.count_ones(), 5); + + // Test Roaring union + let mut rb1 = Bitmap::full_roaring(3); // 0-2: 111 + let rb2 = Bitmap::full_roaring(5); // 0-4: 11111 + rb1.union(rb2); + assert_eq!(rb1.count_ones(), 5); + + let mut rb1 = Bitmap::full_roaring(5); // 0-4: 11111 + let rb2 = Bitmap::full_roaring(3); // 0-2: 111 + rb1.union(rb2); + assert_eq!(rb1.count_ones(), 5); + + // Test cross-type union + let mut rb = Bitmap::full_roaring(5); // 0-4: 11111 + let bv = Bitmap::full_bitvec(3); // 0-2: 111 + rb.union(bv); + assert_eq!(rb.count_ones(), 5); + + let mut bv = Bitmap::full_bitvec(5); // 0-4: 11111 + let rb = Bitmap::full_roaring(3); // 0-2: 111 + bv.union(rb); + assert_eq!(bv.count_ones(), 5); + + let mut rb = Bitmap::full_roaring(3); // 0-2: 111 + let bv = Bitmap::full_bitvec(5); // 0-4: 11111 + rb.union(bv); + assert_eq!(rb.count_ones(), 5); + + let mut bv = Bitmap::full_bitvec(3); // 0-2: 111 + let rb = Bitmap::full_roaring(5); // 0-4: 11111 + bv.union(rb); + assert_eq!(bv.count_ones(), 5); + } + + #[test] + fn test_union_bitvec() { + let mut bv1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + let bv2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec); + bv1.union(bv2); + assert_eq!( + bv1, + Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::BitVec) + ); + + // Test different lengths + let mut bv1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + let bv2 = Bitmap::from_lsb0_bytes(&[0b01010101, 0b00000001], BitmapType::BitVec); + bv1.union(bv2); + assert_eq!( + bv1, + Bitmap::from_lsb0_bytes(&[0b11111111, 0b00000001], BitmapType::BitVec) + ); + + let mut bv1 = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::BitVec); + let bv2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec); + bv1.union(bv2); + assert_eq!( + bv1, + Bitmap::from_lsb0_bytes(&[0b11111111, 0b00000001], BitmapType::BitVec) + ); + + // Test empty bitmaps + let mut bv1 = Bitmap::new_bitvec(); + let bv2 = Bitmap::new_bitvec(); + bv1.union(bv2); + assert!(bv1.is_empty()); + + let mut bv1 = Bitmap::new_bitvec(); + let bv2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec); + bv1.union(bv2); + assert_eq!( + bv1, + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec) + ); + + let mut bv1 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec); + let bv2 = Bitmap::new_bitvec(); + bv1.union(bv2); + assert_eq!( + bv1, + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec) + ); + + // Test empty and full bitmaps + let mut bv1 = Bitmap::new_bitvec(); + let bv2 = Bitmap::full_bitvec(8); + bv1.union(bv2); + assert_eq!(bv1, Bitmap::full_bitvec(8)); + + let mut bv1 = Bitmap::full_bitvec(8); + let bv2 = Bitmap::new_bitvec(); + bv1.union(bv2); + assert_eq!(bv1, Bitmap::full_bitvec(8)); + } + + #[test] + fn test_union_roaring() { + let mut rb1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + let rb2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring); + rb1.union(rb2); + assert_eq!( + rb1, + Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring) + ); + + // Test different lengths + let mut rb1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + let rb2 = Bitmap::from_lsb0_bytes(&[0b01010101, 0b00000001], BitmapType::Roaring); + rb1.union(rb2); + assert_eq!( + rb1, + Bitmap::from_lsb0_bytes(&[0b11111111, 0b00000001], BitmapType::Roaring) + ); + + let mut rb1 = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::Roaring); + let rb2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring); + rb1.union(rb2); + assert_eq!( + rb1, + Bitmap::from_lsb0_bytes(&[0b11111111, 0b00000001], BitmapType::Roaring) + ); + + // Test empty bitmaps + let mut rb1 = Bitmap::new_roaring(); + let rb2 = Bitmap::new_roaring(); + rb1.union(rb2); + assert!(rb1.is_empty()); + + let mut rb1 = Bitmap::new_roaring(); + let rb2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring); + rb1.union(rb2); + assert_eq!( + rb1, + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring) + ); + + let mut rb1 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring); + let rb2 = Bitmap::new_roaring(); + rb1.union(rb2); + assert_eq!( + rb1, + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring) + ); + + // Test empty and full bit + let mut rb1 = Bitmap::new_roaring(); + let rb2 = Bitmap::full_roaring(8); + rb1.union(rb2); + assert_eq!(rb1, Bitmap::full_roaring(8)); + + let mut rb1 = Bitmap::full_roaring(8); + let rb2 = Bitmap::new_roaring(); + rb1.union(rb2); + assert_eq!(rb1, Bitmap::full_roaring(8)); + } + + #[test] + fn test_union_mixed() { + let mut rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + let bv = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec); + rb.union(bv); + assert_eq!( + rb, + Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring) + ); + + let mut bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + let rb = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring); + bv.union(rb); + assert_eq!( + bv, + Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::BitVec) + ); + + let mut rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + let bv = Bitmap::full_bitvec(8); + rb.union(bv); + assert_eq!(rb, Bitmap::full_roaring(8)); + + let mut bv = Bitmap::full_bitvec(8); + let rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + bv.union(rb); + assert_eq!(bv, Bitmap::full_bitvec(8)); + + let mut rb = Bitmap::new_roaring(); + let bv = Bitmap::full_bitvec(8); + rb.union(bv); + assert_eq!(rb, Bitmap::full_bitvec(8)); + + let mut bv = Bitmap::full_bitvec(8); + let rb = Bitmap::new_roaring(); + bv.union(rb); + assert_eq!(bv, Bitmap::full_bitvec(8)); + + let mut rb = Bitmap::new_roaring(); + let bv = Bitmap::new_bitvec(); + rb.union(bv); + assert!(rb.is_empty()); + + let mut bv = Bitmap::new_bitvec(); + let rb = Bitmap::new_roaring(); + bv.union(rb); + assert!(bv.is_empty()); + + let mut rb = Bitmap::new_roaring(); + let bv = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec); + rb.union(bv); + assert_eq!( + rb, + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec) + ); + + let mut bv = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec); + let rb = Bitmap::new_roaring(); + bv.union(rb); + assert_eq!( + bv, + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec) + ); + + let mut rb = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring); + let bv = Bitmap::new_bitvec(); + rb.union(bv); + assert_eq!( + rb, + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring) + ); + + let mut bv = Bitmap::new_bitvec(); + let rb = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring); + bv.union(rb); + assert_eq!( + bv, + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring) + ); + } + + #[test] + fn test_intersect_fulls() { + // Test BitVec intersect + let mut bv1 = Bitmap::full_bitvec(3); // 0-2: 111 + let bv2 = Bitmap::full_bitvec(5); // 0-4: 11111 + bv1.intersect(bv2); + assert_eq!(bv1.count_ones(), 3); + + let mut bv1 = Bitmap::full_bitvec(5); // 0-4: 11111 + let bv2 = Bitmap::full_bitvec(3); // 0-2: 111 + bv1.intersect(bv2); + assert_eq!(bv1.count_ones(), 3); + + // Test Roaring intersect + let mut rb1 = Bitmap::full_roaring(3); // 0-2: 111 + let rb2 = Bitmap::full_roaring(5); // 0-4: 11111 + rb1.intersect(rb2); + assert_eq!(rb1.count_ones(), 3); + + let mut rb1 = Bitmap::full_roaring(5); // 0-4: 11111 + let rb2 = Bitmap::full_roaring(3); // 0-2: 111 + rb1.intersect(rb2); + assert_eq!(rb1.count_ones(), 3); + + // Test cross-type intersect + let mut rb = Bitmap::full_roaring(5); // 0-4: 11111 + let bv = Bitmap::full_bitvec(3); // 0-2: 111 + rb.intersect(bv); + assert_eq!(rb.count_ones(), 3); + + let mut bv = Bitmap::full_bitvec(5); // 0-4: 11111 + let rb = Bitmap::full_roaring(3); // 0-2: 111 + bv.intersect(rb); + assert_eq!(bv.count_ones(), 3); + + let mut rb = Bitmap::full_roaring(3); // 0-2: 111 + let bv = Bitmap::full_bitvec(5); // 0-4: 11111 + rb.intersect(bv); + assert_eq!(rb.count_ones(), 3); + + let mut bv = Bitmap::full_bitvec(3); // 0-2: 111 + let rb = Bitmap::full_roaring(5); // 0-4: 11111 + bv.intersect(rb); + assert_eq!(bv.count_ones(), 3); + } + + #[test] + fn test_intersect_bitvec() { + let mut bv1 = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec); + let bv2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + bv1.intersect(bv2); + assert_eq!( + bv1, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec) + ); + + // Test different lengths + let mut bv1 = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec); + let bv2 = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::BitVec); + bv1.intersect(bv2); + assert_eq!( + bv1, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec) + ); + + let mut bv1 = Bitmap::from_lsb0_bytes(&[0b11110000, 0b00000001], BitmapType::BitVec); + let bv2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + bv1.intersect(bv2); + assert_eq!( + bv1, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec) + ); + + // Test empty bitmaps + let mut bv1 = Bitmap::new_bitvec(); + let bv2 = Bitmap::new_bitvec(); + bv1.intersect(bv2); + assert!(bv1.is_empty()); + + let mut bv1 = Bitmap::new_bitvec(); + let bv2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + bv1.intersect(bv2); + assert!(bv1.is_empty()); + + let mut bv1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + let bv2 = Bitmap::new_bitvec(); + bv1.intersect(bv2); + assert!(bv1.is_empty()); + + // Test empty and full bitmaps + let mut bv1 = Bitmap::new_bitvec(); + let bv2 = Bitmap::full_bitvec(8); + bv1.intersect(bv2); + assert!(bv1.is_empty()); + + let mut bv1 = Bitmap::full_bitvec(8); + let bv2 = Bitmap::new_bitvec(); + bv1.intersect(bv2); + assert!(bv1.is_empty()); + } + + #[test] + fn test_intersect_roaring() { + let mut rb1 = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring); + let rb2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + rb1.intersect(rb2); + assert_eq!( + rb1, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring) + ); + + // Test different lengths + let mut rb1 = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring); + let rb2 = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::Roaring); + rb1.intersect(rb2); + assert_eq!( + rb1, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring) + ); + + let mut rb1 = Bitmap::from_lsb0_bytes(&[0b11110000, 0b00000001], BitmapType::Roaring); + let rb2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + rb1.intersect(rb2); + assert_eq!( + rb1, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring) + ); + + // Test empty bitmaps + let mut rb1 = Bitmap::new_roaring(); + let rb2 = Bitmap::new_roaring(); + rb1.intersect(rb2); + assert!(rb1.is_empty()); + + let mut rb1 = Bitmap::new_roaring(); + let rb2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + rb1.intersect(rb2); + assert!(rb1.is_empty()); + + let mut rb1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + let rb2 = Bitmap::new_roaring(); + rb1.intersect(rb2); + assert!(rb1.is_empty()); + + // Test empty and full bitmaps + let mut rb1 = Bitmap::new_roaring(); + let rb2 = Bitmap::full_roaring(8); + rb1.intersect(rb2); + assert!(rb1.is_empty()); + + let mut rb1 = Bitmap::full_roaring(8); + let rb2 = Bitmap::new_roaring(); + rb1.intersect(rb2); + assert!(rb1.is_empty()); + } + + #[test] + fn test_intersect_mixed() { + let mut rb = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring); + let bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + rb.intersect(bv); + assert_eq!( + rb, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring) + ); + + let mut bv = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec); + let rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + bv.intersect(rb); + assert_eq!( + bv, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec) + ); + + let mut rb = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring); + let bv = Bitmap::full_bitvec(8); + rb.intersect(bv); + assert_eq!( + rb, + Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring) + ); + + let mut bv = Bitmap::full_bitvec(8); + let rb = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring); + bv.intersect(rb); + assert_eq!( + bv, + Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec) + ); + + let mut rb = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring); + let bv = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::BitVec); + rb.intersect(bv); + assert_eq!( + rb, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring) + ); + + let mut bv = Bitmap::from_lsb0_bytes(&[0b11110000, 0b00000001], BitmapType::BitVec); + let rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + bv.intersect(rb); + assert_eq!( + bv, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec) + ); + + let mut rb = Bitmap::from_lsb0_bytes(&[0b11110000, 0b00000001], BitmapType::Roaring); + let bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + rb.intersect(bv); + assert_eq!( + rb, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring) + ); + + let mut bv = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec); + let rb = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::Roaring); + bv.intersect(rb); + assert_eq!( + bv, + Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec) + ); + + let mut rb = Bitmap::new_roaring(); + let bv = Bitmap::full_bitvec(8); + rb.intersect(bv); + assert!(rb.is_empty()); + + let mut bv = Bitmap::full_bitvec(8); + let rb = Bitmap::new_roaring(); + bv.intersect(rb); + assert!(bv.is_empty()); + + let mut bv = Bitmap::new_bitvec(); + let rb = Bitmap::full_roaring(8); + bv.intersect(rb); + assert!(bv.is_empty()); + + let mut rb = Bitmap::full_roaring(8); + let bv = Bitmap::new_bitvec(); + rb.intersect(bv); + assert!(rb.is_empty()); + + let mut rb = Bitmap::new_roaring(); + let bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + rb.intersect(bv); + assert!(rb.is_empty()); + + let mut bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec); + let rb = Bitmap::new_roaring(); + bv.intersect(rb); + assert!(bv.is_empty()); + + let mut bv = Bitmap::new_bitvec(); + let rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + bv.intersect(rb); + assert!(bv.is_empty()); + + let mut rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring); + let bv = Bitmap::new_bitvec(); + rb.intersect(bv); + assert!(rb.is_empty()); + } + + #[test] + fn test_insert_range() { + let mut bv = Bitmap::new_bitvec(); + bv.insert_range(0..=5); + assert_eq!(bv.iter_ones().collect::>(), vec![0, 1, 2, 3, 4, 5]); + + let mut rb = Bitmap::new_roaring(); + rb.insert_range(0..=5); + assert_eq!(bv.iter_ones().collect::>(), vec![0, 1, 2, 3, 4, 5]); + + let mut bv = Bitmap::new_bitvec(); + bv.insert_range(10..=10); + assert_eq!(bv.iter_ones().collect::>(), vec![10]); + + let mut rb = Bitmap::new_roaring(); + rb.insert_range(10..=10); + assert_eq!(bv.iter_ones().collect::>(), vec![10]); + } +} diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index b56d09dc99..a0468fdcba 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -17,6 +17,7 @@ pub mod sort_create; use async_trait::async_trait; +use crate::bitmap::BitmapType; use crate::inverted_index::error::Result; use crate::inverted_index::format::writer::InvertedIndexWriter; use crate::BytesRef; @@ -53,5 +54,9 @@ pub trait InvertedIndexCreator: Send { /// Finalizes the index creation process, ensuring all data is properly indexed and stored /// in the provided writer - async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()>; + async fn finish( + &mut self, + writer: &mut dyn InvertedIndexWriter, + bitmap_type: BitmapType, + ) -> Result<()>; } diff --git a/src/index/src/inverted_index/create/sort.rs b/src/index/src/inverted_index/create/sort.rs index cb92bfa1ad..bdc1ec21f6 100644 --- a/src/index/src/inverted_index/create/sort.rs +++ b/src/index/src/inverted_index/create/sort.rs @@ -17,22 +17,23 @@ mod intermediate_rw; mod merge_stream; use async_trait::async_trait; -use common_base::BitVec; use futures::Stream; +use crate::bitmap::Bitmap; use crate::inverted_index::error::Result; +use crate::inverted_index::format::writer::ValueStream; use crate::{Bytes, BytesRef}; /// A stream of sorted values along with their associated bitmap -pub type SortedStream = Box> + Send + Unpin>; +pub type SortedStream = Box> + Send + Unpin>; /// Output of a sorting operation, encapsulating a bitmap for null values and a stream of sorted items pub struct SortOutput { /// Bitmap indicating which segments have null values - pub segment_null_bitmap: BitVec, + pub segment_null_bitmap: Bitmap, /// Stream of sorted items - pub sorted_stream: SortedStream, + pub sorted_stream: ValueStream, /// Total number of rows in the sorted data pub total_row_count: usize, diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index cdd6e848c9..13e480d1c5 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -20,11 +20,11 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use async_trait::async_trait; -use common_base::BitVec; use common_telemetry::{debug, error}; use futures::stream; use snafu::ResultExt; +use crate::bitmap::Bitmap; use crate::external_provider::ExternalTempFileProvider; use crate::inverted_index::create::sort::intermediate_rw::{ IntermediateReader, IntermediateWriter, @@ -45,18 +45,10 @@ pub struct ExternalSorter { temp_file_provider: Arc, /// Bitmap indicating which segments have null values - segment_null_bitmap: BitVec, + segment_null_bitmap: Bitmap, /// In-memory buffer to hold values and their corresponding bitmaps until memory threshold is exceeded - values_buffer: BTreeMap, - - /// Count of rows in the last dumped buffer, used to streamline memory usage of `values_buffer`. - /// - /// After data is dumped to external files, `last_dump_row_count` is updated to reflect the new starting point - /// for `BitVec` indexing. This means each `BitVec` in `values_buffer` thereafter encodes positions relative to - /// this count, not from 0. This mechanism effectively shrinks the memory footprint of each `BitVec`, helping manage - /// memory use more efficiently by focusing only on newly ingested data post-dump. - last_dump_row_count: usize, + values_buffer: BTreeMap, /// Count of all rows ingested so far total_row_count: usize, @@ -93,14 +85,14 @@ impl Sorter for ExternalSorter { return Ok(()); } - let segment_index_range = self.segment_index_range(n, value.is_none()); + let segment_index_range = self.segment_index_range(n); self.total_row_count += n; if let Some(value) = value { let memory_diff = self.push_not_null(value, segment_index_range); self.may_dump_buffer(memory_diff).await } else { - set_bits(&mut self.segment_null_bitmap, segment_index_range); + self.segment_null_bitmap.insert_range(segment_index_range); Ok(()) } } @@ -117,15 +109,10 @@ impl Sorter for ExternalSorter { // TODO(zhongzc): k-way merge instead of 2-way merge let mut tree_nodes: VecDeque = VecDeque::with_capacity(readers.len() + 1); - let leading_zeros = self.last_dump_row_count / self.segment_row_count; tree_nodes.push_back(Box::new(stream::iter( mem::take(&mut self.values_buffer) .into_iter() - .map(move |(value, mut bitmap)| { - bitmap.resize(bitmap.len() + leading_zeros, false); - bitmap.shift_right(leading_zeros); - Ok((value, bitmap)) - }), + .map(|(value, (bitmap, _))| Ok((value, bitmap))), ))); for (_, reader) in readers { tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?); @@ -161,11 +148,10 @@ impl ExternalSorter { index_name, temp_file_provider, - segment_null_bitmap: BitVec::new(), + segment_null_bitmap: Bitmap::new_bitvec(), // bitvec is more efficient for many null values values_buffer: BTreeMap::new(), total_row_count: 0, - last_dump_row_count: 0, segment_row_count, current_memory_usage: 0, @@ -195,7 +181,7 @@ impl ExternalSorter { } /// Pushes the non-null values to the values buffer and sets the bits within - /// the specified range in the given BitVec to true. + /// the specified range in the given bitmap to true. /// Returns the memory usage difference of the buffer after the operation. fn push_not_null( &mut self, @@ -203,20 +189,23 @@ impl ExternalSorter { segment_index_range: RangeInclusive, ) -> usize { match self.values_buffer.get_mut(value) { - Some(bitmap) => { - let old_len = bitmap.as_raw_slice().len(); - set_bits(bitmap, segment_index_range); + Some((bitmap, mem_usage)) => { + bitmap.insert_range(segment_index_range); + let new_usage = bitmap.memory_usage() + value.len(); + let diff = new_usage - *mem_usage; + *mem_usage = new_usage; - bitmap.as_raw_slice().len() - old_len + diff } None => { - let mut bitmap = BitVec::default(); - set_bits(&mut bitmap, segment_index_range); + let mut bitmap = Bitmap::new_roaring(); + bitmap.insert_range(segment_index_range); - let mem_diff = bitmap.as_raw_slice().len() + value.len(); - self.values_buffer.insert(value.to_vec(), bitmap); + let mem_usage = bitmap.memory_usage() + value.len(); + self.values_buffer + .insert(value.to_vec(), (bitmap, mem_usage)); - mem_diff + mem_usage } } } @@ -257,12 +246,8 @@ impl ExternalSorter { .fetch_sub(memory_usage, Ordering::Relaxed); self.current_memory_usage = 0; - let bitmap_leading_zeros = self.last_dump_row_count / self.segment_row_count; - self.last_dump_row_count = - self.total_row_count - self.total_row_count % self.segment_row_count; // align to segment - let entries = values.len(); - IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_| + IntermediateWriter::new(writer).write_all(values.into_iter().map(|(k, (b, _))| (k, b))).await.inspect(|_| debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}") ).inspect_err(|e| error!(e; "Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}") @@ -271,13 +256,8 @@ impl ExternalSorter { /// Determines the segment index range for the row index range /// `[row_begin, row_begin + n - 1]` - fn segment_index_range(&self, n: usize, is_null: bool) -> RangeInclusive { - let row_begin = if is_null { - self.total_row_count - } else { - self.total_row_count - self.last_dump_row_count - }; - + fn segment_index_range(&self, n: usize) -> RangeInclusive { + let row_begin = self.total_row_count; let start = self.segment_index(row_begin); let end = self.segment_index(row_begin + n - 1); start..=end @@ -289,16 +269,6 @@ impl ExternalSorter { } } -/// Sets the bits within the specified range in the given `BitVec` to true -fn set_bits(bitmap: &mut BitVec, index_range: RangeInclusive) { - if *index_range.end() >= bitmap.len() { - bitmap.resize(index_range.end() + 1, false); - } - for index in index_range { - bitmap.set(index, true); - } -} - #[cfg(test)] mod tests { use std::collections::HashMap; @@ -330,7 +300,7 @@ mod tests { move |index_name, file_id| { assert_eq!(index_name, "test"); let mut files = files.lock().unwrap(); - let (writer, reader) = duplex(8 * 1024); + let (writer, reader) = duplex(1024 * 1024); files.insert(file_id.to_string(), Box::new(reader.compat())); Ok(Box::new(writer.compat_write())) } diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw.rs b/src/index/src/inverted_index/create/sort/intermediate_rw.rs index 85fc76e951..94a743c67c 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw.rs @@ -19,29 +19,24 @@ //! The serialization format is as follows: //! //! ```text -//! [magic][bitmap leading zeros][item][item]...[item] -//! [4] [4] [?] +//! [magic][item][item]...[item] +//! [4] [?] //! //! Each [item] is structured as: //! [value len][value][bitmap len][bitmap] //! [8] [?] [8] [?] //! ``` //! -//! The format starts with a 4-byte magic identifier, followed by a 4-byte -//! bitmap leading zeros count, indicating how many leading zeros are in the -//! fixed-size region of the bitmap. Following that, each item represents -//! a value and its associated bitmap, serialized with their lengths for +//! Each item represents a value and its associated bitmap, serialized with their lengths for //! easier deserialization. mod codec_v1; -use std::collections::BTreeMap; - use asynchronous_codec::{FramedRead, FramedWrite}; -use common_base::BitVec; use futures::{stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt}; use snafu::ResultExt; +use crate::bitmap::{Bitmap, BitmapType}; use crate::inverted_index::create::sort::SortedStream; use crate::inverted_index::error::{ CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu, @@ -62,12 +57,13 @@ impl IntermediateWriter { /// Serializes and writes all provided values to the wrapped writer pub async fn write_all( mut self, - values: BTreeMap, - bitmap_leading_zeros: u32, + values: impl IntoIterator, ) -> Result<()> { let (codec_magic, encoder) = ( codec_v1::CODEC_V1_MAGIC, - codec_v1::IntermediateItemEncoderV1, + codec_v1::IntermediateItemEncoderV1 { + bitmap_type: BitmapType::Roaring, + }, ); self.writer @@ -75,11 +71,6 @@ impl IntermediateWriter { .await .context(WriteSnafu)?; - self.writer - .write_all(&bitmap_leading_zeros.to_be_bytes()) - .await - .context(WriteSnafu)?; - let value_stream = stream::iter(values.into_iter().map(Ok)); let frame_write = FramedWrite::new(&mut self.writer, encoder); // `forward()` will flush and close the writer when the stream ends @@ -112,17 +103,9 @@ impl IntermediateReader { .context(ReadSnafu)?; let decoder = match &magic { - codec_v1::CODEC_V1_MAGIC => { - let bitmap_leading_zeros = { - let mut buf = [0u8; 4]; - self.reader.read_exact(&mut buf).await.context(ReadSnafu)?; - u32::from_be_bytes(buf) - }; - - codec_v1::IntermediateItemDecoderV1 { - bitmap_leading_zeros, - } - } + codec_v1::CODEC_V1_MAGIC => codec_v1::IntermediateItemDecoderV1 { + bitmap_type: BitmapType::Roaring, + }, _ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(), }; @@ -132,6 +115,7 @@ impl IntermediateReader { #[cfg(test)] mod tests { + use std::collections::BTreeMap; use std::io::{Seek, SeekFrom}; use futures::io::{AllowStdIo, Cursor}; @@ -140,6 +124,10 @@ mod tests { use super::*; use crate::inverted_index::error::Error; + fn bitmap(bytes: &[u8]) -> Bitmap { + Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring) + } + #[tokio::test] async fn test_intermediate_read_write_basic() { let file_r = tempfile().unwrap(); @@ -148,12 +136,12 @@ mod tests { let buf_w = AllowStdIo::new(file_w); let values = BTreeMap::from_iter([ - (Bytes::from("a"), BitVec::from_slice(&[0b10101010])), - (Bytes::from("b"), BitVec::from_slice(&[0b01010101])), + (Bytes::from("a"), bitmap(&[0b10101010])), + (Bytes::from("b"), bitmap(&[0b01010101])), ]); let writer = IntermediateWriter::new(buf_w); - writer.write_all(values.clone(), 0).await.unwrap(); + writer.write_all(values.clone()).await.unwrap(); // reset the handle buf_r.seek(SeekFrom::Start(0)).unwrap(); @@ -161,48 +149,9 @@ mod tests { let mut stream = reader.into_stream().await.unwrap(); let a = stream.next().await.unwrap().unwrap(); - assert_eq!(a, (Bytes::from("a"), BitVec::from_slice(&[0b10101010]))); + assert_eq!(a, (Bytes::from("a"), bitmap(&[0b10101010]))); let b = stream.next().await.unwrap().unwrap(); - assert_eq!(b, (Bytes::from("b"), BitVec::from_slice(&[0b01010101]))); - assert!(stream.next().await.is_none()); - } - - #[tokio::test] - async fn test_intermediate_read_write_with_prefix_zeros() { - let file_r = tempfile().unwrap(); - let file_w = file_r.try_clone().unwrap(); - let mut buf_r = AllowStdIo::new(file_r); - let buf_w = AllowStdIo::new(file_w); - - let values = BTreeMap::from_iter([ - (Bytes::from("a"), BitVec::from_slice(&[0b10101010])), - (Bytes::from("b"), BitVec::from_slice(&[0b01010101])), - ]); - - let writer = IntermediateWriter::new(buf_w); - writer.write_all(values.clone(), 8).await.unwrap(); - // reset the handle - buf_r.seek(SeekFrom::Start(0)).unwrap(); - - let reader = IntermediateReader::new(buf_r); - let mut stream = reader.into_stream().await.unwrap(); - - let a = stream.next().await.unwrap().unwrap(); - assert_eq!( - a, - ( - Bytes::from("a"), - BitVec::from_slice(&[0b00000000, 0b10101010]) - ) - ); - let b = stream.next().await.unwrap().unwrap(); - assert_eq!( - b, - ( - Bytes::from("b"), - BitVec::from_slice(&[0b00000000, 0b01010101]) - ) - ); + assert_eq!(b, (Bytes::from("b"), bitmap(&[0b01010101]))); assert!(stream.next().await.is_none()); } @@ -213,7 +162,7 @@ mod tests { let values = BTreeMap::new(); let writer = IntermediateWriter::new(&mut buf); - writer.write_all(values.clone(), 0).await.unwrap(); + writer.write_all(values.clone()).await.unwrap(); let reader = IntermediateReader::new(Cursor::new(buf)); let mut stream = reader.into_stream().await.unwrap(); diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs index 05a4eeb57d..c8ca7c4332 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs @@ -16,9 +16,10 @@ use std::io; use asynchronous_codec::{BytesMut, Decoder, Encoder}; use bytes::{Buf, BufMut}; -use common_base::BitVec; +use greptime_proto::v1::index::BitmapType; use snafu::ResultExt; +use crate::bitmap::Bitmap; use crate::inverted_index::error::{CommonIoSnafu, Error, Result}; use crate::Bytes; @@ -28,37 +29,42 @@ const U64_LENGTH: usize = std::mem::size_of::(); pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01"; /// Serializes items of external sorting intermediate files. -pub struct IntermediateItemEncoderV1; +pub struct IntermediateItemEncoderV1 { + pub bitmap_type: BitmapType, +} /// [`FramedWrite`] requires the [`Encoder`] trait to be implemented. impl Encoder for IntermediateItemEncoderV1 { - type Item<'a> = (Bytes, BitVec); + type Item<'a> = (Bytes, Bitmap); type Error = Error; - fn encode(&mut self, item: (Bytes, BitVec), dst: &mut BytesMut) -> Result<()> { + fn encode(&mut self, item: (Bytes, Bitmap), dst: &mut BytesMut) -> Result<()> { let value_bytes = item.0; - let bitmap_bytes = item.1.into_vec(); + let bitmap_size = item.1.serialized_size(self.bitmap_type); - dst.reserve(U64_LENGTH * 2 + value_bytes.len() + bitmap_bytes.len()); + dst.reserve(U64_LENGTH * 2 + value_bytes.len() + bitmap_size); dst.put_u64_le(value_bytes.len() as u64); dst.extend_from_slice(&value_bytes); - dst.put_u64_le(bitmap_bytes.len() as u64); - dst.extend_from_slice(&bitmap_bytes); + dst.put_u64_le(bitmap_size as u64); + item.1 + .serialize_into(self.bitmap_type, &mut dst.writer()) + .context(CommonIoSnafu)?; + Ok(()) } } /// Deserializes items of external sorting intermediate files. pub struct IntermediateItemDecoderV1 { - pub(crate) bitmap_leading_zeros: u32, + pub bitmap_type: BitmapType, } /// [`FramedRead`] requires the [`Decoder`] trait to be implemented. impl Decoder for IntermediateItemDecoderV1 { - type Item = (Bytes, BitVec); + type Item = (Bytes, Bitmap); type Error = Error; - /// Decodes the `src` into `(Bytes, BitVec)`. Returns `None` if + /// Decodes the `src` into `(Bytes, RoaringBitmap)`. Returns `None` if /// the `src` does not contain enough data for a complete item. /// /// Only after successful decoding, the `src` is advanced. Otherwise, @@ -92,8 +98,8 @@ impl Decoder for IntermediateItemDecoderV1 { return Ok(None); } - let mut bitmap = BitVec::repeat(false, self.bitmap_leading_zeros as _); - bitmap.extend_from_raw_slice(&buf[..bitmap_len]); + let bitmap = Bitmap::deserialize_from(&buf[..bitmap_len], self.bitmap_type) + .context(CommonIoSnafu)?; let item = (value_bytes.to_vec(), bitmap); @@ -113,25 +119,29 @@ impl From for Error { #[cfg(test)] mod tests { - use common_base::bit_vec::prelude::{bitvec, Lsb0}; - use super::*; + fn bitmap(bytes: &[u8]) -> Bitmap { + Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring) + } + #[test] fn test_intermediate_codec_basic() { - let mut encoder = IntermediateItemEncoderV1; + let mut encoder = IntermediateItemEncoderV1 { + bitmap_type: BitmapType::Roaring, + }; let mut buf = BytesMut::new(); - let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010])); + let item = (b"hello".to_vec(), bitmap(&[0b10101010])); encoder.encode(item.clone(), &mut buf).unwrap(); let mut decoder = IntermediateItemDecoderV1 { - bitmap_leading_zeros: 0, + bitmap_type: BitmapType::Roaring, }; assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item); assert_eq!(decoder.decode(&mut buf).unwrap(), None); - let item1 = (b"world".to_vec(), BitVec::from_slice(&[0b01010101])); + let item1 = (b"world".to_vec(), bitmap(&[0b01010101])); encoder.encode(item.clone(), &mut buf).unwrap(); encoder.encode(item1.clone(), &mut buf).unwrap(); assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item); @@ -142,14 +152,16 @@ mod tests { #[test] fn test_intermediate_codec_empty_item() { - let mut encoder = IntermediateItemEncoderV1; + let mut encoder = IntermediateItemEncoderV1 { + bitmap_type: BitmapType::Roaring, + }; let mut buf = BytesMut::new(); - let item = (b"".to_vec(), BitVec::from_slice(&[])); + let item = (b"".to_vec(), bitmap(&[])); encoder.encode(item.clone(), &mut buf).unwrap(); let mut decoder = IntermediateItemDecoderV1 { - bitmap_leading_zeros: 0, + bitmap_type: BitmapType::Roaring, }; assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item); assert_eq!(decoder.decode(&mut buf).unwrap(), None); @@ -158,17 +170,19 @@ mod tests { #[test] fn test_intermediate_codec_partial() { - let mut encoder = IntermediateItemEncoderV1; + let mut encoder = IntermediateItemEncoderV1 { + bitmap_type: BitmapType::Roaring, + }; let mut buf = BytesMut::new(); - let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010])); + let item = (b"hello".to_vec(), bitmap(&[0b10101010])); encoder.encode(item.clone(), &mut buf).unwrap(); let partial_length = U64_LENGTH + 3; let mut partial_bytes = buf.split_to(partial_length); let mut decoder = IntermediateItemDecoderV1 { - bitmap_leading_zeros: 0, + bitmap_type: BitmapType::Roaring, }; assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None); // not enough data partial_bytes.extend_from_slice(&buf[..]); @@ -176,25 +190,4 @@ mod tests { assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None); assert!(partial_bytes.is_empty()); } - - #[test] - fn test_intermediate_codec_prefix_zeros() { - let mut encoder = IntermediateItemEncoderV1; - let mut buf = BytesMut::new(); - - let item = (b"hello".to_vec(), bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]); - encoder.encode(item.clone(), &mut buf).unwrap(); - - let mut decoder = IntermediateItemDecoderV1 { - bitmap_leading_zeros: 3, - }; - let decoded_item = decoder.decode(&mut buf).unwrap().unwrap(); - assert_eq!(decoded_item.0, b"hello"); - assert_eq!( - decoded_item.1, - bitvec![u8, Lsb0; 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0] - ); - assert_eq!(decoder.decode(&mut buf).unwrap(), None); - assert!(buf.is_empty()); - } } diff --git a/src/index/src/inverted_index/create/sort/merge_stream.rs b/src/index/src/inverted_index/create/sort/merge_stream.rs index 0e60f7d8af..dbe95e2db5 100644 --- a/src/index/src/inverted_index/create/sort/merge_stream.rs +++ b/src/index/src/inverted_index/create/sort/merge_stream.rs @@ -16,10 +16,10 @@ use std::cmp::Ordering; use std::pin::Pin; use std::task::{Context, Poll}; -use common_base::BitVec; use futures::{ready, Stream, StreamExt}; use pin_project::pin_project; +use crate::bitmap::Bitmap; use crate::inverted_index::create::sort::SortedStream; use crate::inverted_index::error::Result; use crate::Bytes; @@ -28,10 +28,10 @@ use crate::Bytes; #[pin_project] pub struct MergeSortedStream { stream1: Option, - peek1: Option<(Bytes, BitVec)>, + peek1: Option<(Bytes, Bitmap)>, stream2: Option, - peek2: Option<(Bytes, BitVec)>, + peek2: Option<(Bytes, Bitmap)>, } impl MergeSortedStream { @@ -49,7 +49,7 @@ impl MergeSortedStream { } impl Stream for MergeSortedStream { - type Item = Result<(Bytes, BitVec)>; + type Item = Result<(Bytes, Bitmap)>; /// Polls both streams and returns the next item from the stream that has the smaller next item. /// If both streams have the same next item, the bitmaps are unioned together. @@ -89,77 +89,77 @@ impl Stream for MergeSortedStream { } /// Merges two bitmaps by bit-wise OR'ing them together, preserving all bits from both -fn merge_bitmaps(bitmap1: BitVec, bitmap2: BitVec) -> BitVec { - // make sure longer bitmap is on the left to avoid truncation - #[allow(clippy::if_same_then_else)] - if bitmap1.len() > bitmap2.len() { - bitmap1 | bitmap2 - } else { - bitmap2 | bitmap1 - } +fn merge_bitmaps(mut bitmap1: Bitmap, bitmap2: Bitmap) -> Bitmap { + bitmap1.union(bitmap2); + bitmap1 } #[cfg(test)] mod tests { use futures::stream; + use greptime_proto::v1::index::BitmapType; use super::*; use crate::inverted_index::error::Error; - fn sorted_stream_from_vec(vec: Vec<(Bytes, BitVec)>) -> SortedStream { + fn bitmap(bytes: &[u8]) -> Bitmap { + Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring) + } + + fn sorted_stream_from_vec(vec: Vec<(Bytes, Bitmap)>) -> SortedStream { Box::new(stream::iter(vec.into_iter().map(Ok::<_, Error>))) } #[tokio::test] async fn test_merge_sorted_stream_non_overlapping() { let stream1 = sorted_stream_from_vec(vec![ - (Bytes::from("apple"), BitVec::from_slice(&[0b10101010])), - (Bytes::from("orange"), BitVec::from_slice(&[0b01010101])), + (Bytes::from("apple"), bitmap(&[0b10101010])), + (Bytes::from("orange"), bitmap(&[0b01010101])), ]); let stream2 = sorted_stream_from_vec(vec![ - (Bytes::from("banana"), BitVec::from_slice(&[0b10101010])), - (Bytes::from("peach"), BitVec::from_slice(&[0b01010101])), + (Bytes::from("banana"), bitmap(&[0b10101010])), + (Bytes::from("peach"), bitmap(&[0b01010101])), ]); let mut merged_stream = MergeSortedStream::merge(stream1, stream2); let item = merged_stream.next().await.unwrap().unwrap(); assert_eq!(item.0, Bytes::from("apple")); - assert_eq!(item.1, BitVec::from_slice(&[0b10101010])); + assert_eq!(item.1, bitmap(&[0b10101010])); let item = merged_stream.next().await.unwrap().unwrap(); assert_eq!(item.0, Bytes::from("banana")); - assert_eq!(item.1, BitVec::from_slice(&[0b10101010])); + assert_eq!(item.1, bitmap(&[0b10101010])); let item = merged_stream.next().await.unwrap().unwrap(); assert_eq!(item.0, Bytes::from("orange")); - assert_eq!(item.1, BitVec::from_slice(&[0b01010101])); + assert_eq!(item.1, bitmap(&[0b01010101])); let item = merged_stream.next().await.unwrap().unwrap(); assert_eq!(item.0, Bytes::from("peach")); - assert_eq!(item.1, BitVec::from_slice(&[0b01010101])); + assert_eq!(item.1, bitmap(&[0b01010101])); assert!(merged_stream.next().await.is_none()); } #[tokio::test] async fn test_merge_sorted_stream_overlapping() { let stream1 = sorted_stream_from_vec(vec![ - (Bytes::from("apple"), BitVec::from_slice(&[0b10101010])), - (Bytes::from("orange"), BitVec::from_slice(&[0b10101010])), + (Bytes::from("apple"), bitmap(&[0b10101010])), + (Bytes::from("orange"), bitmap(&[0b10101010])), ]); let stream2 = sorted_stream_from_vec(vec![ - (Bytes::from("apple"), BitVec::from_slice(&[0b01010101])), - (Bytes::from("peach"), BitVec::from_slice(&[0b01010101])), + (Bytes::from("apple"), bitmap(&[0b01010101])), + (Bytes::from("peach"), bitmap(&[0b01010101])), ]); let mut merged_stream = MergeSortedStream::merge(stream1, stream2); let item = merged_stream.next().await.unwrap().unwrap(); assert_eq!(item.0, Bytes::from("apple")); - assert_eq!(item.1, BitVec::from_slice(&[0b11111111])); + assert_eq!(item.1, bitmap(&[0b11111111])); let item = merged_stream.next().await.unwrap().unwrap(); assert_eq!(item.0, Bytes::from("orange")); - assert_eq!(item.1, BitVec::from_slice(&[0b10101010])); + assert_eq!(item.1, bitmap(&[0b10101010])); let item = merged_stream.next().await.unwrap().unwrap(); assert_eq!(item.0, Bytes::from("peach")); - assert_eq!(item.1, BitVec::from_slice(&[0b01010101])); + assert_eq!(item.1, bitmap(&[0b01010101])); assert!(merged_stream.next().await.is_none()); } diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs index 46c0c76269..246e65d5ad 100644 --- a/src/index/src/inverted_index/create/sort_create.rs +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -18,6 +18,7 @@ use std::num::NonZeroUsize; use async_trait::async_trait; use snafu::ensure; +use crate::bitmap::BitmapType; use crate::inverted_index::create::sort::{SortOutput, Sorter}; use crate::inverted_index::create::InvertedIndexCreator; use crate::inverted_index::error::{InconsistentRowCountSnafu, Result}; @@ -68,7 +69,11 @@ impl InvertedIndexCreator for SortIndexCreator { } /// Finalizes the sorting for all indexes and writes them using the inverted index writer - async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()> { + async fn finish( + &mut self, + writer: &mut dyn InvertedIndexWriter, + bitmap_type: BitmapType, + ) -> Result<()> { let mut output_row_count = None; for (index_name, mut sorter) in self.sorters.drain() { let SortOutput { @@ -88,7 +93,7 @@ impl InvertedIndexCreator for SortIndexCreator { ); writer - .add_index(index_name, segment_null_bitmap, sorted_stream) + .add_index(index_name, segment_null_bitmap, sorted_stream, bitmap_type) .await?; } @@ -117,9 +122,9 @@ mod tests { use futures::{stream, StreamExt}; use super::*; - use crate::inverted_index::create::sort::SortedStream; + use crate::bitmap::Bitmap; use crate::inverted_index::error::Error; - use crate::inverted_index::format::writer::MockInvertedIndexWriter; + use crate::inverted_index::format::writer::{MockInvertedIndexWriter, ValueStream}; use crate::Bytes; #[tokio::test] @@ -143,11 +148,10 @@ mod tests { } let mut mock_writer = MockInvertedIndexWriter::new(); - mock_writer - .expect_add_index() - .times(3) - .returning(|name, null_bitmap, stream| { + mock_writer.expect_add_index().times(3).returning( + |name, null_bitmap, stream, bitmap_type| { assert!(null_bitmap.is_empty()); + assert_eq!(bitmap_type, BitmapType::Roaring); match name.as_str() { "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]), "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]), @@ -155,7 +159,8 @@ mod tests { _ => panic!("unexpected index name: {}", name), } Ok(()) - }); + }, + ); mock_writer .expect_finish() .times(1) @@ -165,7 +170,10 @@ mod tests { Ok(()) }); - creator.finish(&mut mock_writer).await.unwrap(); + creator + .finish(&mut mock_writer, BitmapType::Roaring) + .await + .unwrap(); } #[tokio::test] @@ -191,8 +199,9 @@ mod tests { let mut mock_writer = MockInvertedIndexWriter::new(); mock_writer .expect_add_index() - .returning(|name, null_bitmap, stream| { + .returning(|name, null_bitmap, stream, bitmap_type| { assert!(null_bitmap.is_empty()); + assert_eq!(bitmap_type, BitmapType::Roaring); match name.as_str() { "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]), "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]), @@ -203,7 +212,7 @@ mod tests { }); mock_writer.expect_finish().never(); - let res = creator.finish(&mut mock_writer).await; + let res = creator.finish(&mut mock_writer, BitmapType::Roaring).await; assert!(matches!(res, Err(Error::InconsistentRowCount { .. }))); } @@ -219,8 +228,9 @@ mod tests { let mut mock_writer = MockInvertedIndexWriter::new(); mock_writer .expect_add_index() - .returning(|name, null_bitmap, stream| { + .returning(|name, null_bitmap, stream, bitmap_type| { assert!(null_bitmap.is_empty()); + assert_eq!(bitmap_type, BitmapType::Roaring); assert!(matches!(name.as_str(), "a" | "b" | "c")); assert!(stream_to_values(stream).is_empty()); Ok(()) @@ -234,7 +244,10 @@ mod tests { Ok(()) }); - creator.finish(&mut mock_writer).await.unwrap(); + creator + .finish(&mut mock_writer, BitmapType::Roaring) + .await + .unwrap(); } fn set_bit(bit_vec: &mut BitVec, index: usize) { @@ -283,20 +296,21 @@ mod tests { async fn output(&mut self) -> Result { let segment_null_bitmap = self.values.remove(&None).unwrap_or_default(); + let segment_null_bitmap = Bitmap::BitVec(segment_null_bitmap); Ok(SortOutput { segment_null_bitmap, sorted_stream: Box::new(stream::iter( std::mem::take(&mut self.values) .into_iter() - .map(|(v, b)| Ok((v.unwrap(), b))), + .map(|(v, b)| Ok((v.unwrap(), Bitmap::BitVec(b)))), )), total_row_count: self.total_row_count, }) } } - fn stream_to_values(stream: SortedStream) -> Vec { + fn stream_to_values(stream: ValueStream) -> Vec { futures::executor::block_on(async { stream.map(|r| r.unwrap().0).collect::>().await }) diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index c53e2ae9f5..fd3b640f53 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -110,6 +110,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to decode bitmap"))] + DecodeBitmap { + #[snafu(source)] + error: IoError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to decode protobuf"))] DecodeProto { #[snafu(source)] @@ -240,6 +248,7 @@ impl ErrorExt for Error { | CommonIo { .. } | UnknownIntermediateCodecMagic { .. } | FstCompile { .. } + | DecodeBitmap { .. } | InvalidFooterPayloadSize { .. } | BlobSizeTooSmall { .. } => StatusCode::Unexpected, diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 268b169979..2195d1f9e3 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -18,11 +18,11 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; -use common_base::BitVec; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::ResultExt; -use crate::inverted_index::error::{DecodeFstSnafu, Result}; +use crate::bitmap::{Bitmap, BitmapType}; +use crate::inverted_index::error::{DecodeBitmapSnafu, DecodeFstSnafu, Result}; pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader; use crate::inverted_index::FstMap; @@ -67,17 +67,25 @@ pub trait InvertedIndexReader: Send + Sync { } /// Retrieves the bitmap from the given offset and size. - async fn bitmap(&self, offset: u64, size: u32) -> Result { - self.range_read(offset, size).await.map(BitVec::from_vec) + async fn bitmap(&self, offset: u64, size: u32, bitmap_type: BitmapType) -> Result { + self.range_read(offset, size).await.and_then(|bytes| { + Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu) + }) } /// Retrieves the multiple bitmaps from the given ranges. - async fn bitmap_deque(&mut self, ranges: &[Range]) -> Result> { - Ok(self - .read_vec(ranges) - .await? + async fn bitmap_deque( + &mut self, + ranges: &[(Range, BitmapType)], + ) -> Result> { + let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip(); + let bytes = self.read_vec(&ranges).await?; + bytes .into_iter() - .map(|bytes| BitVec::from_slice(bytes.as_ref())) - .collect::>()) + .zip(types) + .map(|(bytes, bitmap_type)| { + Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu) + }) + .collect::>>() } } diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index b48a322484..786447e0f0 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -78,14 +78,14 @@ impl InvertedIndexReader for InvertedIndexBlobReader { #[cfg(test)] mod tests { - use common_base::bit_vec::prelude::*; use fst::MapBuilder; - use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; + use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta, InvertedIndexMetas}; use prost::Message; use super::*; + use crate::bitmap::Bitmap; - fn create_fake_fst() -> Vec { + fn mock_fst() -> Vec { let mut fst_buf = Vec::new(); let mut build = MapBuilder::new(&mut fst_buf).unwrap(); build.insert("key1".as_bytes(), 1).unwrap(); @@ -94,19 +94,27 @@ mod tests { fst_buf } - fn create_fake_bitmap() -> Vec { - bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0, 1, 0].into_vec() + fn mock_bitmap() -> Bitmap { + Bitmap::from_lsb0_bytes(&[0b10101010, 0b10000000], BitmapType::Roaring) + } + + fn mock_bitmap_bytes() -> Vec { + let mut buf = Vec::new(); + mock_bitmap() + .serialize_into(BitmapType::Roaring, &mut buf) + .unwrap(); + buf } fn create_inverted_index_blob() -> Vec { - let bitmap_size = create_fake_bitmap().len(); - let fst_size = create_fake_fst().len(); + let bitmap_size = mock_bitmap_bytes().len(); + let fst_size = mock_fst().len(); // first index let mut inverted_index = Vec::new(); - inverted_index.extend_from_slice(&create_fake_bitmap()); // value bitmap - inverted_index.extend_from_slice(&create_fake_bitmap()); // null bitmap - inverted_index.extend_from_slice(&create_fake_fst()); // fst + inverted_index.extend_from_slice(&mock_bitmap_bytes()); // value bitmap + inverted_index.extend_from_slice(&mock_bitmap_bytes()); // null bitmap + inverted_index.extend_from_slice(&mock_fst()); // fst let meta = InvertedIndexMeta { name: "tag0".to_string(), @@ -116,6 +124,7 @@ mod tests { null_bitmap_size: bitmap_size as _, relative_fst_offset: (bitmap_size * 2) as _, fst_size: fst_size as _, + bitmap_type: BitmapType::Roaring as _, ..Default::default() }; @@ -128,6 +137,7 @@ mod tests { null_bitmap_size: bitmap_size as _, relative_fst_offset: (bitmap_size * 2) as _, fst_size: fst_size as _, + bitmap_type: BitmapType::Roaring as _, ..Default::default() }; @@ -168,19 +178,19 @@ mod tests { let meta0 = metas.metas.get("tag0").unwrap(); assert_eq!(meta0.name, "tag0"); assert_eq!(meta0.base_offset, 0); - assert_eq!(meta0.inverted_index_size, 54); - assert_eq!(meta0.relative_null_bitmap_offset, 2); - assert_eq!(meta0.null_bitmap_size, 2); - assert_eq!(meta0.relative_fst_offset, 4); + assert_eq!(meta0.inverted_index_size, 102); + assert_eq!(meta0.relative_null_bitmap_offset, 26); + assert_eq!(meta0.null_bitmap_size, 26); + assert_eq!(meta0.relative_fst_offset, 52); assert_eq!(meta0.fst_size, 50); let meta1 = metas.metas.get("tag1").unwrap(); assert_eq!(meta1.name, "tag1"); - assert_eq!(meta1.base_offset, 54); - assert_eq!(meta1.inverted_index_size, 54); - assert_eq!(meta1.relative_null_bitmap_offset, 2); - assert_eq!(meta1.null_bitmap_size, 2); - assert_eq!(meta1.relative_fst_offset, 4); + assert_eq!(meta1.base_offset, 102); + assert_eq!(meta1.inverted_index_size, 102); + assert_eq!(meta1.relative_null_bitmap_offset, 26); + assert_eq!(meta1.null_bitmap_size, 26); + assert_eq!(meta1.relative_fst_offset, 52); assert_eq!(meta1.fst_size, 50); } @@ -224,17 +234,29 @@ mod tests { let metas = blob_reader.metadata().await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); - let bitmap = blob_reader.bitmap(meta.base_offset, 2).await.unwrap(); - assert_eq!(bitmap.into_vec(), create_fake_bitmap()); - let bitmap = blob_reader.bitmap(meta.base_offset + 2, 2).await.unwrap(); - assert_eq!(bitmap.into_vec(), create_fake_bitmap()); + let bitmap = blob_reader + .bitmap(meta.base_offset, 26, BitmapType::Roaring) + .await + .unwrap(); + assert_eq!(bitmap, mock_bitmap()); + let bitmap = blob_reader + .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring) + .await + .unwrap(); + assert_eq!(bitmap, mock_bitmap()); let metas = blob_reader.metadata().await.unwrap(); let meta = metas.metas.get("tag1").unwrap(); - let bitmap = blob_reader.bitmap(meta.base_offset, 2).await.unwrap(); - assert_eq!(bitmap.into_vec(), create_fake_bitmap()); - let bitmap = blob_reader.bitmap(meta.base_offset + 2, 2).await.unwrap(); - assert_eq!(bitmap.into_vec(), create_fake_bitmap()); + let bitmap = blob_reader + .bitmap(meta.base_offset, 26, BitmapType::Roaring) + .await + .unwrap(); + assert_eq!(bitmap, mock_bitmap()); + let bitmap = blob_reader + .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring) + .await + .unwrap(); + assert_eq!(bitmap, mock_bitmap()); } } diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs index f167766f6f..9c4307a094 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -18,14 +18,14 @@ mod single; use std::num::NonZeroUsize; use async_trait::async_trait; -use common_base::BitVec; use futures::Stream; +use crate::bitmap::{Bitmap, BitmapType}; use crate::inverted_index::error::Result; pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter; use crate::Bytes; -pub type ValueStream = Box> + Send + Unpin>; +pub type ValueStream = Box> + Send + Unpin>; /// Trait for writing inverted index data to underlying storage. #[mockall::automock] @@ -37,11 +37,13 @@ pub trait InvertedIndexWriter: Send { /// * `null_bitmap` marks positions of null entries. /// * `values` is a stream of values and their locations, yielded lexicographically. /// Errors occur if the values are out of order. + /// * `bitmap_type` is the type of bitmap to encode. async fn add_index( &mut self, name: String, - null_bitmap: BitVec, + null_bitmap: Bitmap, values: ValueStream, + bitmap_type: BitmapType, ) -> Result<()>; /// Finalizes the index writing process, ensuring all data is written. diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index ff4898d0dd..aed0f9d894 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -15,12 +15,12 @@ use std::num::NonZeroUsize; use async_trait::async_trait; -use common_base::BitVec; use futures::{AsyncWrite, AsyncWriteExt}; use greptime_proto::v1::index::InvertedIndexMetas; use prost::Message; use snafu::ResultExt; +use crate::bitmap::{Bitmap, BitmapType}; use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu}; use crate::inverted_index::format::writer::single::SingleIndexWriter; use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream}; @@ -43,8 +43,9 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit async fn add_index( &mut self, name: String, - null_bitmap: BitVec, + null_bitmap: Bitmap, values: ValueStream, + bitmap_type: BitmapType, ) -> Result<()> { let single_writer = SingleIndexWriter::new( name.clone(), @@ -52,6 +53,7 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit null_bitmap, values, &mut self.blob_writer, + bitmap_type, ); let metadata = single_writer.write().await?; @@ -100,6 +102,7 @@ impl InvertedIndexBlobWriter { #[cfg(test)] mod tests { use futures::stream; + use greptime_proto::v1::index::BitmapType; use super::*; use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; @@ -132,24 +135,44 @@ mod tests { writer .add_index( "tag0".to_string(), - BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring), Box::new(stream::iter(vec![ - Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), - Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))), - Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), + Ok(( + Bytes::from("a"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), + Ok(( + Bytes::from("b"), + Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring), + )), + Ok(( + Bytes::from("c"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), ])), + BitmapType::Roaring, ) .await .unwrap(); writer .add_index( "tag1".to_string(), - BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring), Box::new(stream::iter(vec![ - Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))), - Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))), - Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))), + Ok(( + Bytes::from("x"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), + Ok(( + Bytes::from("y"), + Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring), + )), + Ok(( + Bytes::from("z"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), ])), + BitmapType::Roaring, ) .await .unwrap(); @@ -181,22 +204,31 @@ mod tests { assert_eq!(fst0.len(), 3); let [offset, size] = unpack(fst0.get(b"a").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size) + .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring) + ); let [offset, size] = unpack(fst0.get(b"b").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size) + .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring) + ); let [offset, size] = unpack(fst0.get(b"c").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size) + .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring) + ); // tag1 let tag1 = metadata.metas.get("tag1").unwrap(); @@ -215,21 +247,30 @@ mod tests { assert_eq!(fst1.len(), 3); let [offset, size] = unpack(fst1.get(b"x").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size) + .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring) + ); let [offset, size] = unpack(fst1.get(b"y").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size) + .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring) + ); let [offset, size] = unpack(fst1.get(b"z").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size) + .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring) + ); } } diff --git a/src/index/src/inverted_index/format/writer/single.rs b/src/index/src/inverted_index/format/writer/single.rs index e101873203..0f60c53e11 100644 --- a/src/index/src/inverted_index/format/writer/single.rs +++ b/src/index/src/inverted_index/format/writer/single.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_base::BitVec; use fst::MapBuilder; use futures::{AsyncWrite, AsyncWriteExt, Stream, StreamExt}; use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexStats}; use snafu::ResultExt; +use crate::bitmap::{Bitmap, BitmapType}; use crate::inverted_index::error::{FstCompileSnafu, FstInsertSnafu, Result, WriteSnafu}; use crate::Bytes; @@ -27,7 +27,7 @@ pub struct SingleIndexWriter { blob_writer: W, /// The null bitmap to be written - null_bitmap: BitVec, + null_bitmap: Bitmap, /// The stream of values to be written, yielded lexicographically values: S, @@ -37,30 +37,40 @@ pub struct SingleIndexWriter { /// Metadata about the index meta: InvertedIndexMeta, + + /// The type of bitmap to use + bitmap_type: BitmapType, + + /// Buffer for writing the blob + buf: Vec, } impl SingleIndexWriter where W: AsyncWrite + Send + Unpin, - S: Stream> + Send + Unpin, + S: Stream> + Send + Unpin, { /// Constructs a new `SingleIndexWriter` pub fn new( name: String, base_offset: u64, - null_bitmap: BitVec, + null_bitmap: Bitmap, values: S, blob_writer: W, + bitmap_type: BitmapType, ) -> SingleIndexWriter { SingleIndexWriter { blob_writer, null_bitmap, values, fst: MapBuilder::memory(), + bitmap_type, + buf: Vec::new(), meta: InvertedIndexMeta { name, base_offset, stats: Some(InvertedIndexStats::default()), + bitmap_type: bitmap_type.into(), ..Default::default() }, } @@ -80,14 +90,17 @@ where /// Writes the null bitmap to the blob and updates the metadata accordingly async fn write_null_bitmap(&mut self) -> Result<()> { - let null_bitmap_bytes = self.null_bitmap.as_raw_slice(); + self.buf.clear(); + self.null_bitmap + .serialize_into(self.bitmap_type, &mut self.buf) + .expect("Write to vec should not fail"); self.blob_writer - .write_all(null_bitmap_bytes) + .write_all(&self.buf) .await .context(WriteSnafu)?; self.meta.relative_null_bitmap_offset = self.meta.inverted_index_size as _; - self.meta.null_bitmap_size = null_bitmap_bytes.len() as _; + self.meta.null_bitmap_size = self.buf.len() as _; self.meta.inverted_index_size += self.meta.null_bitmap_size as u64; // update stats @@ -100,15 +113,18 @@ where } /// Appends a value and its bitmap to the blob, updates the FST, and the metadata - async fn append_value(&mut self, value: Bytes, bitmap: BitVec) -> Result<()> { - let bitmap_bytes = bitmap.into_vec(); + async fn append_value(&mut self, value: Bytes, bitmap: Bitmap) -> Result<()> { + self.buf.clear(); + bitmap + .serialize_into(self.bitmap_type, &mut self.buf) + .expect("Write to vec should not fail"); self.blob_writer - .write_all(&bitmap_bytes) + .write_all(&self.buf) .await .context(WriteSnafu)?; let offset = self.meta.inverted_index_size as u32; - let size = bitmap_bytes.len() as u32; + let size = self.buf.len() as u32; self.meta.inverted_index_size += size as u64; let packed = bytemuck::cast::<[u32; 2], u64>([offset, size]); @@ -157,9 +173,10 @@ mod tests { let writer = SingleIndexWriter::new( "test".to_string(), 0, - BitVec::new(), + Bitmap::new_roaring(), stream::empty(), &mut blob, + BitmapType::Roaring, ); let meta = writer.write().await.unwrap(); @@ -174,13 +191,23 @@ mod tests { let writer = SingleIndexWriter::new( "test".to_string(), 0, - BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring), stream::iter(vec![ - Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), - Ok((Bytes::from("b"), BitVec::from_slice(&[0b0000_0000]))), - Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), + Ok(( + Bytes::from("a"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), + Ok(( + Bytes::from("b"), + Bitmap::from_lsb0_bytes(&[0b0000_0000], BitmapType::Roaring), + )), + Ok(( + Bytes::from("c"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), ]), &mut blob, + BitmapType::Roaring, ); let meta = writer.write().await.unwrap(); @@ -199,13 +226,23 @@ mod tests { let writer = SingleIndexWriter::new( "test".to_string(), 0, - BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring), stream::iter(vec![ - Ok((Bytes::from("b"), BitVec::from_slice(&[0b0000_0000]))), - Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), - Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), + Ok(( + Bytes::from("b"), + Bitmap::from_lsb0_bytes(&[0b0000_0000], BitmapType::Roaring), + )), + Ok(( + Bytes::from("a"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), + Ok(( + Bytes::from("c"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), ]), &mut blob, + BitmapType::Roaring, ); let res = writer.write().await; assert!(matches!(res, Err(Error::FstInsert { .. }))); diff --git a/src/index/src/inverted_index/search/fst_values_mapper.rs b/src/index/src/inverted_index/search/fst_values_mapper.rs index 54a842de02..f9c15c40d8 100644 --- a/src/index/src/inverted_index/search/fst_values_mapper.rs +++ b/src/index/src/inverted_index/search/fst_values_mapper.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_base::BitVec; -use greptime_proto::v1::index::InvertedIndexMeta; +use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta}; +use crate::bitmap::Bitmap; use crate::inverted_index::error::Result; use crate::inverted_index::format::reader::InvertedIndexReader; @@ -36,7 +36,7 @@ impl<'a> ParallelFstValuesMapper<'a> { pub async fn map_values_vec( &mut self, value_and_meta_vec: &[(Vec, &'a InvertedIndexMeta)], - ) -> Result> { + ) -> Result> { let groups = value_and_meta_vec .iter() .map(|(values, _)| values.len()) @@ -50,15 +50,17 @@ impl<'a> ParallelFstValuesMapper<'a> { // bitmap offset and the lower 32 bits represent its size. This mapper uses these // combined offset-size pairs to fetch and union multiple bitmaps into a single `BitVec`. let [relative_offset, size] = bytemuck::cast::(*value); - fetch_ranges.push( - meta.base_offset + relative_offset as u64 - ..meta.base_offset + relative_offset as u64 + size as u64, - ); + let range = meta.base_offset + relative_offset as u64 + ..meta.base_offset + relative_offset as u64 + size as u64; + fetch_ranges.push(( + range, + BitmapType::try_from(meta.bitmap_type).unwrap_or(BitmapType::BitVec), + )); } } if fetch_ranges.is_empty() { - return Ok(vec![BitVec::new()]); + return Ok(vec![Bitmap::new_bitvec()]); } common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges); @@ -66,14 +68,10 @@ impl<'a> ParallelFstValuesMapper<'a> { let mut output = Vec::with_capacity(groups.len()); for counter in groups { - let mut bitmap = BitVec::new(); + let mut bitmap = Bitmap::new_roaring(); for _ in 0..counter { let bm = bitmaps.pop_front().unwrap(); - if bm.len() > bitmap.len() { - bitmap = bm | bitmap - } else { - bitmap |= bm - } + bitmap.union(bm); } output.push(bitmap); @@ -87,8 +85,6 @@ impl<'a> ParallelFstValuesMapper<'a> { mod tests { use std::collections::VecDeque; - use common_base::bit_vec::prelude::*; - use super::*; use crate::inverted_index::format::reader::MockInvertedIndexReader; @@ -101,19 +97,26 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader.expect_bitmap_deque().returning(|ranges| { let mut output = VecDeque::new(); - for range in ranges { + for (range, bitmap_type) in ranges { let offset = range.start; let size = range.end - range.start; - match (offset, size) { - (1, 1) => output.push_back(bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1]), - (2, 1) => output.push_back(bitvec![u8, Lsb0; 0, 1, 0, 1, 0, 1, 0, 1]), + match (offset, size, bitmap_type) { + (1, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + } + (2, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type)) + } _ => unreachable!(), } } Ok(output) }); - let meta = InvertedIndexMeta::default(); + let meta = InvertedIndexMeta { + bitmap_type: BitmapType::Roaring.into(), + ..Default::default() + }; let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader); let result = values_mapper @@ -126,33 +129,50 @@ mod tests { .map_values_vec(&[(vec![value(1, 1)], &meta)]) .await .unwrap(); - assert_eq!(result[0], bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1]); + assert_eq!( + result[0], + Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring) + ); let result = values_mapper .map_values_vec(&[(vec![value(2, 1)], &meta)]) .await .unwrap(); - assert_eq!(result[0], bitvec![u8, Lsb0; 0, 1, 0, 1, 0, 1, 0, 1]); + assert_eq!( + result[0], + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring) + ); let result = values_mapper .map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)]) .await .unwrap(); - assert_eq!(result[0], bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]); + assert_eq!( + result[0], + Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring) + ); let result = values_mapper .map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)]) .await .unwrap(); - assert_eq!(result[0], bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]); + assert_eq!( + result[0], + Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring) + ); let result = values_mapper .map_values_vec(&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)]) .await .unwrap(); - assert_eq!(result[0], bitvec![u8, Lsb0; 0, 1, 0, 1, 0, 1, 0, 1]); - assert_eq!(result[1], bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1]); - + assert_eq!( + result[0], + Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring) + ); + assert_eq!( + result[1], + Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring) + ); let result = values_mapper .map_values_vec(&[ (vec![value(2, 1), value(1, 1)], &meta), @@ -160,7 +180,13 @@ mod tests { ]) .await .unwrap(); - assert_eq!(result[0], bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]); - assert_eq!(result[1], bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1]); + assert_eq!( + result[0], + Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring) + ); + assert_eq!( + result[1], + Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring) + ); } } diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index 654796b4d0..65f9331ebb 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -15,17 +15,17 @@ mod predicates_apply; use async_trait::async_trait; -use common_base::BitVec; pub use predicates_apply::PredicatesIndexApplier; +use crate::bitmap::Bitmap; use crate::inverted_index::error::Result; use crate::inverted_index::format::reader::InvertedIndexReader; /// The output of an apply operation. -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct ApplyOutput { /// Bitmap of indices that match the predicates. - pub matched_segment_ids: BitVec, + pub matched_segment_ids: Bitmap, /// The total number of rows in the index. pub total_row_count: usize, diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index cf8e30085f..39b9235f40 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -15,9 +15,9 @@ use std::mem::size_of; use async_trait::async_trait; -use common_base::BitVec; use greptime_proto::v1::index::InvertedIndexMetas; +use crate::bitmap::Bitmap; use crate::inverted_index::error::{IndexNotFoundSnafu, Result}; use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::search::fst_apply::{ @@ -50,12 +50,11 @@ impl IndexApplier for PredicatesIndexApplier { ) -> Result { let metadata = reader.metadata().await?; let mut output = ApplyOutput { - matched_segment_ids: BitVec::EMPTY, + matched_segment_ids: Bitmap::new_bitvec(), total_row_count: metadata.total_row_count as _, segment_row_count: metadata.segment_row_count as _, }; - let mut bitmap = Self::bitmap_full_range(&metadata); // TODO(zhongzc): optimize the order of applying to make it quicker to return empty. let mut appliers = Vec::with_capacity(self.fst_appliers.len()); let mut fst_ranges = Vec::with_capacity(self.fst_appliers.len()); @@ -81,7 +80,7 @@ impl IndexApplier for PredicatesIndexApplier { } if fst_ranges.is_empty() { - output.matched_segment_ids = bitmap; + output.matched_segment_ids = Self::bitmap_full_range(&metadata); return Ok(output); } @@ -93,14 +92,15 @@ impl IndexApplier for PredicatesIndexApplier { .collect::>(); let mut mapper = ParallelFstValuesMapper::new(reader); - let bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?; + let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?; + let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty for bm in bm_vec { - if bitmap.count_ones() == 0 { + if bm.count_ones() == 0 { break; } - bitmap &= bm; + bitmap.intersect(bm); } output.matched_segment_ids = bitmap; @@ -146,12 +146,12 @@ impl PredicatesIndexApplier { Ok(PredicatesIndexApplier { fst_appliers }) } - /// Creates a `BitVec` representing the full range of data in the index for initial scanning. - fn bitmap_full_range(metadata: &InvertedIndexMetas) -> BitVec { + /// Creates a `Bitmap` representing the full range of data in the index for initial scanning. + fn bitmap_full_range(metadata: &InvertedIndexMetas) -> Bitmap { let total_count = metadata.total_row_count; let segment_count = metadata.segment_row_count; let len = total_count.div_ceil(segment_count); - BitVec::repeat(true, len as _) + Bitmap::full_bitvec(len as _) } } @@ -167,10 +167,10 @@ mod tests { use std::collections::VecDeque; use std::sync::Arc; - use common_base::bit_vec::prelude::*; - use greptime_proto::v1::index::InvertedIndexMeta; + use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta}; use super::*; + use crate::bitmap::Bitmap; use crate::inverted_index::error::Error; use crate::inverted_index::format::reader::MockInvertedIndexReader; use crate::inverted_index::search::fst_apply::MockFstApplier; @@ -190,6 +190,7 @@ mod tests { let meta = InvertedIndexMeta { name: s(tag), relative_fst_offset: idx, + bitmap_type: BitmapType::Roaring.into(), ..Default::default() }; metas.metas.insert(s(tag), meta); @@ -229,10 +230,16 @@ mod tests { .unwrap()]) }); - mock_reader.expect_bitmap_deque().returning(|range| { - assert_eq!(range.len(), 1); - assert_eq!(range[0], 2..3); - Ok(VecDeque::from([bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]])) + mock_reader.expect_bitmap_deque().returning(|arg| { + assert_eq!(arg.len(), 1); + let range = &arg[0].0; + let bitmap_type = arg[0].1; + assert_eq!(*range, 2..3); + assert_eq!(bitmap_type, BitmapType::Roaring); + Ok(VecDeque::from([Bitmap::from_lsb0_bytes( + &[0b10101010], + bitmap_type, + )])) }); let output = applier .apply(SearchContext::default(), &mut mock_reader) @@ -240,7 +247,7 @@ mod tests { .unwrap(); assert_eq!( output.matched_segment_ids, - bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0] + Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring) ); // An index reader with a single tag "tag-0" but without value "tag-0_value-0" @@ -292,12 +299,16 @@ mod tests { }); mock_reader.expect_bitmap_deque().returning(|ranges| { let mut output = VecDeque::new(); - for range in ranges { + for (range, bitmap_type) in ranges { let offset = range.start; let size = range.end - range.start; - match (offset, size) { - (1, 1) => output.push_back(bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]), - (2, 1) => output.push_back(bitvec![u8, Lsb0; 1, 1, 0, 1, 1, 0, 1, 1]), + match (offset, size, bitmap_type) { + (1, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + } + (2, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type)) + } _ => unreachable!(), } } @@ -311,7 +322,7 @@ mod tests { .unwrap(); assert_eq!( output.matched_segment_ids, - bitvec![u8, Lsb0; 1, 0, 0, 0, 1, 0, 1, 0] + Bitmap::from_lsb0_bytes(&[0b10001010], BitmapType::Roaring) ); } @@ -330,10 +341,7 @@ mod tests { .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!( - output.matched_segment_ids, - bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1] - ); // full range to scan + assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan } #[tokio::test] @@ -405,10 +413,7 @@ mod tests { ) .await .unwrap(); - assert_eq!( - output.matched_segment_ids, - bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1] - ); + assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); } #[test] diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index 91850424ad..8c88a8d800 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -15,6 +15,7 @@ #![feature(iter_partition_in_place)] #![feature(assert_matches)] +pub mod bitmap; pub mod bloom_filter; pub mod error; pub mod external_provider; diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 1cca175f59..68eb2f89f5 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -127,8 +127,8 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead mod test { use std::num::NonZeroUsize; - use common_base::BitVec; use futures::stream; + use index::bitmap::{Bitmap, BitmapType}; use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter}; use index::Bytes; @@ -191,24 +191,44 @@ mod test { writer .add_index( "tag0".to_string(), - BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring), Box::new(stream::iter(vec![ - Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), - Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))), - Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), + Ok(( + Bytes::from("a"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), + Ok(( + Bytes::from("b"), + Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring), + )), + Ok(( + Bytes::from("c"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), ])), + index::bitmap::BitmapType::Roaring, ) .await .unwrap(); writer .add_index( "tag1".to_string(), - BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring), Box::new(stream::iter(vec![ - Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))), - Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))), - Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))), + Ok(( + Bytes::from("x"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), + Ok(( + Bytes::from("y"), + Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring), + )), + Ok(( + Bytes::from("z"), + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring), + )), ])), + index::bitmap::BitmapType::Roaring, ) .await .unwrap(); @@ -267,22 +287,31 @@ mod test { assert_eq!(fst0.len(), 3); let [offset, size] = unpack(fst0.get(b"a").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size) + .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring) + ); let [offset, size] = unpack(fst0.get(b"b").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size) + .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring) + ); let [offset, size] = unpack(fst0.get(b"c").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size) + .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring) + ); // tag1 let tag1 = metadata.metas.get("tag1").unwrap(); @@ -301,22 +330,31 @@ mod test { assert_eq!(fst1.len(), 3); let [offset, size] = unpack(fst1.get(b"x").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size) + .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring) + ); let [offset, size] = unpack(fst1.get(b"y").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size) + .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring) + ); let [offset, size] = unpack(fst1.get(b"z").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size) + .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .await .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + assert_eq!( + bitmap, + Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring) + ); // fuzz test let mut rng = rand::thread_rng(); diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 5362c1dd1d..5f0b267214 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -228,8 +228,8 @@ impl Drop for InvertedIndexApplier { #[cfg(test)] mod tests { - use common_base::BitVec; use futures::io::Cursor; + use index::bitmap::Bitmap; use index::inverted_index::search::index_apply::MockIndexApplier; use object_store::services::Memory; use puffin::puffin_manager::PuffinWriter; @@ -259,7 +259,7 @@ mod tests { mock_index_applier.expect_memory_usage().returning(|| 100); mock_index_applier.expect_apply().returning(|_, _| { Ok(ApplyOutput { - matched_segment_ids: BitVec::EMPTY, + matched_segment_ids: Bitmap::new_bitvec(), total_row_count: 100, segment_row_count: 10, }) @@ -276,7 +276,7 @@ mod tests { assert_eq!( output, ApplyOutput { - matched_segment_ids: BitVec::EMPTY, + matched_segment_ids: Bitmap::new_bitvec(), total_row_count: 100, segment_row_count: 10, } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 83510f49ca..f67ab9aec0 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -277,7 +277,9 @@ impl InvertedIndexer { let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write()); let (index_finish, puffin_add_blob) = futures::join!( - self.index_creator.finish(&mut index_writer), + // TODO(zhongzc): config bitmap type + self.index_creator + .finish(&mut index_writer, index::bitmap::BitmapType::Roaring), puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default()) );