feat(bloom-filter): add basic bloom filter creator (Part 1) (#5177)

* feat(bloom-filter): add a simple bloom filter creator (Part 1)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: header

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* docs: add format comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-12-17 14:55:42 +08:00
committed by Yingwen
parent ffdcb8c1ac
commit bcecd8ce52
6 changed files with 439 additions and 3 deletions

26
Cargo.lock generated
View File

@@ -3834,6 +3834,18 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c"
[[package]]
name = "fastbloom"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b679f25009b51b71506296f95fb6362ba7d0151172fa7373a8d1611b8bc5d10f"
dependencies = [
"getrandom",
"rand",
"siphasher 1.0.1",
"wide",
]
[[package]]
name = "fastdivide"
version = "0.4.1"
@@ -5213,6 +5225,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-test-util",
"fastbloom",
"fst",
"futures",
"greptime-proto",
@@ -5223,6 +5236,7 @@ dependencies = [
"regex",
"regex-automata 0.4.8",
"serde",
"serde_json",
"snafu 0.8.5",
"tantivy",
"tantivy-jieba",
@@ -8065,7 +8079,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096"
dependencies = [
"siphasher",
"siphasher 0.3.11",
]
[[package]]
@@ -8074,7 +8088,7 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
"siphasher",
"siphasher 0.3.11",
]
[[package]]
@@ -10005,7 +10019,7 @@ dependencies = [
"once_cell",
"radium",
"rand",
"siphasher",
"siphasher 0.3.11",
"unic-ucd-category",
"volatile",
"widestring",
@@ -11016,6 +11030,12 @@ version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
[[package]]
name = "siphasher"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "sketches-ddsketch"
version = "0.2.2"

View File

@@ -17,6 +17,7 @@ common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
fastbloom = "0.8"
fst.workspace = true
futures.workspace = true
greptime-proto.workspace = true
@@ -26,6 +27,7 @@ prost.workspace = true
regex.workspace = true
regex-automata.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tantivy = { version = "0.22", features = ["zstd-compression"] }
tantivy-jieba = "0.11.0"

View File

@@ -0,0 +1,53 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
pub mod creator;
mod error;
pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];
/// The Meta information of the bloom filter stored in the file.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BloomFilterMeta {
/// The number of rows per segment.
pub rows_per_segment: usize,
/// The number of segments.
pub seg_count: usize,
/// The number of total rows.
pub row_count: usize,
/// The size of the bloom filter excluding the meta information.
pub bloom_filter_segments_size: usize,
/// Offset and size of bloom filters in the file.
pub bloom_filter_segments: Vec<BloomFilterSegmentLocation>,
}
/// The location of the bloom filter segment in the file.
#[derive(Debug, Serialize, Deserialize)]
pub struct BloomFilterSegmentLocation {
/// The offset of the bloom filter segment in the file.
pub offset: u64,
/// The size of the bloom filter segment in the file.
pub size: u64,
/// The number of elements in the bloom filter segment.
pub elem_count: usize,
}

View File

