From bcecd8ce528420fe689019533c37992b1750a62e Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 17 Dec 2024 14:55:42 +0800 Subject: [PATCH] feat(bloom-filter): add basic bloom filter creator (Part 1) (#5177) * feat(bloom-filter): add a simple bloom filter creator (Part 1) Signed-off-by: Zhenchi * fix: clippy Signed-off-by: Zhenchi * fix: header Signed-off-by: Zhenchi * docs: add format comment Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 26 ++- src/index/Cargo.toml | 2 + src/index/src/bloom_filter.rs | 53 +++++ src/index/src/bloom_filter/creator.rs | 294 ++++++++++++++++++++++++++ src/index/src/bloom_filter/error.rs | 66 ++++++ src/index/src/lib.rs | 1 + 6 files changed, 439 insertions(+), 3 deletions(-) create mode 100644 src/index/src/bloom_filter.rs create mode 100644 src/index/src/bloom_filter/creator.rs create mode 100644 src/index/src/bloom_filter/error.rs diff --git a/Cargo.lock b/Cargo.lock index 1fa61c8c6f..b86134a3ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3834,6 +3834,18 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c" +[[package]] +name = "fastbloom" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b679f25009b51b71506296f95fb6362ba7d0151172fa7373a8d1611b8bc5d10f" +dependencies = [ + "getrandom", + "rand", + "siphasher 1.0.1", + "wide", +] + [[package]] name = "fastdivide" version = "0.4.1" @@ -5213,6 +5225,7 @@ dependencies = [ "common-runtime", "common-telemetry", "common-test-util", + "fastbloom", "fst", "futures", "greptime-proto", @@ -5223,6 +5236,7 @@ dependencies = [ "regex", "regex-automata 0.4.8", "serde", + "serde_json", "snafu 0.8.5", "tantivy", "tantivy-jieba", @@ -8065,7 +8079,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" 
dependencies = [ - "siphasher", + "siphasher 0.3.11", ] [[package]] @@ -8074,7 +8088,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" dependencies = [ - "siphasher", + "siphasher 0.3.11", ] [[package]] @@ -10005,7 +10019,7 @@ dependencies = [ "once_cell", "radium", "rand", - "siphasher", + "siphasher 0.3.11", "unic-ucd-category", "volatile", "widestring", @@ -11016,6 +11030,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "sketches-ddsketch" version = "0.2.2" diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 772177147a..f46c64a176 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -17,6 +17,7 @@ common-error.workspace = true common-macro.workspace = true common-runtime.workspace = true common-telemetry.workspace = true +fastbloom = "0.8" fst.workspace = true futures.workspace = true greptime-proto.workspace = true @@ -26,6 +27,7 @@ prost.workspace = true regex.workspace = true regex-automata.workspace = true serde.workspace = true +serde_json.workspace = true snafu.workspace = true tantivy = { version = "0.22", features = ["zstd-compression"] } tantivy-jieba = "0.11.0" diff --git a/src/index/src/bloom_filter.rs b/src/index/src/bloom_filter.rs new file mode 100644 index 0000000000..e68acc698a --- /dev/null +++ b/src/index/src/bloom_filter.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+
+pub mod creator;
+mod error;
+
+pub type Bytes = Vec<u8>;
+pub type BytesRef<'a> = &'a [u8];
+
+/// The meta information of the bloom filter, stored as JSON after the segments in the file.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct BloomFilterMeta {
+    /// The number of rows covered by each segment.
+    pub rows_per_segment: usize,
+
+    /// The number of segments written to the file.
+    pub seg_count: usize,
+
+    /// The total number of rows pushed to the creator.
+    pub row_count: usize,
+
+    /// The total size in bytes of all bloom filter segments, excluding the meta information.
+    pub bloom_filter_segments_size: usize,
+
+    /// Location (offset, size, element count) of every bloom filter segment, in write order.
+    pub bloom_filter_segments: Vec<BloomFilterSegmentLocation>,
+}
+
+/// The location of a single bloom filter segment in the file.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BloomFilterSegmentLocation {
+    /// The byte offset of the segment, counted from the start of the first segment.
+    pub offset: u64,
+
+    /// The size of the bloom filter segment in bytes.
+    pub size: u64,
+
+    /// The number of distinct elements inserted into the bloom filter segment.
+    pub elem_count: usize,
+}
diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs
new file mode 100644
index 0000000000..b3c95d3a76
--- /dev/null
+++ b/src/index/src/bloom_filter/creator.rs
@@ -0,0 +1,294 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+
+use fastbloom::BloomFilter;
+use futures::{AsyncWrite, AsyncWriteExt};
+use snafu::ResultExt;
+
+use super::error::{IoSnafu, SerdeJsonSnafu};
+use crate::bloom_filter::error::Result;
+use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};
+
+/// The seed used for the Bloom filter. It is fixed so that a filter rebuilt from
+/// its stored bits (see the test below) hashes elements the same way.
+const SEED: u128 = 42;
+
+/// The false positive rate of the Bloom filter.
+const FALSE_POSITIVE_RATE: f64 = 0.01;
+
+/// `BloomFilterCreator` is responsible for creating and managing bloom filters
+/// for a set of elements. It divides the rows into segments and creates
+/// bloom filters for each segment.
+///
+/// # Format
+///
+/// The bloom filter creator writes the following format to the writer:
+///
+/// ```text
+/// +--------------------+--------------------+-----+----------------------+----------------------+
+/// | Bloom filter 0     | Bloom filter 1     | ... | BloomFilterMeta      | Meta size            |
+/// +--------------------+--------------------+-----+----------------------+----------------------+
+/// |<- bytes (size 0) ->|<- bytes (size 1) ->| ... |<- json (meta size) ->|<- u32 LE (4 bytes) ->|
+/// ```
+///
+pub struct BloomFilterCreator {
+    /// The number of rows per segment set by the user.
+    rows_per_segment: usize,
+
+    /// The number of rows pushed to the creator so far.
+    accumulated_row_count: usize,
+
+    /// A set of distinct elements in the current segment.
+    cur_seg_distinct_elems: HashSet<Bytes>,
+
+    /// The memory usage of the current segment's distinct elements, counted as
+    /// the sum of the element lengths in bytes.
+    cur_seg_distinct_elems_mem_usage: usize,
+
+    /// Storage for finalized Bloom filters.
+    finalized_bloom_filters: FinalizedBloomFilterStorage,
+}
+
+impl BloomFilterCreator {
+    /// Creates a new `BloomFilterCreator` with the specified number of rows per segment.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `rows_per_segment` is 0.
+    pub fn new(rows_per_segment: usize) -> Self {
+        assert!(
+            rows_per_segment > 0,
+            "rows_per_segment must be greater than 0"
+        );
+
+        Self {
+            rows_per_segment,
+            accumulated_row_count: 0,
+            cur_seg_distinct_elems: HashSet::default(),
+            cur_seg_distinct_elems_mem_usage: 0,
+            finalized_bloom_filters: FinalizedBloomFilterStorage::default(),
+        }
+    }
+
+    /// Adds a row of elements to the bloom filter. If the number of accumulated rows
+    /// reaches a multiple of `rows_per_segment`, it finalizes the current segment.
+    pub fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) {
+        self.accumulated_row_count += 1;
+        for elem in elems {
+            // Take the length first: `insert` consumes the element.
+            let len = elem.len();
+            let is_new = self.cur_seg_distinct_elems.insert(elem);
+            if is_new {
+                // Duplicates within a segment are stored once, so they are counted once.
+                self.cur_seg_distinct_elems_mem_usage += len;
+            }
+        }
+
+        if self.accumulated_row_count % self.rows_per_segment == 0 {
+            self.finalize_segment();
+        }
+    }
+
+    /// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `Io` error if writing or flushing fails, or if the serialized
+    /// meta is too large for its `u32` length trailer, and a `SerdeJson` error if
+    /// the meta cannot be serialized.
+    pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
+        // Finalize the trailing, partially filled segment. Only the element set is
+        // inspected, so trailing rows that carried no elements do not produce a
+        // segment of their own.
+        // NOTE(review): confirm readers tolerate `seg_count` being smaller than
+        // `ceil(row_count / rows_per_segment)` in that case.
+        if !self.cur_seg_distinct_elems.is_empty() {
+            self.finalize_segment();
+        }
+
+        let mut meta = BloomFilterMeta {
+            rows_per_segment: self.rows_per_segment,
+            seg_count: self.finalized_bloom_filters.len(),
+            row_count: self.accumulated_row_count,
+            ..Default::default()
+        };
+
+        // One scratch buffer is reused for every segment.
+        let mut buf = Vec::new();
+        for segment in self.finalized_bloom_filters.drain() {
+            buf.clear();
+            write_u64_slice(&mut buf, segment.bloom_filter.as_slice());
+            writer.write_all(&buf).await.context(IoSnafu)?;
+
+            let size = buf.len();
+            meta.bloom_filter_segments.push(BloomFilterSegmentLocation {
+                // Offsets are counted from the first byte written by this method.
+                offset: meta.bloom_filter_segments_size as _,
+                size: size as _,
+                elem_count: segment.element_count,
+            });
+            meta.bloom_filter_segments_size += size;
+        }
+
+        let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?;
+        // The trailer stores the meta length as a `u32`; reject anything larger
+        // before writing it instead of silently truncating the length.
+        let meta_size = u32::try_from(meta_bytes.len())
+            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
+            .context(IoSnafu)?;
+
+        writer.write_all(&meta_bytes).await.context(IoSnafu)?;
+        writer
+            .write_all(&meta_size.to_le_bytes())
+            .await
+            .context(IoSnafu)?;
+        // A failed flush is an ordinary I/O error: report it instead of panicking.
+        writer.flush().await.context(IoSnafu)?;
+
+        Ok(())
+    }
+
+    /// Returns the memory usage of the creator: the bytes of the distinct elements
+    /// buffered for the current segment plus the size of all finalized bloom filters.
+    pub fn memory_usage(&self) -> usize {
+        self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
+    }
+
+    /// Builds a bloom filter from the distinct elements of the current segment,
+    /// stores it, and leaves the element set empty for the next segment.
+    fn finalize_segment(&mut self) {
+        // Read the count before draining; the drain empties the set.
+        let elem_count = self.cur_seg_distinct_elems.len();
+        self.finalized_bloom_filters
+            .add(self.cur_seg_distinct_elems.drain(), elem_count);
+        self.cur_seg_distinct_elems_mem_usage = 0;
+    }
+}
+
+/// Storage for finalized Bloom filters.
+///
+/// TODO(zhongzc): Add support for storing intermediate bloom filters on disk to control memory usage.
+#[derive(Debug, Default)]
+struct FinalizedBloomFilterStorage {
+    /// Bloom filters that are stored in memory.
+    in_memory: Vec<FinalizedBloomFilterSegment>,
+}
+
+impl FinalizedBloomFilterStorage {
+    /// Returns the summed size in bytes of all bloom filters held in memory.
+    fn memory_usage(&self) -> usize {
+        self.in_memory.iter().map(|s| s.size).sum()
+    }
+
+    /// Adds a new finalized Bloom filter to the storage.
+    ///
+    /// `elem_count` is the number of elements yielded by `elems`; it sizes the filter.
+    ///
+    /// TODO(zhongzc): Add support for flushing to disk.
+    fn add(&mut self, elems: impl IntoIterator<Item = Bytes>, elem_count: usize) {
+        // NOTE(review): `elem_count` can be 0 when every row of a segment was
+        // empty; confirm `expected_items(0)` is handled by fastbloom.
+        let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
+            .seed(&SEED)
+            .expected_items(elem_count);
+        for elem in elems {
+            bf.insert(&elem);
+        }
+
+        let cbf = FinalizedBloomFilterSegment::new(bf, elem_count);
+        self.in_memory.push(cbf);
+    }
+
+    /// Returns the number of finalized bloom filters.
+    fn len(&self) -> usize {
+        self.in_memory.len()
+    }
+
+    /// Removes and yields all finalized bloom filters in insertion order.
+    fn drain(&mut self) -> impl Iterator<Item = FinalizedBloomFilterSegment> + '_ {
+        self.in_memory.drain(..)
+    }
+}
+
+/// A finalized Bloom filter segment.
+#[derive(Debug)]
+struct FinalizedBloomFilterSegment {
+    /// The underlying Bloom filter.
+    bloom_filter: BloomFilter,
+
+    /// The number of elements in the Bloom filter.
+    element_count: usize,
+
+    /// The occupied memory size of the Bloom filter, in bytes.
+    size: usize,
+}
+
+impl FinalizedBloomFilterSegment {
+    /// Wraps `bloom_filter`, recording its element count and the byte size of
+    /// its underlying `u64` slice.
+    fn new(bloom_filter: BloomFilter, elem_count: usize) -> Self {
+        let memory_usage = std::mem::size_of_val(bloom_filter.as_slice());
+        Self {
+            bloom_filter,
+            element_count: elem_count,
+            size: memory_usage,
+        }
+    }
+}
+
+/// Writes a slice of `u64` to the buffer in little-endian order.
+fn write_u64_slice(buf: &mut Vec<u8>, slice: &[u64]) {
+    // Reserve the exact number of bytes up front so the loop never reallocates.
+    buf.reserve(std::mem::size_of_val(slice));
+    for &x in slice {
+        buf.extend_from_slice(&x.to_le_bytes());
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::io::Cursor;
+
+    use super::*;
+
+    /// Decodes little-endian `u64` values; the inverse of `write_u64_slice`.
+    fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
+        bytes
+            .chunks_exact(std::mem::size_of::<u64>())
+            .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
+            .collect()
+    }
+
+    #[tokio::test]
+    async fn test_bloom_filter_creator() {
+        let mut writer = Cursor::new(Vec::new());
+        let mut creator = BloomFilterCreator::new(2);
+
+        creator.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()]);
+        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
+        assert!(creator.memory_usage() > 0);
+
+        creator.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()]);
+        // The second row completes the first segment, which is finalized.
+        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
+        assert!(creator.memory_usage() > 0);
+
+        creator.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()]);
+        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
+        assert!(creator.memory_usage() > 0);
+
+        creator.finish(&mut writer).await.unwrap();
+
+        // The file ends with the meta length as a 4-byte little-endian `u32`,
+        // preceded by the JSON meta itself.
+        let bytes = writer.into_inner();
+        let total_size = bytes.len();
+        let meta_size_offset = total_size - 4;
+        let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
+
+        let meta_bytes = &bytes[meta_size_offset - meta_size as usize..meta_size_offset];
+        let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap();
+
+        assert_eq!(meta.rows_per_segment, 2);
+        assert_eq!(meta.seg_count, 2);
+        assert_eq!(meta.row_count, 3);
+        assert_eq!(
+            meta.bloom_filter_segments_size + meta_bytes.len() + 4,
+            total_size
+        );
+
+        // Rebuild every bloom filter from its stored bits with the creator's seed.
+        let mut bfs = Vec::new();
+        for segment in meta.bloom_filter_segments {
+            let bloom_filter_bytes =
+                &bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
+            let v = u64_vec_from_bytes(bloom_filter_bytes);
+            let bloom_filter = BloomFilter::from_vec(v)
+                .seed(&SEED)
+                .expected_items(segment.elem_count);
+            bfs.push(bloom_filter);
+        }
+
+        assert_eq!(bfs.len(), 2);
+        assert!(bfs[0].contains(&b"a"));
+        assert!(bfs[0].contains(&b"b"));
+        assert!(bfs[0].contains(&b"c"));
+        assert!(bfs[0].contains(&b"d"));
+        assert!(bfs[1].contains(&b"e"));
+        assert!(bfs[1].contains(&b"f"));
+    }
+}
diff --git a/src/index/src/bloom_filter/error.rs b/src/index/src/bloom_filter/error.rs
new file mode 100644
index 0000000000..8e95dc5225
--- /dev/null
+++ b/src/index/src/bloom_filter/error.rs
@@ -0,0 +1,66 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+
+use common_error::ext::{BoxedError, ErrorExt};
+use common_error::status_code::StatusCode;
+use common_macro::stack_trace_debug;
+use snafu::{Location, Snafu};
+
+#[derive(Snafu)]
+#[snafu(visibility(pub))]
+#[stack_trace_debug]
+pub enum Error {
+    #[snafu(display("IO error"))]
+    Io {
+        #[snafu(source)]
+        error: std::io::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to serde json"))]
+    SerdeJson {
+        #[snafu(source)]
+        error: serde_json::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("External error"))]
+    External {
+        source: BoxedError,
+        #[snafu(implicit)]
+        location: Location,
+    },
+}
+
+impl ErrorExt for Error {
+    fn status_code(&self) -> StatusCode {
+        use Error::*;
+
+        match self {
+            Io { .. } | SerdeJson { .. } => StatusCode::Unexpected,
+
+            External { source, .. } => source.status_code(),
+        }
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs
index 5e2e411668..e52a93138f 100644
--- a/src/index/src/lib.rs
+++ b/src/index/src/lib.rs
@@ -15,5 +15,6 @@
 #![feature(iter_partition_in_place)]
 #![feature(assert_matches)]
 
+pub mod bloom_filter;
 pub mod fulltext_index;
 pub mod inverted_index;