mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 15:22:56 +00:00
feat: introduce roaring bitmap to optimize sparse value scenarios (#5603)
* feat: introduce roaring bitmap to optimize sparse value scenarios Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * fix taplo Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * address comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * polish Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * address comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -5566,6 +5566,7 @@ dependencies = [
|
||||
"rand",
|
||||
"regex",
|
||||
"regex-automata 0.4.8",
|
||||
"roaring",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu 0.8.5",
|
||||
@@ -9632,6 +9633,16 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "roaring"
|
||||
version = "0.10.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41589aba99537475bf697f2118357cad1c31590c5a1b9f6d9fc4ad6d07503661"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "robust"
|
||||
version = "1.1.0"
|
||||
|
||||
@@ -29,6 +29,7 @@ prost.workspace = true
|
||||
puffin.workspace = true
|
||||
regex.workspace = true
|
||||
regex-automata.workspace = true
|
||||
roaring = "0.10"
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
snafu.workspace = true
|
||||
|
||||
868
src/index/src/bitmap.rs
Normal file
868
src/index/src/bitmap.rs
Normal file
@@ -0,0 +1,868 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::io;
|
||||
use std::ops::RangeInclusive;
|
||||
|
||||
use common_base::BitVec;
|
||||
/// `BitmapType` enumerates how bitmaps are encoded within the inverted index.
|
||||
pub use greptime_proto::v1::index::BitmapType;
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
/// A bitmap representation supporting both BitVec and RoaringBitmap formats.
|
||||
///
|
||||
/// This enum provides unified bitmap operations while allowing efficient storage
|
||||
/// in different formats. The implementation automatically handles type conversions
|
||||
/// when performing operations between different formats.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Creating a new Roaring bitmap:
|
||||
/// ```
|
||||
/// use bitmap::Bitmap;
|
||||
/// let bitmap = Bitmap::new_roaring();
|
||||
/// assert!(bitmap.is_empty());
|
||||
/// ```
|
||||
///
|
||||
/// Creating a full BitVec bitmap:
|
||||
/// ```
|
||||
/// use bitmap::Bitmap;
|
||||
/// let bitmap = Bitmap::full_bitvec(10);
|
||||
/// assert_eq!(bitmap.count_ones(), 10);
|
||||
/// ```
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum Bitmap {
|
||||
Roaring(RoaringBitmap),
|
||||
BitVec(BitVec),
|
||||
}
|
||||
|
||||
impl Bitmap {
|
||||
/// Creates a new empty BitVec-based bitmap.
|
||||
pub fn new_bitvec() -> Self {
|
||||
Bitmap::BitVec(BitVec::EMPTY)
|
||||
}
|
||||
|
||||
/// Creates a new empty RoaringBitmap-based bitmap.
|
||||
pub fn new_roaring() -> Self {
|
||||
Bitmap::Roaring(RoaringBitmap::new())
|
||||
}
|
||||
|
||||
/// Creates a full BitVec-based bitmap with all bits set to 1.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `size` - The number of bits to allocate and set
|
||||
pub fn full_bitvec(size: usize) -> Self {
|
||||
Bitmap::BitVec(BitVec::repeat(true, size))
|
||||
}
|
||||
|
||||
/// Creates a full RoaringBitmap-based bitmap with bits 0..size set to 1.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `size` - The exclusive upper bound for the bit range
|
||||
pub fn full_roaring(size: usize) -> Self {
|
||||
let mut roaring = RoaringBitmap::new();
|
||||
roaring.insert_range(0..size as u32);
|
||||
Bitmap::Roaring(roaring)
|
||||
}
|
||||
|
||||
/// Returns the number of bits set to 1 in the bitmap.
|
||||
pub fn count_ones(&self) -> usize {
|
||||
match self {
|
||||
Bitmap::BitVec(bitvec) => bitvec.count_ones(),
|
||||
Bitmap::Roaring(roaring) => roaring.len() as _,
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the bitmap contains no set bits.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
match self {
|
||||
Bitmap::BitVec(bitvec) => bitvec.is_empty(),
|
||||
Bitmap::Roaring(roaring) => roaring.is_empty(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts a range of bits into the bitmap.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `range` - Inclusive range of bits to set
|
||||
pub fn insert_range(&mut self, range: RangeInclusive<usize>) {
|
||||
match self {
|
||||
Bitmap::BitVec(bitvec) => {
|
||||
if *range.end() >= bitvec.len() {
|
||||
bitvec.resize(range.end() + 1, false);
|
||||
}
|
||||
for i in range {
|
||||
bitvec.set(i, true);
|
||||
}
|
||||
}
|
||||
Bitmap::Roaring(roaring) => {
|
||||
let range = *range.start() as u32..=*range.end() as u32;
|
||||
roaring.insert_range(range);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the bitmap into a byte buffer using the specified format.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `serialize_type` - Target format for serialization
|
||||
/// * `writer` - Output writer to write the serialized data
|
||||
pub fn serialize_into(
|
||||
&self,
|
||||
serialize_type: BitmapType,
|
||||
mut writer: impl io::Write,
|
||||
) -> io::Result<()> {
|
||||
match (self, serialize_type) {
|
||||
(Bitmap::BitVec(bitvec), BitmapType::BitVec) => {
|
||||
writer.write_all(bitvec.as_raw_slice())?;
|
||||
}
|
||||
(Bitmap::Roaring(roaring), BitmapType::Roaring) => {
|
||||
roaring.serialize_into(writer)?;
|
||||
}
|
||||
(Bitmap::BitVec(bitvec), BitmapType::Roaring) => {
|
||||
let bitmap = Bitmap::bitvec_to_roaring(bitvec.clone());
|
||||
bitmap.serialize_into(writer)?;
|
||||
}
|
||||
(Bitmap::Roaring(roaring), BitmapType::BitVec) => {
|
||||
let bitvec = Bitmap::roaring_to_bitvec(roaring);
|
||||
writer.write_all(bitvec.as_raw_slice())?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Computes the size of the serialized bitmap in bytes.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `bitmap_type` - Format of data to be serialized
|
||||
pub fn serialized_size(&self, bitmap_type: BitmapType) -> usize {
|
||||
match (self, bitmap_type) {
|
||||
(Bitmap::BitVec(bitvec), BitmapType::BitVec) => bitvec.as_raw_slice().len(),
|
||||
(Bitmap::Roaring(roaring), BitmapType::Roaring) => roaring.serialized_size(),
|
||||
(Bitmap::BitVec(bitvec), BitmapType::Roaring) => {
|
||||
let bitmap = Bitmap::bitvec_to_roaring(bitvec.clone());
|
||||
bitmap.serialized_size()
|
||||
}
|
||||
(Bitmap::Roaring(roaring), BitmapType::BitVec) => {
|
||||
let bitvec = Bitmap::roaring_to_bitvec(roaring);
|
||||
bitvec.as_raw_slice().len()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserializes a bitmap from a byte buffer.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `buf` - Input buffer containing serialized data
|
||||
/// * `bitmap_type` - Format of the serialized data
|
||||
pub fn deserialize_from(buf: &[u8], bitmap_type: BitmapType) -> std::io::Result<Self> {
|
||||
match bitmap_type {
|
||||
BitmapType::BitVec => {
|
||||
let bitvec = BitVec::from_slice(buf);
|
||||
Ok(Bitmap::BitVec(bitvec))
|
||||
}
|
||||
BitmapType::Roaring => {
|
||||
let roaring = RoaringBitmap::deserialize_from(buf)?;
|
||||
Ok(Bitmap::Roaring(roaring))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes the union with another bitmap (in-place).
|
||||
///
|
||||
/// If the other bitmap is a different type, it will be converted to match
|
||||
/// the current bitmap's type.
|
||||
pub fn union(&mut self, other: Self) {
|
||||
if self.is_empty() {
|
||||
*self = other;
|
||||
return;
|
||||
}
|
||||
|
||||
match (self, other) {
|
||||
(Bitmap::BitVec(bitvec1), bitmap) => {
|
||||
let bitvec2 = bitmap.into_bitvec();
|
||||
if bitvec1.len() > bitvec2.len() {
|
||||
*bitvec1 |= bitvec2
|
||||
} else {
|
||||
*bitvec1 = bitvec2 | &*bitvec1;
|
||||
}
|
||||
}
|
||||
(Bitmap::Roaring(roaring1), bitmap) => {
|
||||
let roaring2 = bitmap.into_roaring();
|
||||
*roaring1 |= roaring2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes the intersection with another bitmap (in-place).
|
||||
///
|
||||
/// If the other bitmap is a different type, it will be converted to match
|
||||
/// the current bitmap's type.
|
||||
pub fn intersect(&mut self, other: Self) {
|
||||
match (self, other) {
|
||||
(Bitmap::BitVec(bitvec1), bitmap) => {
|
||||
let mut bitvec2 = bitmap.into_bitvec();
|
||||
let len = (bitvec1.len() - bitvec1.trailing_zeros())
|
||||
.min(bitvec2.len() - bitvec2.trailing_zeros());
|
||||
bitvec1.truncate(len);
|
||||
bitvec2.truncate(len);
|
||||
*bitvec1 &= bitvec2;
|
||||
}
|
||||
(Bitmap::Roaring(roaring1), bitmap) => {
|
||||
let roaring2 = bitmap.into_roaring();
|
||||
*roaring1 &= roaring2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an iterator over the indices of set bits.
|
||||
pub fn iter_ones(&self) -> Box<dyn Iterator<Item = usize> + '_> {
|
||||
match self {
|
||||
Bitmap::BitVec(bitvec) => Box::new(bitvec.iter_ones()),
|
||||
Bitmap::Roaring(roaring) => Box::new(roaring.iter().map(|x| x as usize)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a bitmap from bytes in LSB0 (least significant bit first) order.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `bytes` - Input bytes in LSB0 order
|
||||
/// * `bitmap_type` - Type of bitmap to create
|
||||
pub fn from_lsb0_bytes(bytes: &[u8], bitmap_type: BitmapType) -> Self {
|
||||
match bitmap_type {
|
||||
BitmapType::BitVec => {
|
||||
let bitvec = BitVec::from_slice(bytes);
|
||||
Bitmap::BitVec(bitvec)
|
||||
}
|
||||
BitmapType::Roaring => {
|
||||
let roaring = RoaringBitmap::from_lsb0_bytes(0, bytes);
|
||||
Bitmap::Roaring(roaring)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes memory usage of the bitmap in bytes.
|
||||
pub fn memory_usage(&self) -> usize {
|
||||
match self {
|
||||
Bitmap::BitVec(bitvec) => bitvec.capacity(),
|
||||
Bitmap::Roaring(roaring) => {
|
||||
let stat = roaring.statistics();
|
||||
(stat.n_bytes_array_containers
|
||||
+ stat.n_bytes_bitset_containers
|
||||
+ stat.n_bytes_run_containers) as usize
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn into_bitvec(self) -> BitVec {
|
||||
match self {
|
||||
Bitmap::BitVec(bitvec) => bitvec,
|
||||
Bitmap::Roaring(roaring) => Self::roaring_to_bitvec(&roaring),
|
||||
}
|
||||
}
|
||||
|
||||
fn into_roaring(self) -> RoaringBitmap {
|
||||
match self {
|
||||
Bitmap::Roaring(roaring) => roaring,
|
||||
Bitmap::BitVec(bitvec) => Self::bitvec_to_roaring(bitvec),
|
||||
}
|
||||
}
|
||||
|
||||
fn roaring_to_bitvec(roaring: &RoaringBitmap) -> BitVec {
|
||||
let max_value = roaring.max().unwrap_or(0);
|
||||
let mut bitvec = BitVec::repeat(false, max_value as usize + 1);
|
||||
for i in roaring {
|
||||
bitvec.set(i as usize, true);
|
||||
}
|
||||
bitvec
|
||||
}
|
||||
|
||||
fn bitvec_to_roaring(mut bitvec: BitVec) -> RoaringBitmap {
|
||||
bitvec.resize(bitvec.capacity(), false);
|
||||
RoaringBitmap::from_lsb0_bytes(0, bitvec.as_raw_slice())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Bitmap {
|
||||
fn default() -> Self {
|
||||
Bitmap::new_roaring()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_full_bitmaps() {
|
||||
let bv = Bitmap::full_bitvec(10);
|
||||
assert_eq!(bv.count_ones(), 10);
|
||||
|
||||
let rb = Bitmap::full_roaring(10);
|
||||
assert_eq!(rb.count_ones(), 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_serialization_roundtrip() {
|
||||
let original = Bitmap::full_roaring(100);
|
||||
let mut buf = Vec::new();
|
||||
|
||||
// Serialize as Roaring
|
||||
original
|
||||
.serialize_into(BitmapType::Roaring, &mut buf)
|
||||
.unwrap();
|
||||
let deserialized = Bitmap::deserialize_from(&buf, BitmapType::Roaring).unwrap();
|
||||
assert_eq!(original, deserialized);
|
||||
|
||||
// Serialize as BitVec
|
||||
buf.clear();
|
||||
original
|
||||
.serialize_into(BitmapType::BitVec, &mut buf)
|
||||
.unwrap();
|
||||
let deserialized = Bitmap::deserialize_from(&buf, BitmapType::BitVec).unwrap();
|
||||
assert_eq!(original.count_ones(), deserialized.count_ones());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_union_fulls() {
|
||||
// Test BitVec union
|
||||
let mut bv1 = Bitmap::full_bitvec(3); // 0-2: 111
|
||||
let bv2 = Bitmap::full_bitvec(5); // 0-4: 11111
|
||||
bv1.union(bv2);
|
||||
assert_eq!(bv1.count_ones(), 5);
|
||||
|
||||
let mut bv1 = Bitmap::full_bitvec(5); // 0-4: 11111
|
||||
let bv2 = Bitmap::full_bitvec(3); // 0-2: 111
|
||||
bv1.union(bv2);
|
||||
assert_eq!(bv1.count_ones(), 5);
|
||||
|
||||
// Test Roaring union
|
||||
let mut rb1 = Bitmap::full_roaring(3); // 0-2: 111
|
||||
let rb2 = Bitmap::full_roaring(5); // 0-4: 11111
|
||||
rb1.union(rb2);
|
||||
assert_eq!(rb1.count_ones(), 5);
|
||||
|
||||
let mut rb1 = Bitmap::full_roaring(5); // 0-4: 11111
|
||||
let rb2 = Bitmap::full_roaring(3); // 0-2: 111
|
||||
rb1.union(rb2);
|
||||
assert_eq!(rb1.count_ones(), 5);
|
||||
|
||||
// Test cross-type union
|
||||
let mut rb = Bitmap::full_roaring(5); // 0-4: 11111
|
||||
let bv = Bitmap::full_bitvec(3); // 0-2: 111
|
||||
rb.union(bv);
|
||||
assert_eq!(rb.count_ones(), 5);
|
||||
|
||||
let mut bv = Bitmap::full_bitvec(5); // 0-4: 11111
|
||||
let rb = Bitmap::full_roaring(3); // 0-2: 111
|
||||
bv.union(rb);
|
||||
assert_eq!(bv.count_ones(), 5);
|
||||
|
||||
let mut rb = Bitmap::full_roaring(3); // 0-2: 111
|
||||
let bv = Bitmap::full_bitvec(5); // 0-4: 11111
|
||||
rb.union(bv);
|
||||
assert_eq!(rb.count_ones(), 5);
|
||||
|
||||
let mut bv = Bitmap::full_bitvec(3); // 0-2: 111
|
||||
let rb = Bitmap::full_roaring(5); // 0-4: 11111
|
||||
bv.union(rb);
|
||||
assert_eq!(bv.count_ones(), 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_union_bitvec() {
|
||||
let mut bv1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
let bv2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec);
|
||||
bv1.union(bv2);
|
||||
assert_eq!(
|
||||
bv1,
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
// Test different lengths
|
||||
let mut bv1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
let bv2 = Bitmap::from_lsb0_bytes(&[0b01010101, 0b00000001], BitmapType::BitVec);
|
||||
bv1.union(bv2);
|
||||
assert_eq!(
|
||||
bv1,
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111, 0b00000001], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut bv1 = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::BitVec);
|
||||
let bv2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec);
|
||||
bv1.union(bv2);
|
||||
assert_eq!(
|
||||
bv1,
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111, 0b00000001], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
// Test empty bitmaps
|
||||
let mut bv1 = Bitmap::new_bitvec();
|
||||
let bv2 = Bitmap::new_bitvec();
|
||||
bv1.union(bv2);
|
||||
assert!(bv1.is_empty());
|
||||
|
||||
let mut bv1 = Bitmap::new_bitvec();
|
||||
let bv2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec);
|
||||
bv1.union(bv2);
|
||||
assert_eq!(
|
||||
bv1,
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut bv1 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec);
|
||||
let bv2 = Bitmap::new_bitvec();
|
||||
bv1.union(bv2);
|
||||
assert_eq!(
|
||||
bv1,
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
// Test empty and full bitmaps
|
||||
let mut bv1 = Bitmap::new_bitvec();
|
||||
let bv2 = Bitmap::full_bitvec(8);
|
||||
bv1.union(bv2);
|
||||
assert_eq!(bv1, Bitmap::full_bitvec(8));
|
||||
|
||||
let mut bv1 = Bitmap::full_bitvec(8);
|
||||
let bv2 = Bitmap::new_bitvec();
|
||||
bv1.union(bv2);
|
||||
assert_eq!(bv1, Bitmap::full_bitvec(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_union_roaring() {
|
||||
let mut rb1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
let rb2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring);
|
||||
rb1.union(rb2);
|
||||
assert_eq!(
|
||||
rb1,
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// Test different lengths
|
||||
let mut rb1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
let rb2 = Bitmap::from_lsb0_bytes(&[0b01010101, 0b00000001], BitmapType::Roaring);
|
||||
rb1.union(rb2);
|
||||
assert_eq!(
|
||||
rb1,
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111, 0b00000001], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut rb1 = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::Roaring);
|
||||
let rb2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring);
|
||||
rb1.union(rb2);
|
||||
assert_eq!(
|
||||
rb1,
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111, 0b00000001], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// Test empty bitmaps
|
||||
let mut rb1 = Bitmap::new_roaring();
|
||||
let rb2 = Bitmap::new_roaring();
|
||||
rb1.union(rb2);
|
||||
assert!(rb1.is_empty());
|
||||
|
||||
let mut rb1 = Bitmap::new_roaring();
|
||||
let rb2 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring);
|
||||
rb1.union(rb2);
|
||||
assert_eq!(
|
||||
rb1,
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut rb1 = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring);
|
||||
let rb2 = Bitmap::new_roaring();
|
||||
rb1.union(rb2);
|
||||
assert_eq!(
|
||||
rb1,
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// Test empty and full bit
|
||||
let mut rb1 = Bitmap::new_roaring();
|
||||
let rb2 = Bitmap::full_roaring(8);
|
||||
rb1.union(rb2);
|
||||
assert_eq!(rb1, Bitmap::full_roaring(8));
|
||||
|
||||
let mut rb1 = Bitmap::full_roaring(8);
|
||||
let rb2 = Bitmap::new_roaring();
|
||||
rb1.union(rb2);
|
||||
assert_eq!(rb1, Bitmap::full_roaring(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_union_mixed() {
|
||||
let mut rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
let bv = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec);
|
||||
rb.union(bv);
|
||||
assert_eq!(
|
||||
rb,
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
let rb = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring);
|
||||
bv.union(rb);
|
||||
assert_eq!(
|
||||
bv,
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
let bv = Bitmap::full_bitvec(8);
|
||||
rb.union(bv);
|
||||
assert_eq!(rb, Bitmap::full_roaring(8));
|
||||
|
||||
let mut bv = Bitmap::full_bitvec(8);
|
||||
let rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
bv.union(rb);
|
||||
assert_eq!(bv, Bitmap::full_bitvec(8));
|
||||
|
||||
let mut rb = Bitmap::new_roaring();
|
||||
let bv = Bitmap::full_bitvec(8);
|
||||
rb.union(bv);
|
||||
assert_eq!(rb, Bitmap::full_bitvec(8));
|
||||
|
||||
let mut bv = Bitmap::full_bitvec(8);
|
||||
let rb = Bitmap::new_roaring();
|
||||
bv.union(rb);
|
||||
assert_eq!(bv, Bitmap::full_bitvec(8));
|
||||
|
||||
let mut rb = Bitmap::new_roaring();
|
||||
let bv = Bitmap::new_bitvec();
|
||||
rb.union(bv);
|
||||
assert!(rb.is_empty());
|
||||
|
||||
let mut bv = Bitmap::new_bitvec();
|
||||
let rb = Bitmap::new_roaring();
|
||||
bv.union(rb);
|
||||
assert!(bv.is_empty());
|
||||
|
||||
let mut rb = Bitmap::new_roaring();
|
||||
let bv = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec);
|
||||
rb.union(bv);
|
||||
assert_eq!(
|
||||
rb,
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut bv = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec);
|
||||
let rb = Bitmap::new_roaring();
|
||||
bv.union(rb);
|
||||
assert_eq!(
|
||||
bv,
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut rb = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring);
|
||||
let bv = Bitmap::new_bitvec();
|
||||
rb.union(bv);
|
||||
assert_eq!(
|
||||
rb,
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut bv = Bitmap::new_bitvec();
|
||||
let rb = Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring);
|
||||
bv.union(rb);
|
||||
assert_eq!(
|
||||
bv,
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intersect_fulls() {
|
||||
// Test BitVec intersect
|
||||
let mut bv1 = Bitmap::full_bitvec(3); // 0-2: 111
|
||||
let bv2 = Bitmap::full_bitvec(5); // 0-4: 11111
|
||||
bv1.intersect(bv2);
|
||||
assert_eq!(bv1.count_ones(), 3);
|
||||
|
||||
let mut bv1 = Bitmap::full_bitvec(5); // 0-4: 11111
|
||||
let bv2 = Bitmap::full_bitvec(3); // 0-2: 111
|
||||
bv1.intersect(bv2);
|
||||
assert_eq!(bv1.count_ones(), 3);
|
||||
|
||||
// Test Roaring intersect
|
||||
let mut rb1 = Bitmap::full_roaring(3); // 0-2: 111
|
||||
let rb2 = Bitmap::full_roaring(5); // 0-4: 11111
|
||||
rb1.intersect(rb2);
|
||||
assert_eq!(rb1.count_ones(), 3);
|
||||
|
||||
let mut rb1 = Bitmap::full_roaring(5); // 0-4: 11111
|
||||
let rb2 = Bitmap::full_roaring(3); // 0-2: 111
|
||||
rb1.intersect(rb2);
|
||||
assert_eq!(rb1.count_ones(), 3);
|
||||
|
||||
// Test cross-type intersect
|
||||
let mut rb = Bitmap::full_roaring(5); // 0-4: 11111
|
||||
let bv = Bitmap::full_bitvec(3); // 0-2: 111
|
||||
rb.intersect(bv);
|
||||
assert_eq!(rb.count_ones(), 3);
|
||||
|
||||
let mut bv = Bitmap::full_bitvec(5); // 0-4: 11111
|
||||
let rb = Bitmap::full_roaring(3); // 0-2: 111
|
||||
bv.intersect(rb);
|
||||
assert_eq!(bv.count_ones(), 3);
|
||||
|
||||
let mut rb = Bitmap::full_roaring(3); // 0-2: 111
|
||||
let bv = Bitmap::full_bitvec(5); // 0-4: 11111
|
||||
rb.intersect(bv);
|
||||
assert_eq!(rb.count_ones(), 3);
|
||||
|
||||
let mut bv = Bitmap::full_bitvec(3); // 0-2: 111
|
||||
let rb = Bitmap::full_roaring(5); // 0-4: 11111
|
||||
bv.intersect(rb);
|
||||
assert_eq!(bv.count_ones(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intersect_bitvec() {
|
||||
let mut bv1 = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec);
|
||||
let bv2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
bv1.intersect(bv2);
|
||||
assert_eq!(
|
||||
bv1,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
// Test different lengths
|
||||
let mut bv1 = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec);
|
||||
let bv2 = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::BitVec);
|
||||
bv1.intersect(bv2);
|
||||
assert_eq!(
|
||||
bv1,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut bv1 = Bitmap::from_lsb0_bytes(&[0b11110000, 0b00000001], BitmapType::BitVec);
|
||||
let bv2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
bv1.intersect(bv2);
|
||||
assert_eq!(
|
||||
bv1,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
// Test empty bitmaps
|
||||
let mut bv1 = Bitmap::new_bitvec();
|
||||
let bv2 = Bitmap::new_bitvec();
|
||||
bv1.intersect(bv2);
|
||||
assert!(bv1.is_empty());
|
||||
|
||||
let mut bv1 = Bitmap::new_bitvec();
|
||||
let bv2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
bv1.intersect(bv2);
|
||||
assert!(bv1.is_empty());
|
||||
|
||||
let mut bv1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
let bv2 = Bitmap::new_bitvec();
|
||||
bv1.intersect(bv2);
|
||||
assert!(bv1.is_empty());
|
||||
|
||||
// Test empty and full bitmaps
|
||||
let mut bv1 = Bitmap::new_bitvec();
|
||||
let bv2 = Bitmap::full_bitvec(8);
|
||||
bv1.intersect(bv2);
|
||||
assert!(bv1.is_empty());
|
||||
|
||||
let mut bv1 = Bitmap::full_bitvec(8);
|
||||
let bv2 = Bitmap::new_bitvec();
|
||||
bv1.intersect(bv2);
|
||||
assert!(bv1.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intersect_roaring() {
|
||||
let mut rb1 = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring);
|
||||
let rb2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
rb1.intersect(rb2);
|
||||
assert_eq!(
|
||||
rb1,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// Test different lengths
|
||||
let mut rb1 = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring);
|
||||
let rb2 = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::Roaring);
|
||||
rb1.intersect(rb2);
|
||||
assert_eq!(
|
||||
rb1,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut rb1 = Bitmap::from_lsb0_bytes(&[0b11110000, 0b00000001], BitmapType::Roaring);
|
||||
let rb2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
rb1.intersect(rb2);
|
||||
assert_eq!(
|
||||
rb1,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// Test empty bitmaps
|
||||
let mut rb1 = Bitmap::new_roaring();
|
||||
let rb2 = Bitmap::new_roaring();
|
||||
rb1.intersect(rb2);
|
||||
assert!(rb1.is_empty());
|
||||
|
||||
let mut rb1 = Bitmap::new_roaring();
|
||||
let rb2 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
rb1.intersect(rb2);
|
||||
assert!(rb1.is_empty());
|
||||
|
||||
let mut rb1 = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
let rb2 = Bitmap::new_roaring();
|
||||
rb1.intersect(rb2);
|
||||
assert!(rb1.is_empty());
|
||||
|
||||
// Test empty and full bitmaps
|
||||
let mut rb1 = Bitmap::new_roaring();
|
||||
let rb2 = Bitmap::full_roaring(8);
|
||||
rb1.intersect(rb2);
|
||||
assert!(rb1.is_empty());
|
||||
|
||||
let mut rb1 = Bitmap::full_roaring(8);
|
||||
let rb2 = Bitmap::new_roaring();
|
||||
rb1.intersect(rb2);
|
||||
assert!(rb1.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intersect_mixed() {
|
||||
let mut rb = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring);
|
||||
let bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
rb.intersect(bv);
|
||||
assert_eq!(
|
||||
rb,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut bv = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec);
|
||||
let rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
bv.intersect(rb);
|
||||
assert_eq!(
|
||||
bv,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut rb = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring);
|
||||
let bv = Bitmap::full_bitvec(8);
|
||||
rb.intersect(bv);
|
||||
assert_eq!(
|
||||
rb,
|
||||
Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut bv = Bitmap::full_bitvec(8);
|
||||
let rb = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring);
|
||||
bv.intersect(rb);
|
||||
assert_eq!(
|
||||
bv,
|
||||
Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut rb = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::Roaring);
|
||||
let bv = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::BitVec);
|
||||
rb.intersect(bv);
|
||||
assert_eq!(
|
||||
rb,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut bv = Bitmap::from_lsb0_bytes(&[0b11110000, 0b00000001], BitmapType::BitVec);
|
||||
let rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
bv.intersect(rb);
|
||||
assert_eq!(
|
||||
bv,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut rb = Bitmap::from_lsb0_bytes(&[0b11110000, 0b00000001], BitmapType::Roaring);
|
||||
let bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
rb.intersect(bv);
|
||||
assert_eq!(
|
||||
rb,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let mut bv = Bitmap::from_lsb0_bytes(&[0b11110000], BitmapType::BitVec);
|
||||
let rb = Bitmap::from_lsb0_bytes(&[0b10101010, 0b00000001], BitmapType::Roaring);
|
||||
bv.intersect(rb);
|
||||
assert_eq!(
|
||||
bv,
|
||||
Bitmap::from_lsb0_bytes(&[0b10100000], BitmapType::BitVec)
|
||||
);
|
||||
|
||||
let mut rb = Bitmap::new_roaring();
|
||||
let bv = Bitmap::full_bitvec(8);
|
||||
rb.intersect(bv);
|
||||
assert!(rb.is_empty());
|
||||
|
||||
let mut bv = Bitmap::full_bitvec(8);
|
||||
let rb = Bitmap::new_roaring();
|
||||
bv.intersect(rb);
|
||||
assert!(bv.is_empty());
|
||||
|
||||
let mut bv = Bitmap::new_bitvec();
|
||||
let rb = Bitmap::full_roaring(8);
|
||||
bv.intersect(rb);
|
||||
assert!(bv.is_empty());
|
||||
|
||||
let mut rb = Bitmap::full_roaring(8);
|
||||
let bv = Bitmap::new_bitvec();
|
||||
rb.intersect(bv);
|
||||
assert!(rb.is_empty());
|
||||
|
||||
let mut rb = Bitmap::new_roaring();
|
||||
let bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
rb.intersect(bv);
|
||||
assert!(rb.is_empty());
|
||||
|
||||
let mut bv = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::BitVec);
|
||||
let rb = Bitmap::new_roaring();
|
||||
bv.intersect(rb);
|
||||
assert!(bv.is_empty());
|
||||
|
||||
let mut bv = Bitmap::new_bitvec();
|
||||
let rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
bv.intersect(rb);
|
||||
assert!(bv.is_empty());
|
||||
|
||||
let mut rb = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
|
||||
let bv = Bitmap::new_bitvec();
|
||||
rb.intersect(bv);
|
||||
assert!(rb.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_insert_range() {
|
||||
let mut bv = Bitmap::new_bitvec();
|
||||
bv.insert_range(0..=5);
|
||||
assert_eq!(bv.iter_ones().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
|
||||
|
||||
let mut rb = Bitmap::new_roaring();
|
||||
rb.insert_range(0..=5);
|
||||
assert_eq!(bv.iter_ones().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
|
||||
|
||||
let mut bv = Bitmap::new_bitvec();
|
||||
bv.insert_range(10..=10);
|
||||
assert_eq!(bv.iter_ones().collect::<Vec<_>>(), vec![10]);
|
||||
|
||||
let mut rb = Bitmap::new_roaring();
|
||||
rb.insert_range(10..=10);
|
||||
assert_eq!(bv.iter_ones().collect::<Vec<_>>(), vec![10]);
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,7 @@ pub mod sort_create;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::bitmap::BitmapType;
|
||||
use crate::inverted_index::error::Result;
|
||||
use crate::inverted_index::format::writer::InvertedIndexWriter;
|
||||
use crate::BytesRef;
|
||||
@@ -53,5 +54,9 @@ pub trait InvertedIndexCreator: Send {
|
||||
|
||||
/// Finalizes the index creation process, ensuring all data is properly indexed and stored
|
||||
/// in the provided writer
|
||||
async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()>;
|
||||
async fn finish(
|
||||
&mut self,
|
||||
writer: &mut dyn InvertedIndexWriter,
|
||||
bitmap_type: BitmapType,
|
||||
) -> Result<()>;
|
||||
}
|
||||
|
||||
@@ -17,22 +17,23 @@ mod intermediate_rw;
|
||||
mod merge_stream;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use futures::Stream;
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::Result;
|
||||
use crate::inverted_index::format::writer::ValueStream;
|
||||
use crate::{Bytes, BytesRef};
|
||||
|
||||
/// A stream of sorted values along with their associated bitmap
|
||||
pub type SortedStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;
|
||||
pub type SortedStream = Box<dyn Stream<Item = Result<(Bytes, Bitmap)>> + Send + Unpin>;
|
||||
|
||||
/// Output of a sorting operation, encapsulating a bitmap for null values and a stream of sorted items
|
||||
pub struct SortOutput {
|
||||
/// Bitmap indicating which segments have null values
|
||||
pub segment_null_bitmap: BitVec,
|
||||
pub segment_null_bitmap: Bitmap,
|
||||
|
||||
/// Stream of sorted items
|
||||
pub sorted_stream: SortedStream,
|
||||
pub sorted_stream: ValueStream,
|
||||
|
||||
/// Total number of rows in the sorted data
|
||||
pub total_row_count: usize,
|
||||
|
||||
@@ -20,11 +20,11 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use common_telemetry::{debug, error};
|
||||
use futures::stream;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::external_provider::ExternalTempFileProvider;
|
||||
use crate::inverted_index::create::sort::intermediate_rw::{
|
||||
IntermediateReader, IntermediateWriter,
|
||||
@@ -45,18 +45,10 @@ pub struct ExternalSorter {
|
||||
temp_file_provider: Arc<dyn ExternalTempFileProvider>,
|
||||
|
||||
/// Bitmap indicating which segments have null values
|
||||
segment_null_bitmap: BitVec,
|
||||
segment_null_bitmap: Bitmap,
|
||||
|
||||
/// In-memory buffer to hold values and their corresponding bitmaps until memory threshold is exceeded
|
||||
values_buffer: BTreeMap<Bytes, BitVec>,
|
||||
|
||||
/// Count of rows in the last dumped buffer, used to streamline memory usage of `values_buffer`.
|
||||
///
|
||||
/// After data is dumped to external files, `last_dump_row_count` is updated to reflect the new starting point
|
||||
/// for `BitVec` indexing. This means each `BitVec` in `values_buffer` thereafter encodes positions relative to
|
||||
/// this count, not from 0. This mechanism effectively shrinks the memory footprint of each `BitVec`, helping manage
|
||||
/// memory use more efficiently by focusing only on newly ingested data post-dump.
|
||||
last_dump_row_count: usize,
|
||||
values_buffer: BTreeMap<Bytes, (Bitmap, usize)>,
|
||||
|
||||
/// Count of all rows ingested so far
|
||||
total_row_count: usize,
|
||||
@@ -93,14 +85,14 @@ impl Sorter for ExternalSorter {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let segment_index_range = self.segment_index_range(n, value.is_none());
|
||||
let segment_index_range = self.segment_index_range(n);
|
||||
self.total_row_count += n;
|
||||
|
||||
if let Some(value) = value {
|
||||
let memory_diff = self.push_not_null(value, segment_index_range);
|
||||
self.may_dump_buffer(memory_diff).await
|
||||
} else {
|
||||
set_bits(&mut self.segment_null_bitmap, segment_index_range);
|
||||
self.segment_null_bitmap.insert_range(segment_index_range);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -117,15 +109,10 @@ impl Sorter for ExternalSorter {
|
||||
// TODO(zhongzc): k-way merge instead of 2-way merge
|
||||
|
||||
let mut tree_nodes: VecDeque<SortedStream> = VecDeque::with_capacity(readers.len() + 1);
|
||||
let leading_zeros = self.last_dump_row_count / self.segment_row_count;
|
||||
tree_nodes.push_back(Box::new(stream::iter(
|
||||
mem::take(&mut self.values_buffer)
|
||||
.into_iter()
|
||||
.map(move |(value, mut bitmap)| {
|
||||
bitmap.resize(bitmap.len() + leading_zeros, false);
|
||||
bitmap.shift_right(leading_zeros);
|
||||
Ok((value, bitmap))
|
||||
}),
|
||||
.map(|(value, (bitmap, _))| Ok((value, bitmap))),
|
||||
)));
|
||||
for (_, reader) in readers {
|
||||
tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?);
|
||||
@@ -161,11 +148,10 @@ impl ExternalSorter {
|
||||
index_name,
|
||||
temp_file_provider,
|
||||
|
||||
segment_null_bitmap: BitVec::new(),
|
||||
segment_null_bitmap: Bitmap::new_bitvec(), // bitvec is more efficient for many null values
|
||||
values_buffer: BTreeMap::new(),
|
||||
|
||||
total_row_count: 0,
|
||||
last_dump_row_count: 0,
|
||||
segment_row_count,
|
||||
|
||||
current_memory_usage: 0,
|
||||
@@ -195,7 +181,7 @@ impl ExternalSorter {
|
||||
}
|
||||
|
||||
/// Pushes the non-null values to the values buffer and sets the bits within
|
||||
/// the specified range in the given BitVec to true.
|
||||
/// the specified range in the given bitmap to true.
|
||||
/// Returns the memory usage difference of the buffer after the operation.
|
||||
fn push_not_null(
|
||||
&mut self,
|
||||
@@ -203,20 +189,23 @@ impl ExternalSorter {
|
||||
segment_index_range: RangeInclusive<usize>,
|
||||
) -> usize {
|
||||
match self.values_buffer.get_mut(value) {
|
||||
Some(bitmap) => {
|
||||
let old_len = bitmap.as_raw_slice().len();
|
||||
set_bits(bitmap, segment_index_range);
|
||||
Some((bitmap, mem_usage)) => {
|
||||
bitmap.insert_range(segment_index_range);
|
||||
let new_usage = bitmap.memory_usage() + value.len();
|
||||
let diff = new_usage - *mem_usage;
|
||||
*mem_usage = new_usage;
|
||||
|
||||
bitmap.as_raw_slice().len() - old_len
|
||||
diff
|
||||
}
|
||||
None => {
|
||||
let mut bitmap = BitVec::default();
|
||||
set_bits(&mut bitmap, segment_index_range);
|
||||
let mut bitmap = Bitmap::new_roaring();
|
||||
bitmap.insert_range(segment_index_range);
|
||||
|
||||
let mem_diff = bitmap.as_raw_slice().len() + value.len();
|
||||
self.values_buffer.insert(value.to_vec(), bitmap);
|
||||
let mem_usage = bitmap.memory_usage() + value.len();
|
||||
self.values_buffer
|
||||
.insert(value.to_vec(), (bitmap, mem_usage));
|
||||
|
||||
mem_diff
|
||||
mem_usage
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -257,12 +246,8 @@ impl ExternalSorter {
|
||||
.fetch_sub(memory_usage, Ordering::Relaxed);
|
||||
self.current_memory_usage = 0;
|
||||
|
||||
let bitmap_leading_zeros = self.last_dump_row_count / self.segment_row_count;
|
||||
self.last_dump_row_count =
|
||||
self.total_row_count - self.total_row_count % self.segment_row_count; // align to segment
|
||||
|
||||
let entries = values.len();
|
||||
IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_|
|
||||
IntermediateWriter::new(writer).write_all(values.into_iter().map(|(k, (b, _))| (k, b))).await.inspect(|_|
|
||||
debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
|
||||
).inspect_err(|e|
|
||||
error!(e; "Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}")
|
||||
@@ -271,13 +256,8 @@ impl ExternalSorter {
|
||||
|
||||
/// Determines the segment index range for the row index range
|
||||
/// `[row_begin, row_begin + n - 1]`
|
||||
fn segment_index_range(&self, n: usize, is_null: bool) -> RangeInclusive<usize> {
|
||||
let row_begin = if is_null {
|
||||
self.total_row_count
|
||||
} else {
|
||||
self.total_row_count - self.last_dump_row_count
|
||||
};
|
||||
|
||||
fn segment_index_range(&self, n: usize) -> RangeInclusive<usize> {
|
||||
let row_begin = self.total_row_count;
|
||||
let start = self.segment_index(row_begin);
|
||||
let end = self.segment_index(row_begin + n - 1);
|
||||
start..=end
|
||||
@@ -289,16 +269,6 @@ impl ExternalSorter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the bits within the specified range in the given `BitVec` to true
|
||||
fn set_bits(bitmap: &mut BitVec, index_range: RangeInclusive<usize>) {
|
||||
if *index_range.end() >= bitmap.len() {
|
||||
bitmap.resize(index_range.end() + 1, false);
|
||||
}
|
||||
for index in index_range {
|
||||
bitmap.set(index, true);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
@@ -330,7 +300,7 @@ mod tests {
|
||||
move |index_name, file_id| {
|
||||
assert_eq!(index_name, "test");
|
||||
let mut files = files.lock().unwrap();
|
||||
let (writer, reader) = duplex(8 * 1024);
|
||||
let (writer, reader) = duplex(1024 * 1024);
|
||||
files.insert(file_id.to_string(), Box::new(reader.compat()));
|
||||
Ok(Box::new(writer.compat_write()))
|
||||
}
|
||||
|
||||
@@ -19,29 +19,24 @@
|
||||
//! The serialization format is as follows:
|
||||
//!
|
||||
//! ```text
|
||||
//! [magic][bitmap leading zeros][item][item]...[item]
|
||||
//! [4] [4] [?]
|
||||
//! [magic][item][item]...[item]
|
||||
//! [4] [?]
|
||||
//!
|
||||
//! Each [item] is structured as:
|
||||
//! [value len][value][bitmap len][bitmap]
|
||||
//! [8] [?] [8] [?]
|
||||
//! ```
|
||||
//!
|
||||
//! The format starts with a 4-byte magic identifier, followed by a 4-byte
|
||||
//! bitmap leading zeros count, indicating how many leading zeros are in the
|
||||
//! fixed-size region of the bitmap. Following that, each item represents
|
||||
//! a value and its associated bitmap, serialized with their lengths for
|
||||
//! Each item represents a value and its associated bitmap, serialized with their lengths for
|
||||
//! easier deserialization.
|
||||
|
||||
mod codec_v1;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use asynchronous_codec::{FramedRead, FramedWrite};
|
||||
use common_base::BitVec;
|
||||
use futures::{stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::bitmap::{Bitmap, BitmapType};
|
||||
use crate::inverted_index::create::sort::SortedStream;
|
||||
use crate::inverted_index::error::{
|
||||
CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu,
|
||||
@@ -62,12 +57,13 @@ impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
|
||||
/// Serializes and writes all provided values to the wrapped writer
|
||||
pub async fn write_all(
|
||||
mut self,
|
||||
values: BTreeMap<Bytes, BitVec>,
|
||||
bitmap_leading_zeros: u32,
|
||||
values: impl IntoIterator<Item = (Bytes, Bitmap)>,
|
||||
) -> Result<()> {
|
||||
let (codec_magic, encoder) = (
|
||||
codec_v1::CODEC_V1_MAGIC,
|
||||
codec_v1::IntermediateItemEncoderV1,
|
||||
codec_v1::IntermediateItemEncoderV1 {
|
||||
bitmap_type: BitmapType::Roaring,
|
||||
},
|
||||
);
|
||||
|
||||
self.writer
|
||||
@@ -75,11 +71,6 @@ impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
|
||||
.await
|
||||
.context(WriteSnafu)?;
|
||||
|
||||
self.writer
|
||||
.write_all(&bitmap_leading_zeros.to_be_bytes())
|
||||
.await
|
||||
.context(WriteSnafu)?;
|
||||
|
||||
let value_stream = stream::iter(values.into_iter().map(Ok));
|
||||
let frame_write = FramedWrite::new(&mut self.writer, encoder);
|
||||
// `forward()` will flush and close the writer when the stream ends
|
||||
@@ -112,17 +103,9 @@ impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {
|
||||
.context(ReadSnafu)?;
|
||||
|
||||
let decoder = match &magic {
|
||||
codec_v1::CODEC_V1_MAGIC => {
|
||||
let bitmap_leading_zeros = {
|
||||
let mut buf = [0u8; 4];
|
||||
self.reader.read_exact(&mut buf).await.context(ReadSnafu)?;
|
||||
u32::from_be_bytes(buf)
|
||||
};
|
||||
|
||||
codec_v1::IntermediateItemDecoderV1 {
|
||||
bitmap_leading_zeros,
|
||||
}
|
||||
}
|
||||
codec_v1::CODEC_V1_MAGIC => codec_v1::IntermediateItemDecoderV1 {
|
||||
bitmap_type: BitmapType::Roaring,
|
||||
},
|
||||
_ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(),
|
||||
};
|
||||
|
||||
@@ -132,6 +115,7 @@ impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
use std::io::{Seek, SeekFrom};
|
||||
|
||||
use futures::io::{AllowStdIo, Cursor};
|
||||
@@ -140,6 +124,10 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::inverted_index::error::Error;
|
||||
|
||||
fn bitmap(bytes: &[u8]) -> Bitmap {
|
||||
Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_intermediate_read_write_basic() {
|
||||
let file_r = tempfile().unwrap();
|
||||
@@ -148,12 +136,12 @@ mod tests {
|
||||
let buf_w = AllowStdIo::new(file_w);
|
||||
|
||||
let values = BTreeMap::from_iter([
|
||||
(Bytes::from("a"), BitVec::from_slice(&[0b10101010])),
|
||||
(Bytes::from("b"), BitVec::from_slice(&[0b01010101])),
|
||||
(Bytes::from("a"), bitmap(&[0b10101010])),
|
||||
(Bytes::from("b"), bitmap(&[0b01010101])),
|
||||
]);
|
||||
|
||||
let writer = IntermediateWriter::new(buf_w);
|
||||
writer.write_all(values.clone(), 0).await.unwrap();
|
||||
writer.write_all(values.clone()).await.unwrap();
|
||||
// reset the handle
|
||||
buf_r.seek(SeekFrom::Start(0)).unwrap();
|
||||
|
||||
@@ -161,48 +149,9 @@ mod tests {
|
||||
let mut stream = reader.into_stream().await.unwrap();
|
||||
|
||||
let a = stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(a, (Bytes::from("a"), BitVec::from_slice(&[0b10101010])));
|
||||
assert_eq!(a, (Bytes::from("a"), bitmap(&[0b10101010])));
|
||||
let b = stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(b, (Bytes::from("b"), BitVec::from_slice(&[0b01010101])));
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_intermediate_read_write_with_prefix_zeros() {
|
||||
let file_r = tempfile().unwrap();
|
||||
let file_w = file_r.try_clone().unwrap();
|
||||
let mut buf_r = AllowStdIo::new(file_r);
|
||||
let buf_w = AllowStdIo::new(file_w);
|
||||
|
||||
let values = BTreeMap::from_iter([
|
||||
(Bytes::from("a"), BitVec::from_slice(&[0b10101010])),
|
||||
(Bytes::from("b"), BitVec::from_slice(&[0b01010101])),
|
||||
]);
|
||||
|
||||
let writer = IntermediateWriter::new(buf_w);
|
||||
writer.write_all(values.clone(), 8).await.unwrap();
|
||||
// reset the handle
|
||||
buf_r.seek(SeekFrom::Start(0)).unwrap();
|
||||
|
||||
let reader = IntermediateReader::new(buf_r);
|
||||
let mut stream = reader.into_stream().await.unwrap();
|
||||
|
||||
let a = stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
a,
|
||||
(
|
||||
Bytes::from("a"),
|
||||
BitVec::from_slice(&[0b00000000, 0b10101010])
|
||||
)
|
||||
);
|
||||
let b = stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
b,
|
||||
(
|
||||
Bytes::from("b"),
|
||||
BitVec::from_slice(&[0b00000000, 0b01010101])
|
||||
)
|
||||
);
|
||||
assert_eq!(b, (Bytes::from("b"), bitmap(&[0b01010101])));
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
|
||||
@@ -213,7 +162,7 @@ mod tests {
|
||||
let values = BTreeMap::new();
|
||||
|
||||
let writer = IntermediateWriter::new(&mut buf);
|
||||
writer.write_all(values.clone(), 0).await.unwrap();
|
||||
writer.write_all(values.clone()).await.unwrap();
|
||||
|
||||
let reader = IntermediateReader::new(Cursor::new(buf));
|
||||
let mut stream = reader.into_stream().await.unwrap();
|
||||
|
||||
@@ -16,9 +16,10 @@ use std::io;
|
||||
|
||||
use asynchronous_codec::{BytesMut, Decoder, Encoder};
|
||||
use bytes::{Buf, BufMut};
|
||||
use common_base::BitVec;
|
||||
use greptime_proto::v1::index::BitmapType;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::{CommonIoSnafu, Error, Result};
|
||||
use crate::Bytes;
|
||||
|
||||
@@ -28,37 +29,42 @@ const U64_LENGTH: usize = std::mem::size_of::<u64>();
|
||||
pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01";
|
||||
|
||||
/// Serializes items of external sorting intermediate files.
|
||||
pub struct IntermediateItemEncoderV1;
|
||||
pub struct IntermediateItemEncoderV1 {
|
||||
pub bitmap_type: BitmapType,
|
||||
}
|
||||
|
||||
/// [`FramedWrite`] requires the [`Encoder`] trait to be implemented.
|
||||
impl Encoder for IntermediateItemEncoderV1 {
|
||||
type Item<'a> = (Bytes, BitVec);
|
||||
type Item<'a> = (Bytes, Bitmap);
|
||||
type Error = Error;
|
||||
|
||||
fn encode(&mut self, item: (Bytes, BitVec), dst: &mut BytesMut) -> Result<()> {
|
||||
fn encode(&mut self, item: (Bytes, Bitmap), dst: &mut BytesMut) -> Result<()> {
|
||||
let value_bytes = item.0;
|
||||
let bitmap_bytes = item.1.into_vec();
|
||||
let bitmap_size = item.1.serialized_size(self.bitmap_type);
|
||||
|
||||
dst.reserve(U64_LENGTH * 2 + value_bytes.len() + bitmap_bytes.len());
|
||||
dst.reserve(U64_LENGTH * 2 + value_bytes.len() + bitmap_size);
|
||||
dst.put_u64_le(value_bytes.len() as u64);
|
||||
dst.extend_from_slice(&value_bytes);
|
||||
dst.put_u64_le(bitmap_bytes.len() as u64);
|
||||
dst.extend_from_slice(&bitmap_bytes);
|
||||
dst.put_u64_le(bitmap_size as u64);
|
||||
item.1
|
||||
.serialize_into(self.bitmap_type, &mut dst.writer())
|
||||
.context(CommonIoSnafu)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserializes items of external sorting intermediate files.
|
||||
pub struct IntermediateItemDecoderV1 {
|
||||
pub(crate) bitmap_leading_zeros: u32,
|
||||
pub bitmap_type: BitmapType,
|
||||
}
|
||||
|
||||
/// [`FramedRead`] requires the [`Decoder`] trait to be implemented.
|
||||
impl Decoder for IntermediateItemDecoderV1 {
|
||||
type Item = (Bytes, BitVec);
|
||||
type Item = (Bytes, Bitmap);
|
||||
type Error = Error;
|
||||
|
||||
/// Decodes the `src` into `(Bytes, BitVec)`. Returns `None` if
|
||||
/// Decodes the `src` into `(Bytes, RoaringBitmap)`. Returns `None` if
|
||||
/// the `src` does not contain enough data for a complete item.
|
||||
///
|
||||
/// Only after successful decoding, the `src` is advanced. Otherwise,
|
||||
@@ -92,8 +98,8 @@ impl Decoder for IntermediateItemDecoderV1 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut bitmap = BitVec::repeat(false, self.bitmap_leading_zeros as _);
|
||||
bitmap.extend_from_raw_slice(&buf[..bitmap_len]);
|
||||
let bitmap = Bitmap::deserialize_from(&buf[..bitmap_len], self.bitmap_type)
|
||||
.context(CommonIoSnafu)?;
|
||||
|
||||
let item = (value_bytes.to_vec(), bitmap);
|
||||
|
||||
@@ -113,25 +119,29 @@ impl From<io::Error> for Error {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_base::bit_vec::prelude::{bitvec, Lsb0};
|
||||
|
||||
use super::*;
|
||||
|
||||
fn bitmap(bytes: &[u8]) -> Bitmap {
|
||||
Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intermediate_codec_basic() {
|
||||
let mut encoder = IntermediateItemEncoderV1;
|
||||
let mut encoder = IntermediateItemEncoderV1 {
|
||||
bitmap_type: BitmapType::Roaring,
|
||||
};
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010]));
|
||||
let item = (b"hello".to_vec(), bitmap(&[0b10101010]));
|
||||
encoder.encode(item.clone(), &mut buf).unwrap();
|
||||
|
||||
let mut decoder = IntermediateItemDecoderV1 {
|
||||
bitmap_leading_zeros: 0,
|
||||
bitmap_type: BitmapType::Roaring,
|
||||
};
|
||||
assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
|
||||
assert_eq!(decoder.decode(&mut buf).unwrap(), None);
|
||||
|
||||
let item1 = (b"world".to_vec(), BitVec::from_slice(&[0b01010101]));
|
||||
let item1 = (b"world".to_vec(), bitmap(&[0b01010101]));
|
||||
encoder.encode(item.clone(), &mut buf).unwrap();
|
||||
encoder.encode(item1.clone(), &mut buf).unwrap();
|
||||
assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
|
||||
@@ -142,14 +152,16 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_intermediate_codec_empty_item() {
|
||||
let mut encoder = IntermediateItemEncoderV1;
|
||||
let mut encoder = IntermediateItemEncoderV1 {
|
||||
bitmap_type: BitmapType::Roaring,
|
||||
};
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
let item = (b"".to_vec(), BitVec::from_slice(&[]));
|
||||
let item = (b"".to_vec(), bitmap(&[]));
|
||||
encoder.encode(item.clone(), &mut buf).unwrap();
|
||||
|
||||
let mut decoder = IntermediateItemDecoderV1 {
|
||||
bitmap_leading_zeros: 0,
|
||||
bitmap_type: BitmapType::Roaring,
|
||||
};
|
||||
assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
|
||||
assert_eq!(decoder.decode(&mut buf).unwrap(), None);
|
||||
@@ -158,17 +170,19 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_intermediate_codec_partial() {
|
||||
let mut encoder = IntermediateItemEncoderV1;
|
||||
let mut encoder = IntermediateItemEncoderV1 {
|
||||
bitmap_type: BitmapType::Roaring,
|
||||
};
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010]));
|
||||
let item = (b"hello".to_vec(), bitmap(&[0b10101010]));
|
||||
encoder.encode(item.clone(), &mut buf).unwrap();
|
||||
|
||||
let partial_length = U64_LENGTH + 3;
|
||||
let mut partial_bytes = buf.split_to(partial_length);
|
||||
|
||||
let mut decoder = IntermediateItemDecoderV1 {
|
||||
bitmap_leading_zeros: 0,
|
||||
bitmap_type: BitmapType::Roaring,
|
||||
};
|
||||
assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None); // not enough data
|
||||
partial_bytes.extend_from_slice(&buf[..]);
|
||||
@@ -176,25 +190,4 @@ mod tests {
|
||||
assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None);
|
||||
assert!(partial_bytes.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_intermediate_codec_prefix_zeros() {
|
||||
let mut encoder = IntermediateItemEncoderV1;
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
let item = (b"hello".to_vec(), bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]);
|
||||
encoder.encode(item.clone(), &mut buf).unwrap();
|
||||
|
||||
let mut decoder = IntermediateItemDecoderV1 {
|
||||
bitmap_leading_zeros: 3,
|
||||
};
|
||||
let decoded_item = decoder.decode(&mut buf).unwrap().unwrap();
|
||||
assert_eq!(decoded_item.0, b"hello");
|
||||
assert_eq!(
|
||||
decoded_item.1,
|
||||
bitvec![u8, Lsb0; 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0]
|
||||
);
|
||||
assert_eq!(decoder.decode(&mut buf).unwrap(), None);
|
||||
assert!(buf.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,10 +16,10 @@ use std::cmp::Ordering;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use common_base::BitVec;
|
||||
use futures::{ready, Stream, StreamExt};
|
||||
use pin_project::pin_project;
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::create::sort::SortedStream;
|
||||
use crate::inverted_index::error::Result;
|
||||
use crate::Bytes;
|
||||
@@ -28,10 +28,10 @@ use crate::Bytes;
|
||||
#[pin_project]
|
||||
pub struct MergeSortedStream {
|
||||
stream1: Option<SortedStream>,
|
||||
peek1: Option<(Bytes, BitVec)>,
|
||||
peek1: Option<(Bytes, Bitmap)>,
|
||||
|
||||
stream2: Option<SortedStream>,
|
||||
peek2: Option<(Bytes, BitVec)>,
|
||||
peek2: Option<(Bytes, Bitmap)>,
|
||||
}
|
||||
|
||||
impl MergeSortedStream {
|
||||
@@ -49,7 +49,7 @@ impl MergeSortedStream {
|
||||
}
|
||||
|
||||
impl Stream for MergeSortedStream {
|
||||
type Item = Result<(Bytes, BitVec)>;
|
||||
type Item = Result<(Bytes, Bitmap)>;
|
||||
|
||||
/// Polls both streams and returns the next item from the stream that has the smaller next item.
|
||||
/// If both streams have the same next item, the bitmaps are unioned together.
|
||||
@@ -89,77 +89,77 @@ impl Stream for MergeSortedStream {
|
||||
}
|
||||
|
||||
/// Merges two bitmaps by bit-wise OR'ing them together, preserving all bits from both
|
||||
fn merge_bitmaps(bitmap1: BitVec, bitmap2: BitVec) -> BitVec {
|
||||
// make sure longer bitmap is on the left to avoid truncation
|
||||
#[allow(clippy::if_same_then_else)]
|
||||
if bitmap1.len() > bitmap2.len() {
|
||||
bitmap1 | bitmap2
|
||||
} else {
|
||||
bitmap2 | bitmap1
|
||||
}
|
||||
fn merge_bitmaps(mut bitmap1: Bitmap, bitmap2: Bitmap) -> Bitmap {
|
||||
bitmap1.union(bitmap2);
|
||||
bitmap1
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::stream;
|
||||
use greptime_proto::v1::index::BitmapType;
|
||||
|
||||
use super::*;
|
||||
use crate::inverted_index::error::Error;
|
||||
|
||||
fn sorted_stream_from_vec(vec: Vec<(Bytes, BitVec)>) -> SortedStream {
|
||||
fn bitmap(bytes: &[u8]) -> Bitmap {
|
||||
Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
|
||||
}
|
||||
|
||||
fn sorted_stream_from_vec(vec: Vec<(Bytes, Bitmap)>) -> SortedStream {
|
||||
Box::new(stream::iter(vec.into_iter().map(Ok::<_, Error>)))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_sorted_stream_non_overlapping() {
|
||||
let stream1 = sorted_stream_from_vec(vec![
|
||||
(Bytes::from("apple"), BitVec::from_slice(&[0b10101010])),
|
||||
(Bytes::from("orange"), BitVec::from_slice(&[0b01010101])),
|
||||
(Bytes::from("apple"), bitmap(&[0b10101010])),
|
||||
(Bytes::from("orange"), bitmap(&[0b01010101])),
|
||||
]);
|
||||
let stream2 = sorted_stream_from_vec(vec![
|
||||
(Bytes::from("banana"), BitVec::from_slice(&[0b10101010])),
|
||||
(Bytes::from("peach"), BitVec::from_slice(&[0b01010101])),
|
||||
(Bytes::from("banana"), bitmap(&[0b10101010])),
|
||||
(Bytes::from("peach"), bitmap(&[0b01010101])),
|
||||
]);
|
||||
|
||||
let mut merged_stream = MergeSortedStream::merge(stream1, stream2);
|
||||
|
||||
let item = merged_stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(item.0, Bytes::from("apple"));
|
||||
assert_eq!(item.1, BitVec::from_slice(&[0b10101010]));
|
||||
assert_eq!(item.1, bitmap(&[0b10101010]));
|
||||
let item = merged_stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(item.0, Bytes::from("banana"));
|
||||
assert_eq!(item.1, BitVec::from_slice(&[0b10101010]));
|
||||
assert_eq!(item.1, bitmap(&[0b10101010]));
|
||||
let item = merged_stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(item.0, Bytes::from("orange"));
|
||||
assert_eq!(item.1, BitVec::from_slice(&[0b01010101]));
|
||||
assert_eq!(item.1, bitmap(&[0b01010101]));
|
||||
let item = merged_stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(item.0, Bytes::from("peach"));
|
||||
assert_eq!(item.1, BitVec::from_slice(&[0b01010101]));
|
||||
assert_eq!(item.1, bitmap(&[0b01010101]));
|
||||
assert!(merged_stream.next().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_sorted_stream_overlapping() {
|
||||
let stream1 = sorted_stream_from_vec(vec![
|
||||
(Bytes::from("apple"), BitVec::from_slice(&[0b10101010])),
|
||||
(Bytes::from("orange"), BitVec::from_slice(&[0b10101010])),
|
||||
(Bytes::from("apple"), bitmap(&[0b10101010])),
|
||||
(Bytes::from("orange"), bitmap(&[0b10101010])),
|
||||
]);
|
||||
let stream2 = sorted_stream_from_vec(vec![
|
||||
(Bytes::from("apple"), BitVec::from_slice(&[0b01010101])),
|
||||
(Bytes::from("peach"), BitVec::from_slice(&[0b01010101])),
|
||||
(Bytes::from("apple"), bitmap(&[0b01010101])),
|
||||
(Bytes::from("peach"), bitmap(&[0b01010101])),
|
||||
]);
|
||||
|
||||
let mut merged_stream = MergeSortedStream::merge(stream1, stream2);
|
||||
|
||||
let item = merged_stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(item.0, Bytes::from("apple"));
|
||||
assert_eq!(item.1, BitVec::from_slice(&[0b11111111]));
|
||||
assert_eq!(item.1, bitmap(&[0b11111111]));
|
||||
let item = merged_stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(item.0, Bytes::from("orange"));
|
||||
assert_eq!(item.1, BitVec::from_slice(&[0b10101010]));
|
||||
assert_eq!(item.1, bitmap(&[0b10101010]));
|
||||
let item = merged_stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(item.0, Bytes::from("peach"));
|
||||
assert_eq!(item.1, BitVec::from_slice(&[0b01010101]));
|
||||
assert_eq!(item.1, bitmap(&[0b01010101]));
|
||||
assert!(merged_stream.next().await.is_none());
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::num::NonZeroUsize;
|
||||
use async_trait::async_trait;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::bitmap::BitmapType;
|
||||
use crate::inverted_index::create::sort::{SortOutput, Sorter};
|
||||
use crate::inverted_index::create::InvertedIndexCreator;
|
||||
use crate::inverted_index::error::{InconsistentRowCountSnafu, Result};
|
||||
@@ -68,7 +69,11 @@ impl InvertedIndexCreator for SortIndexCreator {
|
||||
}
|
||||
|
||||
/// Finalizes the sorting for all indexes and writes them using the inverted index writer
|
||||
async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()> {
|
||||
async fn finish(
|
||||
&mut self,
|
||||
writer: &mut dyn InvertedIndexWriter,
|
||||
bitmap_type: BitmapType,
|
||||
) -> Result<()> {
|
||||
let mut output_row_count = None;
|
||||
for (index_name, mut sorter) in self.sorters.drain() {
|
||||
let SortOutput {
|
||||
@@ -88,7 +93,7 @@ impl InvertedIndexCreator for SortIndexCreator {
|
||||
);
|
||||
|
||||
writer
|
||||
.add_index(index_name, segment_null_bitmap, sorted_stream)
|
||||
.add_index(index_name, segment_null_bitmap, sorted_stream, bitmap_type)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -117,9 +122,9 @@ mod tests {
|
||||
use futures::{stream, StreamExt};
|
||||
|
||||
use super::*;
|
||||
use crate::inverted_index::create::sort::SortedStream;
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::Error;
|
||||
use crate::inverted_index::format::writer::MockInvertedIndexWriter;
|
||||
use crate::inverted_index::format::writer::{MockInvertedIndexWriter, ValueStream};
|
||||
use crate::Bytes;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -143,11 +148,10 @@ mod tests {
|
||||
}
|
||||
|
||||
let mut mock_writer = MockInvertedIndexWriter::new();
|
||||
mock_writer
|
||||
.expect_add_index()
|
||||
.times(3)
|
||||
.returning(|name, null_bitmap, stream| {
|
||||
mock_writer.expect_add_index().times(3).returning(
|
||||
|name, null_bitmap, stream, bitmap_type| {
|
||||
assert!(null_bitmap.is_empty());
|
||||
assert_eq!(bitmap_type, BitmapType::Roaring);
|
||||
match name.as_str() {
|
||||
"a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
|
||||
"b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
|
||||
@@ -155,7 +159,8 @@ mod tests {
|
||||
_ => panic!("unexpected index name: {}", name),
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
},
|
||||
);
|
||||
mock_writer
|
||||
.expect_finish()
|
||||
.times(1)
|
||||
@@ -165,7 +170,10 @@ mod tests {
|
||||
Ok(())
|
||||
});
|
||||
|
||||
creator.finish(&mut mock_writer).await.unwrap();
|
||||
creator
|
||||
.finish(&mut mock_writer, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -191,8 +199,9 @@ mod tests {
|
||||
let mut mock_writer = MockInvertedIndexWriter::new();
|
||||
mock_writer
|
||||
.expect_add_index()
|
||||
.returning(|name, null_bitmap, stream| {
|
||||
.returning(|name, null_bitmap, stream, bitmap_type| {
|
||||
assert!(null_bitmap.is_empty());
|
||||
assert_eq!(bitmap_type, BitmapType::Roaring);
|
||||
match name.as_str() {
|
||||
"a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]),
|
||||
"b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]),
|
||||
@@ -203,7 +212,7 @@ mod tests {
|
||||
});
|
||||
mock_writer.expect_finish().never();
|
||||
|
||||
let res = creator.finish(&mut mock_writer).await;
|
||||
let res = creator.finish(&mut mock_writer, BitmapType::Roaring).await;
|
||||
assert!(matches!(res, Err(Error::InconsistentRowCount { .. })));
|
||||
}
|
||||
|
||||
@@ -219,8 +228,9 @@ mod tests {
|
||||
let mut mock_writer = MockInvertedIndexWriter::new();
|
||||
mock_writer
|
||||
.expect_add_index()
|
||||
.returning(|name, null_bitmap, stream| {
|
||||
.returning(|name, null_bitmap, stream, bitmap_type| {
|
||||
assert!(null_bitmap.is_empty());
|
||||
assert_eq!(bitmap_type, BitmapType::Roaring);
|
||||
assert!(matches!(name.as_str(), "a" | "b" | "c"));
|
||||
assert!(stream_to_values(stream).is_empty());
|
||||
Ok(())
|
||||
@@ -234,7 +244,10 @@ mod tests {
|
||||
Ok(())
|
||||
});
|
||||
|
||||
creator.finish(&mut mock_writer).await.unwrap();
|
||||
creator
|
||||
.finish(&mut mock_writer, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn set_bit(bit_vec: &mut BitVec, index: usize) {
|
||||
@@ -283,20 +296,21 @@ mod tests {
|
||||
|
||||
async fn output(&mut self) -> Result<SortOutput> {
|
||||
let segment_null_bitmap = self.values.remove(&None).unwrap_or_default();
|
||||
let segment_null_bitmap = Bitmap::BitVec(segment_null_bitmap);
|
||||
|
||||
Ok(SortOutput {
|
||||
segment_null_bitmap,
|
||||
sorted_stream: Box::new(stream::iter(
|
||||
std::mem::take(&mut self.values)
|
||||
.into_iter()
|
||||
.map(|(v, b)| Ok((v.unwrap(), b))),
|
||||
.map(|(v, b)| Ok((v.unwrap(), Bitmap::BitVec(b)))),
|
||||
)),
|
||||
total_row_count: self.total_row_count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn stream_to_values(stream: SortedStream) -> Vec<Bytes> {
|
||||
fn stream_to_values(stream: ValueStream) -> Vec<Bytes> {
|
||||
futures::executor::block_on(async {
|
||||
stream.map(|r| r.unwrap().0).collect::<Vec<Bytes>>().await
|
||||
})
|
||||
|
||||
@@ -110,6 +110,14 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode bitmap"))]
|
||||
DecodeBitmap {
|
||||
#[snafu(source)]
|
||||
error: IoError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode protobuf"))]
|
||||
DecodeProto {
|
||||
#[snafu(source)]
|
||||
@@ -240,6 +248,7 @@ impl ErrorExt for Error {
|
||||
| CommonIo { .. }
|
||||
| UnknownIntermediateCodecMagic { .. }
|
||||
| FstCompile { .. }
|
||||
| DecodeBitmap { .. }
|
||||
| InvalidFooterPayloadSize { .. }
|
||||
| BlobSizeTooSmall { .. } => StatusCode::Unexpected,
|
||||
|
||||
|
||||
@@ -18,11 +18,11 @@ use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use common_base::BitVec;
|
||||
use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::inverted_index::error::{DecodeFstSnafu, Result};
|
||||
use crate::bitmap::{Bitmap, BitmapType};
|
||||
use crate::inverted_index::error::{DecodeBitmapSnafu, DecodeFstSnafu, Result};
|
||||
pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader;
|
||||
use crate::inverted_index::FstMap;
|
||||
|
||||
@@ -67,17 +67,25 @@ pub trait InvertedIndexReader: Send + Sync {
|
||||
}
|
||||
|
||||
/// Retrieves the bitmap from the given offset and size.
|
||||
async fn bitmap(&self, offset: u64, size: u32) -> Result<BitVec> {
|
||||
self.range_read(offset, size).await.map(BitVec::from_vec)
|
||||
async fn bitmap(&self, offset: u64, size: u32, bitmap_type: BitmapType) -> Result<Bitmap> {
|
||||
self.range_read(offset, size).await.and_then(|bytes| {
|
||||
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
|
||||
})
|
||||
}
|
||||
|
||||
/// Retrieves the multiple bitmaps from the given ranges.
|
||||
async fn bitmap_deque(&mut self, ranges: &[Range<u64>]) -> Result<VecDeque<BitVec>> {
|
||||
Ok(self
|
||||
.read_vec(ranges)
|
||||
.await?
|
||||
async fn bitmap_deque(
|
||||
&mut self,
|
||||
ranges: &[(Range<u64>, BitmapType)],
|
||||
) -> Result<VecDeque<Bitmap>> {
|
||||
let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip();
|
||||
let bytes = self.read_vec(&ranges).await?;
|
||||
bytes
|
||||
.into_iter()
|
||||
.map(|bytes| BitVec::from_slice(bytes.as_ref()))
|
||||
.collect::<VecDeque<_>>())
|
||||
.zip(types)
|
||||
.map(|(bytes, bitmap_type)| {
|
||||
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
|
||||
})
|
||||
.collect::<Result<VecDeque<_>>>()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,14 +78,14 @@ impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_base::bit_vec::prelude::*;
|
||||
use fst::MapBuilder;
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
|
||||
use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta, InvertedIndexMetas};
|
||||
use prost::Message;
|
||||
|
||||
use super::*;
|
||||
use crate::bitmap::Bitmap;
|
||||
|
||||
fn create_fake_fst() -> Vec<u8> {
|
||||
fn mock_fst() -> Vec<u8> {
|
||||
let mut fst_buf = Vec::new();
|
||||
let mut build = MapBuilder::new(&mut fst_buf).unwrap();
|
||||
build.insert("key1".as_bytes(), 1).unwrap();
|
||||
@@ -94,19 +94,27 @@ mod tests {
|
||||
fst_buf
|
||||
}
|
||||
|
||||
fn create_fake_bitmap() -> Vec<u8> {
|
||||
bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0, 1, 0].into_vec()
|
||||
fn mock_bitmap() -> Bitmap {
|
||||
Bitmap::from_lsb0_bytes(&[0b10101010, 0b10000000], BitmapType::Roaring)
|
||||
}
|
||||
|
||||
fn mock_bitmap_bytes() -> Vec<u8> {
|
||||
let mut buf = Vec::new();
|
||||
mock_bitmap()
|
||||
.serialize_into(BitmapType::Roaring, &mut buf)
|
||||
.unwrap();
|
||||
buf
|
||||
}
|
||||
|
||||
fn create_inverted_index_blob() -> Vec<u8> {
|
||||
let bitmap_size = create_fake_bitmap().len();
|
||||
let fst_size = create_fake_fst().len();
|
||||
let bitmap_size = mock_bitmap_bytes().len();
|
||||
let fst_size = mock_fst().len();
|
||||
|
||||
// first index
|
||||
let mut inverted_index = Vec::new();
|
||||
inverted_index.extend_from_slice(&create_fake_bitmap()); // value bitmap
|
||||
inverted_index.extend_from_slice(&create_fake_bitmap()); // null bitmap
|
||||
inverted_index.extend_from_slice(&create_fake_fst()); // fst
|
||||
inverted_index.extend_from_slice(&mock_bitmap_bytes()); // value bitmap
|
||||
inverted_index.extend_from_slice(&mock_bitmap_bytes()); // null bitmap
|
||||
inverted_index.extend_from_slice(&mock_fst()); // fst
|
||||
|
||||
let meta = InvertedIndexMeta {
|
||||
name: "tag0".to_string(),
|
||||
@@ -116,6 +124,7 @@ mod tests {
|
||||
null_bitmap_size: bitmap_size as _,
|
||||
relative_fst_offset: (bitmap_size * 2) as _,
|
||||
fst_size: fst_size as _,
|
||||
bitmap_type: BitmapType::Roaring as _,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -128,6 +137,7 @@ mod tests {
|
||||
null_bitmap_size: bitmap_size as _,
|
||||
relative_fst_offset: (bitmap_size * 2) as _,
|
||||
fst_size: fst_size as _,
|
||||
bitmap_type: BitmapType::Roaring as _,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -168,19 +178,19 @@ mod tests {
|
||||
let meta0 = metas.metas.get("tag0").unwrap();
|
||||
assert_eq!(meta0.name, "tag0");
|
||||
assert_eq!(meta0.base_offset, 0);
|
||||
assert_eq!(meta0.inverted_index_size, 54);
|
||||
assert_eq!(meta0.relative_null_bitmap_offset, 2);
|
||||
assert_eq!(meta0.null_bitmap_size, 2);
|
||||
assert_eq!(meta0.relative_fst_offset, 4);
|
||||
assert_eq!(meta0.inverted_index_size, 102);
|
||||
assert_eq!(meta0.relative_null_bitmap_offset, 26);
|
||||
assert_eq!(meta0.null_bitmap_size, 26);
|
||||
assert_eq!(meta0.relative_fst_offset, 52);
|
||||
assert_eq!(meta0.fst_size, 50);
|
||||
|
||||
let meta1 = metas.metas.get("tag1").unwrap();
|
||||
assert_eq!(meta1.name, "tag1");
|
||||
assert_eq!(meta1.base_offset, 54);
|
||||
assert_eq!(meta1.inverted_index_size, 54);
|
||||
assert_eq!(meta1.relative_null_bitmap_offset, 2);
|
||||
assert_eq!(meta1.null_bitmap_size, 2);
|
||||
assert_eq!(meta1.relative_fst_offset, 4);
|
||||
assert_eq!(meta1.base_offset, 102);
|
||||
assert_eq!(meta1.inverted_index_size, 102);
|
||||
assert_eq!(meta1.relative_null_bitmap_offset, 26);
|
||||
assert_eq!(meta1.null_bitmap_size, 26);
|
||||
assert_eq!(meta1.relative_fst_offset, 52);
|
||||
assert_eq!(meta1.fst_size, 50);
|
||||
}
|
||||
|
||||
@@ -224,17 +234,29 @@ mod tests {
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let meta = metas.metas.get("tag0").unwrap();
|
||||
|
||||
let bitmap = blob_reader.bitmap(meta.base_offset, 2).await.unwrap();
|
||||
assert_eq!(bitmap.into_vec(), create_fake_bitmap());
|
||||
let bitmap = blob_reader.bitmap(meta.base_offset + 2, 2).await.unwrap();
|
||||
assert_eq!(bitmap.into_vec(), create_fake_bitmap());
|
||||
let bitmap = blob_reader
|
||||
.bitmap(meta.base_offset, 26, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
let bitmap = blob_reader
|
||||
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
|
||||
let metas = blob_reader.metadata().await.unwrap();
|
||||
let meta = metas.metas.get("tag1").unwrap();
|
||||
|
||||
let bitmap = blob_reader.bitmap(meta.base_offset, 2).await.unwrap();
|
||||
assert_eq!(bitmap.into_vec(), create_fake_bitmap());
|
||||
let bitmap = blob_reader.bitmap(meta.base_offset + 2, 2).await.unwrap();
|
||||
assert_eq!(bitmap.into_vec(), create_fake_bitmap());
|
||||
let bitmap = blob_reader
|
||||
.bitmap(meta.base_offset, 26, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
let bitmap = blob_reader
|
||||
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, mock_bitmap());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,14 +18,14 @@ mod single;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use futures::Stream;
|
||||
|
||||
use crate::bitmap::{Bitmap, BitmapType};
|
||||
use crate::inverted_index::error::Result;
|
||||
pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter;
|
||||
use crate::Bytes;
|
||||
|
||||
pub type ValueStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;
|
||||
pub type ValueStream = Box<dyn Stream<Item = Result<(Bytes, Bitmap)>> + Send + Unpin>;
|
||||
|
||||
/// Trait for writing inverted index data to underlying storage.
|
||||
#[mockall::automock]
|
||||
@@ -37,11 +37,13 @@ pub trait InvertedIndexWriter: Send {
|
||||
/// * `null_bitmap` marks positions of null entries.
|
||||
/// * `values` is a stream of values and their locations, yielded lexicographically.
|
||||
/// Errors occur if the values are out of order.
|
||||
/// * `bitmap_type` is the type of bitmap to encode.
|
||||
async fn add_index(
|
||||
&mut self,
|
||||
name: String,
|
||||
null_bitmap: BitVec,
|
||||
null_bitmap: Bitmap,
|
||||
values: ValueStream,
|
||||
bitmap_type: BitmapType,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Finalizes the index writing process, ensuring all data is written.
|
||||
|
||||
@@ -15,12 +15,12 @@
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use futures::{AsyncWrite, AsyncWriteExt};
|
||||
use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
use prost::Message;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::bitmap::{Bitmap, BitmapType};
|
||||
use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
|
||||
use crate::inverted_index::format::writer::single::SingleIndexWriter;
|
||||
use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream};
|
||||
@@ -43,8 +43,9 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWrit
|
||||
async fn add_index(
|
||||
&mut self,
|
||||
name: String,
|
||||
null_bitmap: BitVec,
|
||||
null_bitmap: Bitmap,
|
||||
values: ValueStream,
|
||||
bitmap_type: BitmapType,
|
||||
) -> Result<()> {
|
||||
let single_writer = SingleIndexWriter::new(
|
||||
name.clone(),
|
||||
@@ -52,6 +53,7 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWrit
|
||||
null_bitmap,
|
||||
values,
|
||||
&mut self.blob_writer,
|
||||
bitmap_type,
|
||||
);
|
||||
let metadata = single_writer.write().await?;
|
||||
|
||||
@@ -100,6 +102,7 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::stream;
|
||||
use greptime_proto::v1::index::BitmapType;
|
||||
|
||||
use super::*;
|
||||
use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
|
||||
@@ -132,24 +135,44 @@ mod tests {
|
||||
writer
|
||||
.add_index(
|
||||
"tag0".to_string(),
|
||||
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
|
||||
Box::new(stream::iter(vec![
|
||||
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))),
|
||||
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((
|
||||
Bytes::from("a"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("b"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("c"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
])),
|
||||
BitmapType::Roaring,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
writer
|
||||
.add_index(
|
||||
"tag1".to_string(),
|
||||
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
|
||||
Box::new(stream::iter(vec![
|
||||
Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))),
|
||||
Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((
|
||||
Bytes::from("x"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("y"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("z"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
])),
|
||||
BitmapType::Roaring,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -181,22 +204,31 @@ mod tests {
|
||||
assert_eq!(fst0.len(), 3);
|
||||
let [offset, size] = unpack(fst0.get(b"a").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size)
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
|
||||
);
|
||||
let [offset, size] = unpack(fst0.get(b"b").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size)
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
|
||||
);
|
||||
let [offset, size] = unpack(fst0.get(b"c").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size)
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// tag1
|
||||
let tag1 = metadata.metas.get("tag1").unwrap();
|
||||
@@ -215,21 +247,30 @@ mod tests {
|
||||
assert_eq!(fst1.len(), 3);
|
||||
let [offset, size] = unpack(fst1.get(b"x").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size)
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
|
||||
);
|
||||
let [offset, size] = unpack(fst1.get(b"y").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size)
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
|
||||
);
|
||||
let [offset, size] = unpack(fst1.get(b"z").unwrap());
|
||||
let bitmap = reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size)
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,12 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_base::BitVec;
|
||||
use fst::MapBuilder;
|
||||
use futures::{AsyncWrite, AsyncWriteExt, Stream, StreamExt};
|
||||
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexStats};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::bitmap::{Bitmap, BitmapType};
|
||||
use crate::inverted_index::error::{FstCompileSnafu, FstInsertSnafu, Result, WriteSnafu};
|
||||
use crate::Bytes;
|
||||
|
||||
@@ -27,7 +27,7 @@ pub struct SingleIndexWriter<W, S> {
|
||||
blob_writer: W,
|
||||
|
||||
/// The null bitmap to be written
|
||||
null_bitmap: BitVec,
|
||||
null_bitmap: Bitmap,
|
||||
|
||||
/// The stream of values to be written, yielded lexicographically
|
||||
values: S,
|
||||
@@ -37,30 +37,40 @@ pub struct SingleIndexWriter<W, S> {
|
||||
|
||||
/// Metadata about the index
|
||||
meta: InvertedIndexMeta,
|
||||
|
||||
/// The type of bitmap to use
|
||||
bitmap_type: BitmapType,
|
||||
|
||||
/// Buffer for writing the blob
|
||||
buf: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<W, S> SingleIndexWriter<W, S>
|
||||
where
|
||||
W: AsyncWrite + Send + Unpin,
|
||||
S: Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin,
|
||||
S: Stream<Item = Result<(Bytes, Bitmap)>> + Send + Unpin,
|
||||
{
|
||||
/// Constructs a new `SingleIndexWriter`
|
||||
pub fn new(
|
||||
name: String,
|
||||
base_offset: u64,
|
||||
null_bitmap: BitVec,
|
||||
null_bitmap: Bitmap,
|
||||
values: S,
|
||||
blob_writer: W,
|
||||
bitmap_type: BitmapType,
|
||||
) -> SingleIndexWriter<W, S> {
|
||||
SingleIndexWriter {
|
||||
blob_writer,
|
||||
null_bitmap,
|
||||
values,
|
||||
fst: MapBuilder::memory(),
|
||||
bitmap_type,
|
||||
buf: Vec::new(),
|
||||
meta: InvertedIndexMeta {
|
||||
name,
|
||||
base_offset,
|
||||
stats: Some(InvertedIndexStats::default()),
|
||||
bitmap_type: bitmap_type.into(),
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
@@ -80,14 +90,17 @@ where
|
||||
|
||||
/// Writes the null bitmap to the blob and updates the metadata accordingly
|
||||
async fn write_null_bitmap(&mut self) -> Result<()> {
|
||||
let null_bitmap_bytes = self.null_bitmap.as_raw_slice();
|
||||
self.buf.clear();
|
||||
self.null_bitmap
|
||||
.serialize_into(self.bitmap_type, &mut self.buf)
|
||||
.expect("Write to vec should not fail");
|
||||
self.blob_writer
|
||||
.write_all(null_bitmap_bytes)
|
||||
.write_all(&self.buf)
|
||||
.await
|
||||
.context(WriteSnafu)?;
|
||||
|
||||
self.meta.relative_null_bitmap_offset = self.meta.inverted_index_size as _;
|
||||
self.meta.null_bitmap_size = null_bitmap_bytes.len() as _;
|
||||
self.meta.null_bitmap_size = self.buf.len() as _;
|
||||
self.meta.inverted_index_size += self.meta.null_bitmap_size as u64;
|
||||
|
||||
// update stats
|
||||
@@ -100,15 +113,18 @@ where
|
||||
}
|
||||
|
||||
/// Appends a value and its bitmap to the blob, updates the FST, and the metadata
|
||||
async fn append_value(&mut self, value: Bytes, bitmap: BitVec) -> Result<()> {
|
||||
let bitmap_bytes = bitmap.into_vec();
|
||||
async fn append_value(&mut self, value: Bytes, bitmap: Bitmap) -> Result<()> {
|
||||
self.buf.clear();
|
||||
bitmap
|
||||
.serialize_into(self.bitmap_type, &mut self.buf)
|
||||
.expect("Write to vec should not fail");
|
||||
self.blob_writer
|
||||
.write_all(&bitmap_bytes)
|
||||
.write_all(&self.buf)
|
||||
.await
|
||||
.context(WriteSnafu)?;
|
||||
|
||||
let offset = self.meta.inverted_index_size as u32;
|
||||
let size = bitmap_bytes.len() as u32;
|
||||
let size = self.buf.len() as u32;
|
||||
self.meta.inverted_index_size += size as u64;
|
||||
|
||||
let packed = bytemuck::cast::<[u32; 2], u64>([offset, size]);
|
||||
@@ -157,9 +173,10 @@ mod tests {
|
||||
let writer = SingleIndexWriter::new(
|
||||
"test".to_string(),
|
||||
0,
|
||||
BitVec::new(),
|
||||
Bitmap::new_roaring(),
|
||||
stream::empty(),
|
||||
&mut blob,
|
||||
BitmapType::Roaring,
|
||||
);
|
||||
|
||||
let meta = writer.write().await.unwrap();
|
||||
@@ -174,13 +191,23 @@ mod tests {
|
||||
let writer = SingleIndexWriter::new(
|
||||
"test".to_string(),
|
||||
0,
|
||||
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
|
||||
stream::iter(vec![
|
||||
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0000_0000]))),
|
||||
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((
|
||||
Bytes::from("a"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("b"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0000], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("c"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
]),
|
||||
&mut blob,
|
||||
BitmapType::Roaring,
|
||||
);
|
||||
let meta = writer.write().await.unwrap();
|
||||
|
||||
@@ -199,13 +226,23 @@ mod tests {
|
||||
let writer = SingleIndexWriter::new(
|
||||
"test".to_string(),
|
||||
0,
|
||||
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
|
||||
stream::iter(vec![
|
||||
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0000_0000]))),
|
||||
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((
|
||||
Bytes::from("b"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0000], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("a"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("c"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
]),
|
||||
&mut blob,
|
||||
BitmapType::Roaring,
|
||||
);
|
||||
let res = writer.write().await;
|
||||
assert!(matches!(res, Err(Error::FstInsert { .. })));
|
||||
|
||||
@@ -12,9 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_base::BitVec;
|
||||
use greptime_proto::v1::index::InvertedIndexMeta;
|
||||
use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::Result;
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
|
||||
@@ -36,7 +36,7 @@ impl<'a> ParallelFstValuesMapper<'a> {
|
||||
pub async fn map_values_vec(
|
||||
&mut self,
|
||||
value_and_meta_vec: &[(Vec<u64>, &'a InvertedIndexMeta)],
|
||||
) -> Result<Vec<BitVec>> {
|
||||
) -> Result<Vec<Bitmap>> {
|
||||
let groups = value_and_meta_vec
|
||||
.iter()
|
||||
.map(|(values, _)| values.len())
|
||||
@@ -50,15 +50,17 @@ impl<'a> ParallelFstValuesMapper<'a> {
|
||||
// bitmap offset and the lower 32 bits represent its size. This mapper uses these
|
||||
// combined offset-size pairs to fetch and union multiple bitmaps into a single `BitVec`.
|
||||
let [relative_offset, size] = bytemuck::cast::<u64, [u32; 2]>(*value);
|
||||
fetch_ranges.push(
|
||||
meta.base_offset + relative_offset as u64
|
||||
..meta.base_offset + relative_offset as u64 + size as u64,
|
||||
);
|
||||
let range = meta.base_offset + relative_offset as u64
|
||||
..meta.base_offset + relative_offset as u64 + size as u64;
|
||||
fetch_ranges.push((
|
||||
range,
|
||||
BitmapType::try_from(meta.bitmap_type).unwrap_or(BitmapType::BitVec),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
if fetch_ranges.is_empty() {
|
||||
return Ok(vec![BitVec::new()]);
|
||||
return Ok(vec![Bitmap::new_bitvec()]);
|
||||
}
|
||||
|
||||
common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges);
|
||||
@@ -66,14 +68,10 @@ impl<'a> ParallelFstValuesMapper<'a> {
|
||||
let mut output = Vec::with_capacity(groups.len());
|
||||
|
||||
for counter in groups {
|
||||
let mut bitmap = BitVec::new();
|
||||
let mut bitmap = Bitmap::new_roaring();
|
||||
for _ in 0..counter {
|
||||
let bm = bitmaps.pop_front().unwrap();
|
||||
if bm.len() > bitmap.len() {
|
||||
bitmap = bm | bitmap
|
||||
} else {
|
||||
bitmap |= bm
|
||||
}
|
||||
bitmap.union(bm);
|
||||
}
|
||||
|
||||
output.push(bitmap);
|
||||
@@ -87,8 +85,6 @@ impl<'a> ParallelFstValuesMapper<'a> {
|
||||
mod tests {
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use common_base::bit_vec::prelude::*;
|
||||
|
||||
use super::*;
|
||||
use crate::inverted_index::format::reader::MockInvertedIndexReader;
|
||||
|
||||
@@ -101,19 +97,26 @@ mod tests {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader.expect_bitmap_deque().returning(|ranges| {
|
||||
let mut output = VecDeque::new();
|
||||
for range in ranges {
|
||||
for (range, bitmap_type) in ranges {
|
||||
let offset = range.start;
|
||||
let size = range.end - range.start;
|
||||
match (offset, size) {
|
||||
(1, 1) => output.push_back(bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1]),
|
||||
(2, 1) => output.push_back(bitvec![u8, Lsb0; 0, 1, 0, 1, 0, 1, 0, 1]),
|
||||
match (offset, size, bitmap_type) {
|
||||
(1, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
|
||||
}
|
||||
(2, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(output)
|
||||
});
|
||||
|
||||
let meta = InvertedIndexMeta::default();
|
||||
let meta = InvertedIndexMeta {
|
||||
bitmap_type: BitmapType::Roaring.into(),
|
||||
..Default::default()
|
||||
};
|
||||
let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader);
|
||||
|
||||
let result = values_mapper
|
||||
@@ -126,33 +129,50 @@ mod tests {
|
||||
.map_values_vec(&[(vec![value(1, 1)], &meta)])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result[0], bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1]);
|
||||
assert_eq!(
|
||||
result[0],
|
||||
Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(2, 1)], &meta)])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result[0], bitvec![u8, Lsb0; 0, 1, 0, 1, 0, 1, 0, 1]);
|
||||
assert_eq!(
|
||||
result[0],
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result[0], bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]);
|
||||
assert_eq!(
|
||||
result[0],
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result[0], bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]);
|
||||
assert_eq!(
|
||||
result[0],
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result[0], bitvec![u8, Lsb0; 0, 1, 0, 1, 0, 1, 0, 1]);
|
||||
assert_eq!(result[1], bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1]);
|
||||
|
||||
assert_eq!(
|
||||
result[0],
|
||||
Bitmap::from_lsb0_bytes(&[0b01010101], BitmapType::Roaring)
|
||||
);
|
||||
assert_eq!(
|
||||
result[1],
|
||||
Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
|
||||
);
|
||||
let result = values_mapper
|
||||
.map_values_vec(&[
|
||||
(vec![value(2, 1), value(1, 1)], &meta),
|
||||
@@ -160,7 +180,13 @@ mod tests {
|
||||
])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result[0], bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]);
|
||||
assert_eq!(result[1], bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1]);
|
||||
assert_eq!(
|
||||
result[0],
|
||||
Bitmap::from_lsb0_bytes(&[0b11111111], BitmapType::Roaring)
|
||||
);
|
||||
assert_eq!(
|
||||
result[1],
|
||||
Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,17 +15,17 @@
|
||||
mod predicates_apply;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
pub use predicates_apply::PredicatesIndexApplier;
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::Result;
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
|
||||
/// The output of an apply operation.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct ApplyOutput {
|
||||
/// Bitmap of indices that match the predicates.
|
||||
pub matched_segment_ids: BitVec,
|
||||
pub matched_segment_ids: Bitmap,
|
||||
|
||||
/// The total number of rows in the index.
|
||||
pub total_row_count: usize,
|
||||
|
||||
@@ -15,9 +15,9 @@
|
||||
use std::mem::size_of;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
use crate::inverted_index::search::fst_apply::{
|
||||
@@ -50,12 +50,11 @@ impl IndexApplier for PredicatesIndexApplier {
|
||||
) -> Result<ApplyOutput> {
|
||||
let metadata = reader.metadata().await?;
|
||||
let mut output = ApplyOutput {
|
||||
matched_segment_ids: BitVec::EMPTY,
|
||||
matched_segment_ids: Bitmap::new_bitvec(),
|
||||
total_row_count: metadata.total_row_count as _,
|
||||
segment_row_count: metadata.segment_row_count as _,
|
||||
};
|
||||
|
||||
let mut bitmap = Self::bitmap_full_range(&metadata);
|
||||
// TODO(zhongzc): optimize the order of applying to make it quicker to return empty.
|
||||
let mut appliers = Vec::with_capacity(self.fst_appliers.len());
|
||||
let mut fst_ranges = Vec::with_capacity(self.fst_appliers.len());
|
||||
@@ -81,7 +80,7 @@ impl IndexApplier for PredicatesIndexApplier {
|
||||
}
|
||||
|
||||
if fst_ranges.is_empty() {
|
||||
output.matched_segment_ids = bitmap;
|
||||
output.matched_segment_ids = Self::bitmap_full_range(&metadata);
|
||||
return Ok(output);
|
||||
}
|
||||
|
||||
@@ -93,14 +92,15 @@ impl IndexApplier for PredicatesIndexApplier {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut mapper = ParallelFstValuesMapper::new(reader);
|
||||
let bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
|
||||
let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
|
||||
|
||||
let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty
|
||||
for bm in bm_vec {
|
||||
if bitmap.count_ones() == 0 {
|
||||
if bm.count_ones() == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
bitmap &= bm;
|
||||
bitmap.intersect(bm);
|
||||
}
|
||||
|
||||
output.matched_segment_ids = bitmap;
|
||||
@@ -146,12 +146,12 @@ impl PredicatesIndexApplier {
|
||||
Ok(PredicatesIndexApplier { fst_appliers })
|
||||
}
|
||||
|
||||
/// Creates a `BitVec` representing the full range of data in the index for initial scanning.
|
||||
fn bitmap_full_range(metadata: &InvertedIndexMetas) -> BitVec {
|
||||
/// Creates a `Bitmap` representing the full range of data in the index for initial scanning.
|
||||
fn bitmap_full_range(metadata: &InvertedIndexMetas) -> Bitmap {
|
||||
let total_count = metadata.total_row_count;
|
||||
let segment_count = metadata.segment_row_count;
|
||||
let len = total_count.div_ceil(segment_count);
|
||||
BitVec::repeat(true, len as _)
|
||||
Bitmap::full_bitvec(len as _)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -167,10 +167,10 @@ mod tests {
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::bit_vec::prelude::*;
|
||||
use greptime_proto::v1::index::InvertedIndexMeta;
|
||||
use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};
|
||||
|
||||
use super::*;
|
||||
use crate::bitmap::Bitmap;
|
||||
use crate::inverted_index::error::Error;
|
||||
use crate::inverted_index::format::reader::MockInvertedIndexReader;
|
||||
use crate::inverted_index::search::fst_apply::MockFstApplier;
|
||||
@@ -190,6 +190,7 @@ mod tests {
|
||||
let meta = InvertedIndexMeta {
|
||||
name: s(tag),
|
||||
relative_fst_offset: idx,
|
||||
bitmap_type: BitmapType::Roaring.into(),
|
||||
..Default::default()
|
||||
};
|
||||
metas.metas.insert(s(tag), meta);
|
||||
@@ -229,10 +230,16 @@ mod tests {
|
||||
.unwrap()])
|
||||
});
|
||||
|
||||
mock_reader.expect_bitmap_deque().returning(|range| {
|
||||
assert_eq!(range.len(), 1);
|
||||
assert_eq!(range[0], 2..3);
|
||||
Ok(VecDeque::from([bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]]))
|
||||
mock_reader.expect_bitmap_deque().returning(|arg| {
|
||||
assert_eq!(arg.len(), 1);
|
||||
let range = &arg[0].0;
|
||||
let bitmap_type = arg[0].1;
|
||||
assert_eq!(*range, 2..3);
|
||||
assert_eq!(bitmap_type, BitmapType::Roaring);
|
||||
Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
|
||||
&[0b10101010],
|
||||
bitmap_type,
|
||||
)]))
|
||||
});
|
||||
let output = applier
|
||||
.apply(SearchContext::default(), &mut mock_reader)
|
||||
@@ -240,7 +247,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
output.matched_segment_ids,
|
||||
bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]
|
||||
Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// An index reader with a single tag "tag-0" but without value "tag-0_value-0"
|
||||
@@ -292,12 +299,16 @@ mod tests {
|
||||
});
|
||||
mock_reader.expect_bitmap_deque().returning(|ranges| {
|
||||
let mut output = VecDeque::new();
|
||||
for range in ranges {
|
||||
for (range, bitmap_type) in ranges {
|
||||
let offset = range.start;
|
||||
let size = range.end - range.start;
|
||||
match (offset, size) {
|
||||
(1, 1) => output.push_back(bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]),
|
||||
(2, 1) => output.push_back(bitvec![u8, Lsb0; 1, 1, 0, 1, 1, 0, 1, 1]),
|
||||
match (offset, size, bitmap_type) {
|
||||
(1, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
|
||||
}
|
||||
(2, 1, BitmapType::Roaring) => {
|
||||
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -311,7 +322,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
output.matched_segment_ids,
|
||||
bitvec![u8, Lsb0; 1, 0, 0, 0, 1, 0, 1, 0]
|
||||
Bitmap::from_lsb0_bytes(&[0b10001010], BitmapType::Roaring)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -330,10 +341,7 @@ mod tests {
|
||||
.apply(SearchContext::default(), &mut mock_reader)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
output.matched_segment_ids,
|
||||
bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]
|
||||
); // full range to scan
|
||||
assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -405,10 +413,7 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
output.matched_segment_ids,
|
||||
bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1]
|
||||
);
|
||||
assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#![feature(iter_partition_in_place)]
|
||||
#![feature(assert_matches)]
|
||||
|
||||
pub mod bitmap;
|
||||
pub mod bloom_filter;
|
||||
pub mod error;
|
||||
pub mod external_provider;
|
||||
|
||||
80
src/mito2/src/cache/index/inverted_index.rs
vendored
80
src/mito2/src/cache/index/inverted_index.rs
vendored
@@ -127,8 +127,8 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
|
||||
mod test {
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use common_base::BitVec;
|
||||
use futures::stream;
|
||||
use index::bitmap::{Bitmap, BitmapType};
|
||||
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
|
||||
use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
|
||||
use index::Bytes;
|
||||
@@ -191,24 +191,44 @@ mod test {
|
||||
writer
|
||||
.add_index(
|
||||
"tag0".to_string(),
|
||||
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
|
||||
Box::new(stream::iter(vec![
|
||||
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))),
|
||||
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((
|
||||
Bytes::from("a"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("b"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("c"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
])),
|
||||
index::bitmap::BitmapType::Roaring,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
writer
|
||||
.add_index(
|
||||
"tag1".to_string(),
|
||||
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
|
||||
Box::new(stream::iter(vec![
|
||||
Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))),
|
||||
Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))),
|
||||
Ok((
|
||||
Bytes::from("x"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("y"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
|
||||
)),
|
||||
Ok((
|
||||
Bytes::from("z"),
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
|
||||
)),
|
||||
])),
|
||||
index::bitmap::BitmapType::Roaring,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -267,22 +287,31 @@ mod test {
|
||||
assert_eq!(fst0.len(), 3);
|
||||
let [offset, size] = unpack(fst0.get(b"a").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size)
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
|
||||
);
|
||||
let [offset, size] = unpack(fst0.get(b"b").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size)
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
|
||||
);
|
||||
let [offset, size] = unpack(fst0.get(b"c").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag0.base_offset + offset as u64, size)
|
||||
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// tag1
|
||||
let tag1 = metadata.metas.get("tag1").unwrap();
|
||||
@@ -301,22 +330,31 @@ mod test {
|
||||
assert_eq!(fst1.len(), 3);
|
||||
let [offset, size] = unpack(fst1.get(b"x").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size)
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
|
||||
);
|
||||
let [offset, size] = unpack(fst1.get(b"y").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size)
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
|
||||
);
|
||||
let [offset, size] = unpack(fst1.get(b"z").unwrap());
|
||||
let bitmap = cached_reader
|
||||
.bitmap(tag1.base_offset + offset as u64, size)
|
||||
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
|
||||
assert_eq!(
|
||||
bitmap,
|
||||
Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
|
||||
);
|
||||
|
||||
// fuzz test
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
@@ -228,8 +228,8 @@ impl Drop for InvertedIndexApplier {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_base::BitVec;
|
||||
use futures::io::Cursor;
|
||||
use index::bitmap::Bitmap;
|
||||
use index::inverted_index::search::index_apply::MockIndexApplier;
|
||||
use object_store::services::Memory;
|
||||
use puffin::puffin_manager::PuffinWriter;
|
||||
@@ -259,7 +259,7 @@ mod tests {
|
||||
mock_index_applier.expect_memory_usage().returning(|| 100);
|
||||
mock_index_applier.expect_apply().returning(|_, _| {
|
||||
Ok(ApplyOutput {
|
||||
matched_segment_ids: BitVec::EMPTY,
|
||||
matched_segment_ids: Bitmap::new_bitvec(),
|
||||
total_row_count: 100,
|
||||
segment_row_count: 10,
|
||||
})
|
||||
@@ -276,7 +276,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
output,
|
||||
ApplyOutput {
|
||||
matched_segment_ids: BitVec::EMPTY,
|
||||
matched_segment_ids: Bitmap::new_bitvec(),
|
||||
total_row_count: 100,
|
||||
segment_row_count: 10,
|
||||
}
|
||||
|
||||
@@ -277,7 +277,9 @@ impl InvertedIndexer {
|
||||
let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
|
||||
|
||||
let (index_finish, puffin_add_blob) = futures::join!(
|
||||
self.index_creator.finish(&mut index_writer),
|
||||
// TODO(zhongzc): config bitmap type
|
||||
self.index_creator
|
||||
.finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
|
||||
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default())
|
||||
);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user