@@ -0,0 +1,294 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use fastbloom::BloomFilter;
use futures::{AsyncWrite, AsyncWriteExt};
use snafu::ResultExt;
use super::error::{IoSnafu, SerdeJsonSnafu};
use crate::bloom_filter::error::Result;
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};
/// The seed used for the Bloom filter.
const SEED: u128 = 42;
/// The false positive rate of the Bloom filter.
const FALSE_POSITIVE_RATE: f64 = 0.01;
/// `BloomFilterCreator` is responsible for creating and managing bloom filters
/// for a set of elements. It divides the rows into segments and creates
/// bloom filters for each segment.
///
/// # Format
///
/// The bloom filter creator writes the following format to the writer:
///
/// ```text
/// +--------------------+--------------------+-----+----------------------+----------------------+
/// | Bloom filter 0 | Bloom filter 1 | ... | BloomFilterMeta | Meta size |
/// +--------------------+--------------------+-----+----------------------+----------------------+
/// |<- bytes (size 0) ->|<- bytes (size 1) ->| ... |<- json (meta size) ->|<- u32 LE (4 bytes) ->|
/// ```
///
pub struct BloomFilterCreator {
/// The number of rows per segment set by the user.
rows_per_segment: usize,
/// Row count that added to the bloom filter so far.
accumulated_row_count: usize,
/// A set of distinct elements in the current segment.
cur_seg_distinct_elems: HashSet<Bytes>,
/// The memory usage of the current segment's distinct elements.
cur_seg_distinct_elems_mem_usage: usize,
/// Storage for finalized Bloom filters.
finalized_bloom_filters: FinalizedBloomFilterStorage,
}
impl BloomFilterCreator {
/// Creates a new `BloomFilterCreator` with the specified number of rows per segment.
///
/// # PANICS
///
/// `rows_per_segment` <= 0
pub fn new(rows_per_segment: usize) -> Self {
assert!(
rows_per_segment > 0,
"rows_per_segment must be greater than 0"
);
Self {
rows_per_segment,
accumulated_row_count: 0,
cur_seg_distinct_elems: HashSet::default(),
cur_seg_distinct_elems_mem_usage: 0,
finalized_bloom_filters: FinalizedBloomFilterStorage::default(),
}
}
/// Adds a row of elements to the bloom filter. If the number of accumulated rows
/// reaches `rows_per_segment`, it finalizes the current segment.
pub fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) {
self.accumulated_row_count += 1;
for elem in elems.into_iter() {
let len = elem.len();
let is_new = self.cur_seg_distinct_elems.insert(elem);
if is_new {
self.cur_seg_distinct_elems_mem_usage += len;
}
}
if self.accumulated_row_count % self.rows_per_segment == 0 {
self.finalize_segment();
}
}
/// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
if !self.cur_seg_distinct_elems.is_empty() {
self.finalize_segment();
}
let mut meta = BloomFilterMeta {
rows_per_segment: self.rows_per_segment,
seg_count: self.finalized_bloom_filters.len(),
row_count: self.accumulated_row_count,
..Default::default()
};
let mut buf = Vec::new();
for segment in self.finalized_bloom_filters.drain() {
let slice = segment.bloom_filter.as_slice();
buf.clear();
write_u64_slice(&mut buf, slice);
writer.write_all(&buf).await.context(IoSnafu)?;
let size = buf.len();
meta.bloom_filter_segments.push(BloomFilterSegmentLocation {
offset: meta.bloom_filter_segments_size as _,
size: size as _,
elem_count: segment.element_count,
});
meta.bloom_filter_segments_size += size;
}
let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?;
writer.write_all(&meta_bytes).await.context(IoSnafu)?;
let meta_size = meta_bytes.len() as u32;
writer
.write_all(&meta_size.to_le_bytes())
.await
.context(IoSnafu)?;
writer.flush().await.unwrap();
Ok(())
}
/// Returns the memory usage of the creating bloom filter.
pub fn memory_usage(&self) -> usize {
self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
}
fn finalize_segment(&mut self) {
let elem_count = self.cur_seg_distinct_elems.len();
self.finalized_bloom_filters
.add(self.cur_seg_distinct_elems.drain(), elem_count);
self.cur_seg_distinct_elems_mem_usage = 0;
}
}
/// Storage for finalized Bloom filters.
///
/// TODO(zhongzc): Add support for storing intermediate bloom filters on disk to control memory usage.
#[derive(Debug, Default)]
struct FinalizedBloomFilterStorage {
/// Bloom filters that are stored in memory.
in_memory: Vec<FinalizedBloomFilterSegment>,
}
impl FinalizedBloomFilterStorage {
fn memory_usage(&self) -> usize {
self.in_memory.iter().map(|s| s.size).sum()
}
/// Adds a new finalized Bloom filter to the storage.
///
/// TODO(zhongzc): Add support for flushing to disk.
fn add(&mut self, elems: impl IntoIterator<Item = Bytes>, elem_count: usize) {
let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
.seed(&SEED)
.expected_items(elem_count);
for elem in elems.into_iter() {
bf.insert(&elem);
}
let cbf = FinalizedBloomFilterSegment::new(bf, elem_count);
self.in_memory.push(cbf);
}
fn len(&self) -> usize {
self.in_memory.len()
}
fn drain(&mut self) -> impl Iterator<Item = FinalizedBloomFilterSegment> + '_ {
self.in_memory.drain(..)
}
}
/// A finalized Bloom filter segment.
#[derive(Debug)]
struct FinalizedBloomFilterSegment {
/// The underlying Bloom filter.
bloom_filter: BloomFilter,
/// The number of elements in the Bloom filter.
element_count: usize,
/// The occupied memory size of the Bloom filter.
size: usize,
}
impl FinalizedBloomFilterSegment {
fn new(bloom_filter: BloomFilter, elem_count: usize) -> Self {
let memory_usage = std::mem::size_of_val(bloom_filter.as_slice());
Self {
bloom_filter,
element_count: elem_count,
size: memory_usage,
}
}
}
/// Writes a slice of `u64` to the buffer in little-endian order.
fn write_u64_slice(buf: &mut Vec<u8>, slice: &[u64]) {
buf.reserve(std::mem::size_of_val(slice));
for &x in slice {
buf.extend_from_slice(&x.to_le_bytes());
}
}
#[cfg(test)]
mod tests {
use futures::io::Cursor;
use super::*;
fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
bytes
.chunks_exact(std::mem::size_of::<u64>())
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
.collect()
}
#[tokio::test]
async fn test_bloom_filter_creator() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(2);
creator.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()]);
assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
assert!(creator.memory_usage() > 0);
creator.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()]);
// Finalize the first segment
assert!(creator.cur_seg_distinct_elems_mem_usage == 0);
assert!(creator.memory_usage() > 0);
creator.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()]);
assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
assert!(creator.memory_usage() > 0);
creator.finish(&mut writer).await.unwrap();
let bytes = writer.into_inner();
let total_size = bytes.len();
let meta_size_offset = total_size - 4;
let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap();
assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.seg_count, 2);
assert_eq!(meta.row_count, 3);
assert_eq!(
meta.bloom_filter_segments_size + meta_bytes.len() + 4,
total_size
);
let mut bfs = Vec::new();
for segment in meta.bloom_filter_segments {
let bloom_filter_bytes =
&bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
let v = u64_vec_from_bytes(bloom_filter_bytes);
let bloom_filter = BloomFilter::from_vec(v)
.seed(&SEED)
.expected_items(segment.elem_count);
bfs.push(bloom_filter);
}
assert_eq!(bfs.len(), 2);
assert!(bfs[0].contains(&b"a"));
assert!(bfs[0].contains(&b"b"));
assert!(bfs[0].contains(&b"c"));
assert!(bfs[0].contains(&b"d"));
assert!(bfs[1].contains(&b"e"));
assert!(bfs[1].contains(&b"f"));
}
}

View File

@@ -0,0 +1,66 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("IO error"))]
Io {
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serde json"))]
SerdeJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Io { .. } | Self::SerdeJson { .. } => StatusCode::Unexpected,
External { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -15,5 +15,6 @@
#![feature(iter_partition_in_place)]
#![feature(assert_matches)]
pub mod bloom_filter;
pub mod fulltext_index;
pub mod inverted_index;