From 1acfb6ed1c21b7a42daad8c0e4191dc4656a33e5 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 16 Jan 2025 21:18:29 +0800 Subject: [PATCH] feat!: use indirect indices for bloom filter to reduce size (#5377) * feat!(bloom-filter): use indirect indices to reduce size Signed-off-by: Zhenchi * fix format Signed-off-by: Zhenchi * update proto Signed-off-by: Zhenchi * nit Signed-off-by: Zhenchi * upgrade proto Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/index/Cargo.toml | 1 + src/index/src/bloom_filter.rs | 34 ------- src/index/src/bloom_filter/applier.rs | 87 +++++++++++++----- src/index/src/bloom_filter/creator.rs | 82 ++++++++++------- .../bloom_filter/creator/finalize_segment.rs | 88 ++++++++++++++++--- src/index/src/bloom_filter/error.rs | 17 +--- src/index/src/bloom_filter/reader.rs | 41 +++++---- .../src/cache/index/bloom_filter_index.rs | 2 +- .../src/sst/index/bloom_filter/creator.rs | 23 ++--- 11 files changed, 228 insertions(+), 152 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a3e5f0309c..b11584039d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4449,7 +4449,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4a173785b3376267c4d62b6e0b0a54ca040822aa#4a173785b3376267c4d62b6e0b0a54ca040822aa" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec801a91aa22f9666063d02805f1f60f7c93458a#ec801a91aa22f9666063d02805f1f60f7c93458a" dependencies = [ "prost 0.12.6", "serde", @@ -5299,6 +5299,7 @@ dependencies = [ "fst", "futures", "greptime-proto", + "itertools 0.10.5", "mockall", "pin-project", "prost 0.12.6", diff --git a/Cargo.toml b/Cargo.toml index 0e9e5f4e87..461ba7b8db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,7 +124,7 @@ etcd-client = "0.13" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a173785b3376267c4d62b6e0b0a54ca040822aa" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec801a91aa22f9666063d02805f1f60f7c93458a" } hex = "0.4" http = "0.2" humantime = "2.1" diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index f46c64a176..4186834d46 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -21,6 +21,7 @@ fastbloom = "0.8" fst.workspace = true futures.workspace = true greptime-proto.workspace = true +itertools.workspace = true mockall.workspace = true pin-project.workspace = true prost.workspace = true diff --git a/src/index/src/bloom_filter.rs b/src/index/src/bloom_filter.rs index 43eebe592e..69f6cb94e0 100644 --- a/src/index/src/bloom_filter.rs +++ b/src/index/src/bloom_filter.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use serde::{Deserialize, Serialize}; - pub mod applier; pub mod creator; pub mod error; @@ -24,35 +22,3 @@ pub type BytesRef<'a> = &'a [u8]; /// The seed used for the Bloom filter. pub const SEED: u128 = 42; - -/// The Meta information of the bloom filter stored in the file. -#[derive(Debug, Default, Serialize, Deserialize, Clone)] -pub struct BloomFilterMeta { - /// The number of rows per segment. - pub rows_per_segment: usize, - - /// The number of segments. - pub seg_count: usize, - - /// The number of total rows. - pub row_count: usize, - - /// The size of the bloom filter excluding the meta information. 
-    pub bloom_filter_segments_size: usize,
-
-    /// Offset and size of bloom filters in the file.
-    pub bloom_filter_segments: Vec<BloomFilterSegmentLocation>,
-}
-
-/// The location of the bloom filter segment in the file.
-#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)]
-pub struct BloomFilterSegmentLocation {
-    /// The offset of the bloom filter segment in the file.
-    pub offset: u64,
-
-    /// The size of the bloom filter segment in the file.
-    pub size: u64,
-
-    /// The number of elements in the bloom filter segment.
-    pub elem_count: usize,
-}
diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs
index 96fa28e38f..b847edb18f 100644
--- a/src/index/src/bloom_filter/applier.rs
+++ b/src/index/src/bloom_filter/applier.rs
@@ -15,9 +15,12 @@
 use std::collections::HashSet;
 use std::ops::Range;
 
+use greptime_proto::v1::index::BloomFilterMeta;
+use itertools::Itertools;
+
 use crate::bloom_filter::error::Result;
 use crate::bloom_filter::reader::BloomFilterReader;
-use crate::bloom_filter::{BloomFilterMeta, Bytes};
+use crate::bloom_filter::Bytes;
 
 pub struct BloomFilterApplier {
     reader: Box<dyn BloomFilterReader + Send>,
@@ -37,27 +40,42 @@ impl BloomFilterApplier {
         probes: &HashSet<Bytes>,
         search_range: Range<usize>,
     ) -> Result<Vec<Range<usize>>> {
-        let rows_per_segment = self.meta.rows_per_segment;
+        let rows_per_segment = self.meta.rows_per_segment as usize;
         let start_seg = search_range.start / rows_per_segment;
         let end_seg = search_range.end.div_ceil(rows_per_segment);
 
-        let locs = &self.meta.bloom_filter_segments[start_seg..end_seg];
-        let bfs = self.reader.bloom_filter_vec(locs).await?;
+        let locs = &self.meta.segment_loc_indices[start_seg..end_seg];
 
-        let mut ranges: Vec<Range<usize>> = Vec::with_capacity(end_seg - start_seg);
-        for (seg_id, bloom) in (start_seg..end_seg).zip(bfs) {
-            let start = seg_id * rows_per_segment;
+        // dedup locs
+        let deduped_locs = locs
+            .iter()
+            .dedup()
+            .map(|i| self.meta.bloom_filter_locs[*i as usize].clone())
+            .collect::<Vec<_>>();
+        let bfs = self.reader.bloom_filter_vec(&deduped_locs).await?;
+
+        let mut ranges: Vec<Range<usize>> = Vec::with_capacity(bfs.len());
+        for ((_, mut group), bloom) in locs
+            .iter()
+            .zip(start_seg..end_seg)
+            .group_by(|(x, _)| **x)
+            .into_iter()
+            .zip(bfs.iter())
+        {
+            let start = group.next().unwrap().1 * rows_per_segment; // SAFETY: group is not empty
+            let end = group.last().map_or(start + rows_per_segment, |(_, end)| {
+                (end + 1) * rows_per_segment
+            });
+            let actual_start = start.max(search_range.start);
+            let actual_end = end.min(search_range.end);
             for probe in probes {
                 if bloom.contains(probe) {
-                    let end = (start + rows_per_segment).min(search_range.end);
-                    let start = start.max(search_range.start);
-
                     match ranges.last_mut() {
-                        Some(last) if last.end == start => {
-                            last.end = end;
+                        Some(last) if last.end == actual_start => {
+                            last.end = actual_end;
                         }
                         _ => {
-                            ranges.push(start..end);
+                            ranges.push(actual_start..actual_end);
                         }
                     }
                     break;
@@ -93,46 +111,73 @@ mod tests {
         );
 
         let rows = vec![
+            // seg 0
             vec![b"row00".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
             vec![b"row01".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
             vec![b"row02".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
             vec![b"row03".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
+            // seg 1
             vec![b"row04".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
             vec![b"row05".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
             vec![b"row06".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
             vec![b"row07".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
+            // seg 2
             vec![b"row08".to_vec(), b"seg02".to_vec(), 
b"overp".to_vec()], vec![b"row09".to_vec(), b"seg02".to_vec(), b"overp".to_vec()], vec![b"row10".to_vec(), b"seg02".to_vec(), b"overp".to_vec()], vec![b"row11".to_vec(), b"seg02".to_vec(), b"overp".to_vec()], + // duplicate rows + // seg 3 + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + // seg 4 + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + // seg 5 + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + // seg 6 + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], + vec![b"dup".to_vec()], ]; let cases = vec![ - (vec![b"row00".to_vec()], 0..12, vec![0..4]), // search one row in full range + (vec![b"row00".to_vec()], 0..28, vec![0..4]), // search one row in full range (vec![b"row05".to_vec()], 4..8, vec![4..8]), // search one row in partial range (vec![b"row03".to_vec()], 4..8, vec![]), // search for a row that doesn't exist in the partial range ( vec![b"row01".to_vec(), b"row06".to_vec()], - 0..12, + 0..28, vec![0..8], ), // search multiple rows in multiple ranges ( vec![b"row01".to_vec(), b"row11".to_vec()], - 0..12, + 0..28, vec![0..4, 8..12], ), // search multiple rows in multiple ranges - (vec![b"row99".to_vec()], 0..12, vec![]), // search for a row that doesn't exist in the full range + (vec![b"row99".to_vec()], 0..28, vec![]), // search for a row that doesn't exist in the full range (vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range ( vec![b"row04".to_vec(), b"row05".to_vec()], 0..12, vec![4..8], ), // search multiple rows in same segment - (vec![b"seg01".to_vec()], 0..12, vec![4..8]), // search rows in a segment - (vec![b"seg01".to_vec()], 6..12, vec![6..8]), // search rows in a segment in partial range - (vec![b"overl".to_vec()], 0..12, vec![0..8]), // search rows in multiple segments - (vec![b"overl".to_vec()], 2..12, vec![2..8]), // search range starts from the middle of a segment + (vec![b"seg01".to_vec()], 0..28, vec![4..8]), // search rows in a segment + (vec![b"seg01".to_vec()], 6..28, vec![6..8]), // search rows in a segment in partial range + (vec![b"overl".to_vec()], 0..28, vec![0..8]), // search rows in multiple segments + (vec![b"overl".to_vec()], 2..28, vec![2..8]), // search range starts from the middle of a segment (vec![b"overp".to_vec()], 0..10, vec![4..10]), // search range ends at the middle of a segment + (vec![b"dup".to_vec()], 0..12, vec![]), // search for a duplicate row not in the range + (vec![b"dup".to_vec()], 0..16, vec![12..16]), // search for a duplicate row in the range + (vec![b"dup".to_vec()], 0..28, vec![12..28]), // search for a duplicate row in the full range ]; for row in rows { diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index db79983e62..b4030d28fd 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -21,10 +21,12 @@ use std::sync::Arc; use finalize_segment::FinalizedBloomFilterStorage; use futures::{AsyncWrite, AsyncWriteExt, StreamExt}; +use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta}; +use prost::Message; use snafu::ResultExt; -use crate::bloom_filter::error::{IoSnafu, Result, SerdeJsonSnafu}; -use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes, SEED}; +use crate::bloom_filter::error::{IoSnafu, Result}; +use crate::bloom_filter::{Bytes, SEED}; use crate::external_provider::ExternalTempFileProvider; /// The false 
positive rate of the Bloom filter. @@ -170,12 +172,15 @@ impl BloomFilterCreator { } let mut meta = BloomFilterMeta { - rows_per_segment: self.rows_per_segment, - row_count: self.accumulated_row_count, + rows_per_segment: self.rows_per_segment as _, + row_count: self.accumulated_row_count as _, ..Default::default() }; - let mut segs = self.finalized_bloom_filters.drain().await?; + let (indices, mut segs) = self.finalized_bloom_filters.drain().await?; + meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect(); + meta.segment_count = meta.segment_loc_indices.len() as _; + while let Some(segment) = segs.next().await { let segment = segment?; writer @@ -183,17 +188,16 @@ impl BloomFilterCreator { .await .context(IoSnafu)?; - let size = segment.bloom_filter_bytes.len(); - meta.bloom_filter_segments.push(BloomFilterSegmentLocation { - offset: meta.bloom_filter_segments_size as _, - size: size as _, - elem_count: segment.element_count, + let size = segment.bloom_filter_bytes.len() as u64; + meta.bloom_filter_locs.push(BloomFilterLoc { + offset: meta.bloom_filter_size as _, + size, + element_count: segment.element_count as _, }); - meta.bloom_filter_segments_size += size; - meta.seg_count += 1; + meta.bloom_filter_size += size; } - let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?; + let meta_bytes = meta.encode_to_vec(); writer.write_all(&meta_bytes).await.context(IoSnafu)?; let meta_size = meta_bytes.len() as u32; @@ -287,34 +291,38 @@ mod tests { let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap()); let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4]; - let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap(); + let meta = BloomFilterMeta::decode(meta_bytes).unwrap(); assert_eq!(meta.rows_per_segment, 2); - assert_eq!(meta.seg_count, 2); + assert_eq!(meta.segment_count, 2); assert_eq!(meta.row_count, 3); assert_eq!( - meta.bloom_filter_segments_size + meta_bytes.len() + 4, + meta.bloom_filter_size as usize + meta_bytes.len() + 4, total_size ); let mut bfs = Vec::new(); - for segment in meta.bloom_filter_segments { + for segment in meta.bloom_filter_locs { let bloom_filter_bytes = &bytes[segment.offset as usize..(segment.offset + segment.size) as usize]; let v = u64_vec_from_bytes(bloom_filter_bytes); let bloom_filter = BloomFilter::from_vec(v) .seed(&SEED) - .expected_items(segment.elem_count); + .expected_items(segment.element_count as usize); bfs.push(bloom_filter); } - assert_eq!(bfs.len(), 2); - assert!(bfs[0].contains(&b"a")); - assert!(bfs[0].contains(&b"b")); - assert!(bfs[0].contains(&b"c")); - assert!(bfs[0].contains(&b"d")); - assert!(bfs[1].contains(&b"e")); - assert!(bfs[1].contains(&b"f")); + assert_eq!(meta.segment_loc_indices.len(), 2); + + let bf0 = &bfs[meta.segment_loc_indices[0] as usize]; + assert!(bf0.contains(&b"a")); + assert!(bf0.contains(&b"b")); + assert!(bf0.contains(&b"c")); + assert!(bf0.contains(&b"d")); + + let bf1 = &bfs[meta.segment_loc_indices[1] as usize]; + assert!(bf1.contains(&b"e")); + assert!(bf1.contains(&b"f")); } #[tokio::test] @@ -356,37 +364,43 @@ mod tests { let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap()); let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4]; - let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap(); + let meta = BloomFilterMeta::decode(meta_bytes).unwrap(); assert_eq!(meta.rows_per_segment, 2); - assert_eq!(meta.seg_count, 10); + 
assert_eq!(meta.segment_count, 10);
         assert_eq!(meta.row_count, 20);
         assert_eq!(
-            meta.bloom_filter_segments_size + meta_bytes.len() + 4,
+            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
             total_size
         );
 
         let mut bfs = Vec::new();
-        for segment in meta.bloom_filter_segments {
+        for segment in meta.bloom_filter_locs {
             let bloom_filter_bytes =
                 &bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
             let v = u64_vec_from_bytes(bloom_filter_bytes);
             let bloom_filter = BloomFilter::from_vec(v)
                 .seed(&SEED)
-                .expected_items(segment.elem_count);
+                .expected_items(segment.element_count as _);
             bfs.push(bloom_filter);
         }
 
-        assert_eq!(bfs.len(), 10);
-        for bf in bfs.iter().take(3) {
+        // 4 bloom filters to serve 10 segments
+        assert_eq!(bfs.len(), 4);
+        assert_eq!(meta.segment_loc_indices.len(), 10);
+
+        for idx in meta.segment_loc_indices.iter().take(3) {
+            let bf = &bfs[*idx as usize];
             assert!(bf.contains(&b"a"));
             assert!(bf.contains(&b"b"));
         }
-        for bf in bfs.iter().take(5).skip(2) {
+        for idx in meta.segment_loc_indices.iter().take(5).skip(2) {
+            let bf = &bfs[*idx as usize];
             assert!(bf.contains(&b"c"));
             assert!(bf.contains(&b"d"));
         }
-        for bf in bfs.iter().take(10).skip(5) {
+        for idx in meta.segment_loc_indices.iter().take(10).skip(5) {
+            let bf = &bfs[*idx as usize];
             assert!(bf.contains(&b"e"));
             assert!(bf.contains(&b"f"));
         }
diff --git a/src/index/src/bloom_filter/creator/finalize_segment.rs b/src/index/src/bloom_filter/creator/finalize_segment.rs
index e97652f5fc..072d661f56 100644
--- a/src/index/src/bloom_filter/creator/finalize_segment.rs
+++ b/src/index/src/bloom_filter/creator/finalize_segment.rs
@@ -33,6 +33,9 @@ const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; // 1MB
 
 /// Storage for finalized Bloom filters.
 pub struct FinalizedBloomFilterStorage {
+    /// Indices of the segments in the sequence of finalized Bloom filters.
+    segment_indices: Vec<usize>,
+
     /// Bloom filters that are stored in memory.
     in_memory: Vec<FinalizedBloomFilterSegment>,
 
@@ -54,6 +57,9 @@ pub struct FinalizedBloomFilterStorage {
 
     /// The threshold of the global memory usage of the creating Bloom filters.
     global_memory_usage_threshold: Option<usize>,
+
+    /// Records the number of flushed segments.
+    flushed_seg_count: usize,
 }
 
 impl FinalizedBloomFilterStorage {
@@ -65,6 +71,7 @@ impl FinalizedBloomFilterStorage {
     ) -> Self {
         let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
         Self {
+            segment_indices: Vec::new(),
             in_memory: Vec::new(),
             intermediate_file_id_counter: 0,
             intermediate_prefix: external_prefix,
@@ -72,6 +79,7 @@ impl FinalizedBloomFilterStorage {
             memory_usage: 0,
             global_memory_usage,
             global_memory_usage_threshold,
+            flushed_seg_count: 0,
         }
     }
 
@@ -97,6 +105,13 @@ impl FinalizedBloomFilterStorage {
 
         let fbf = FinalizedBloomFilterSegment::from(bf, element_count);
 
+        // Reuse the last segment if it is the same as the current one.
+        if self.in_memory.last() == Some(&fbf) {
+            self.segment_indices
+                .push(self.flushed_seg_count + self.in_memory.len() - 1);
+            return Ok(());
+        }
+
         // Update memory usage.
         let memory_diff = fbf.bloom_filter_bytes.len();
         self.memory_usage += memory_diff;
@@ -105,6 +120,8 @@ impl FinalizedBloomFilterStorage {
 
         // Add the finalized Bloom filter to the in-memory storage.
         self.in_memory.push(fbf);
+        self.segment_indices
+            .push(self.flushed_seg_count + self.in_memory.len() - 1);
 
         // Flush to disk if necessary.
@@ -129,13 +146,19 @@ impl FinalizedBloomFilterStorage {
         Ok(())
     }
 
-    /// Drains the storage and returns a stream of finalized Bloom filter segments.
+    /// Drains the storage and returns the indices of the segments and a stream of finalized Bloom filters.
     pub async fn drain(
         &mut self,
-    ) -> Result<Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>> {
+    ) -> Result<(
+        Vec<usize>,
+        Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>,
+    )> {
         // FAST PATH: memory only
         if self.intermediate_file_id_counter == 0 {
-            return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))));
+            return Ok((
+                std::mem::take(&mut self.segment_indices),
+                Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))),
+            ));
         }
 
         // SLOW PATH: memory + disk
@@ -151,8 +174,9 @@ impl FinalizedBloomFilterStorage {
             .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
 
         let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);
-        Ok(Box::pin(
-            stream::iter(streams).flatten().chain(in_memory_stream),
+        Ok((
+            std::mem::take(&mut self.segment_indices),
+            Box::pin(stream::iter(streams).flatten().chain(in_memory_stream)),
         ))
     }
 
@@ -160,6 +184,7 @@ impl FinalizedBloomFilterStorage {
     async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
         let file_id = self.intermediate_file_id_counter;
         self.intermediate_file_id_counter += 1;
+        self.flushed_seg_count += self.in_memory.len();
 
         let file_id = format!("{:08}", file_id);
         let mut writer = self
@@ -266,21 +291,25 @@ mod tests {
 
         let elem_count = 2000;
         let batch = 1000;
+        let dup_batch = 200;
 
-        for i in 0..batch {
+        for i in 0..(batch - dup_batch) {
             let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes());
             storage.add(elems, elem_count).await.unwrap();
         }
+        for _ in 0..dup_batch {
+            storage.add(Some(vec![]), 1).await.unwrap();
+        }
 
         // Flush happens.
         assert!(storage.intermediate_file_id_counter > 0);
 
         // Drain the storage.
-        let mut stream = storage.drain().await.unwrap();
+        let (indices, mut stream) = storage.drain().await.unwrap();
+        assert_eq!(indices.len(), batch);
 
-        let mut i = 0;
-        while let Some(segment) = stream.next().await {
-            let segment = segment.unwrap();
+        for (i, idx) in indices.iter().enumerate().take(batch - dup_batch) {
+            let segment = stream.next().await.unwrap().unwrap();
             assert_eq!(segment.element_count, elem_count);
 
             let v = u64_vec_from_bytes(&segment.bloom_filter_bytes);
@@ -292,9 +321,44 @@ mod tests {
             for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes())
             {
                 assert!(bf.contains(&elem));
             }
-            i += 1;
+            assert_eq!(indices[i], *idx);
         }
-        assert_eq!(i, batch);
+
+        // Check the correctness of the duplicated segments.
+        let dup_seg = stream.next().await.unwrap().unwrap();
+        assert_eq!(dup_seg.element_count, 1);
+
+        assert!(stream.next().await.is_none());
+        assert!(indices[(batch - dup_batch)..batch]
+            .iter()
+            .all(|&x| x == batch - dup_batch));
+    }
+
+    #[tokio::test]
+    async fn test_finalized_bloom_filter_storage_all_dup() {
+        let mock_provider = MockExternalTempFileProvider::new();
+        let global_memory_usage = Arc::new(AtomicUsize::new(0));
+        let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
+        let provider = Arc::new(mock_provider);
+        let mut storage = FinalizedBloomFilterStorage::new(
+            provider,
+            global_memory_usage.clone(),
+            global_memory_usage_threshold,
+        );
+
+        let batch = 1000;
+        for _ in 0..batch {
+            storage.add(Some(vec![]), 1).await.unwrap();
+        }
+
+        // Drain the storage.
+        let (indices, mut stream) = storage.drain().await.unwrap();
+
+        let bf = stream.next().await.unwrap().unwrap();
+        assert_eq!(bf.element_count, 1);
+
+        assert!(stream.next().await.is_none());
+
+        assert_eq!(indices.len(), batch);
+        assert!(indices.iter().all(|&x| x == 0));
+    }
 }
diff --git a/src/index/src/bloom_filter/error.rs b/src/index/src/bloom_filter/error.rs
index fc89a8b68a..93b2ab1993 100644
--- a/src/index/src/bloom_filter/error.rs
+++ b/src/index/src/bloom_filter/error.rs
@@ -31,18 +31,10 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to serde json"))]
-    SerdeJson {
+    #[snafu(display("Failed to decode protobuf"))]
+    DecodeProto {
         #[snafu(source)]
-        error: serde_json::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Failed to deserialize json"))]
-    DeserializeJson {
-        #[snafu(source)]
-        error: serde_json::Error,
+        error: prost::DecodeError,
         #[snafu(implicit)]
         location: Location,
     },
@@ -90,10 +82,9 @@ impl ErrorExt for Error {
         match self {
             Io { .. }
-            | SerdeJson { .. }
             | FileSizeTooSmall { .. }
             | UnexpectedMetaSize { .. }
-            | DeserializeJson { .. }
+            | DecodeProto { .. }
             | InvalidIntermediateMagic { .. } => StatusCode::Unexpected,
 
             Intermediate { source, .. } => source.status_code(),
diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs
index 2a52f9d071..037eb6b3db 100644
--- a/src/index/src/bloom_filter/reader.rs
+++ b/src/index/src/bloom_filter/reader.rs
@@ -18,12 +18,14 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use common_base::range_read::RangeReader;
 use fastbloom::BloomFilter;
+use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
+use prost::Message;
 use snafu::{ensure, ResultExt};
 
 use crate::bloom_filter::error::{
-    DeserializeJsonSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
+    DecodeProtoSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
 };
-use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, SEED};
+use crate::bloom_filter::SEED;
 
 /// Minimum size of the bloom filter, which is the size of the length of the bloom filter.
 const BLOOM_META_LEN_SIZE: u64 = 4;
@@ -52,7 +54,7 @@ pub trait BloomFilterReader: Sync {
     async fn metadata(&self) -> Result<BloomFilterMeta>;
 
     /// Reads a bloom filter with the given location.
-    async fn bloom_filter(&self, loc: &BloomFilterSegmentLocation) -> Result<BloomFilter> {
+    async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
         let bytes = self.range_read(loc.offset, loc.size as _).await?;
         let vec = bytes
             .chunks_exact(std::mem::size_of::<u64>())
             .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
             .collect();
         let bm = BloomFilter::from_vec(vec)
             .seed(&SEED)
-            .expected_items(loc.elem_count);
+            .expected_items(loc.element_count as _);
         Ok(bm)
     }
 
-    async fn bloom_filter_vec(
-        &self,
-        locs: &[BloomFilterSegmentLocation],
-    ) -> Result<Vec<BloomFilter>> {
+    async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
         let ranges = locs
             .iter()
             .map(|l| l.offset..l.offset + l.size)
             .collect::<Vec<_>>();
@@ -82,7 +81,7 @@
                 .collect();
             let bm = BloomFilter::from_vec(vec)
                 .seed(&SEED)
-                .expected_items(loc.elem_count);
+                .expected_items(loc.element_count as _);
             result.push(bm);
         }
@@ -173,11 +172,11 @@ impl BloomFilterMetaReader {
                 .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
                 .await
                 .context(IoSnafu)?;
-            serde_json::from_slice(&meta).context(DeserializeJsonSnafu)
+            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
         } else {
             let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
             let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
-            serde_json::from_slice(meta).context(DeserializeJsonSnafu)
+            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
         }
     }
@@ -257,17 +256,17 @@ mod tests {
         let meta = reader.metadata().await.unwrap();
 
         assert_eq!(meta.rows_per_segment, 2);
-        assert_eq!(meta.seg_count, 2);
+        assert_eq!(meta.segment_count, 2);
         assert_eq!(meta.row_count, 3);
-        assert_eq!(meta.bloom_filter_segments.len(), 2);
+        assert_eq!(meta.bloom_filter_locs.len(), 2);
 
-        assert_eq!(meta.bloom_filter_segments[0].offset, 0);
-        assert_eq!(meta.bloom_filter_segments[0].elem_count, 4);
+        assert_eq!(meta.bloom_filter_locs[0].offset, 0);
+        assert_eq!(meta.bloom_filter_locs[0].element_count, 4);
         assert_eq!(
-            meta.bloom_filter_segments[1].offset,
-            meta.bloom_filter_segments[0].size
+            meta.bloom_filter_locs[1].offset,
+            meta.bloom_filter_locs[0].size
         );
-        assert_eq!(meta.bloom_filter_segments[1].elem_count, 2);
+        assert_eq!(meta.bloom_filter_locs[1].element_count, 2);
     }
 }
@@ -278,9 +277,9 @@
         let reader = BloomFilterReaderImpl::new(bytes);
         let meta = reader.metadata().await.unwrap();
 
         assert_eq!(meta.bloom_filter_locs.len(), 2);
         let bf = reader
-            .bloom_filter(&meta.bloom_filter_segments[0])
+            .bloom_filter(&meta.bloom_filter_locs[0])
             .await
             .unwrap();
         assert!(bf.contains(&b"a"));
         assert!(bf.contains(&b"b"));
         assert!(bf.contains(&b"c"));
         assert!(bf.contains(&b"d"));
 
         let bf = reader
-            .bloom_filter(&meta.bloom_filter_segments[1])
+            .bloom_filter(&meta.bloom_filter_locs[1])
             .await
             .unwrap();
         assert!(bf.contains(&b"e"));
diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs
index 1d600336a4..ad632e47ab 100644
--- a/src/mito2/src/cache/index/bloom_filter_index.rs
+++ b/src/mito2/src/cache/index/bloom_filter_index.rs
@@ -15,12 +15,12 @@
 use std::ops::Range;
 use std::sync::Arc;
 
+use api::v1::index::BloomFilterMeta;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::try_join_all;
 use index::bloom_filter::error::Result;
 use index::bloom_filter::reader::BloomFilterReader;
-use index::bloom_filter::BloomFilterMeta;
 use store_api::storage::ColumnId;
 
 use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
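
Note on the lookup path the readers above implement: the indirection replaces the old one-`BloomFilterLoc`-per-segment layout with two hops. A row maps to a fixed-width segment, and the segment maps through `segment_loc_indices` into the deduplicated `bloom_filter_locs` table, where consecutive identical segments share one entry. A minimal sketch of that resolution, using only the `BloomFilterMeta` fields exercised in the diffs above (`resolve_loc_index` is an illustrative helper, not part of this patch):

    use greptime_proto::v1::index::BloomFilterMeta;

    /// Returns the index into `meta.bloom_filter_locs` for the segment
    /// containing `row`, or `None` if the row is out of range.
    fn resolve_loc_index(meta: &BloomFilterMeta, row: usize) -> Option<usize> {
        // Hop 1: a row belongs to a fixed-width segment.
        let segment = row / meta.rows_per_segment as usize;
        // Hop 2: the per-segment entry points into the deduplicated location
        // table; this is where the on-disk size reduction comes from.
        meta.segment_loc_indices.get(segment).map(|i| *i as usize)
    }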
diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 99ff2c3547..6676375cb6 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -484,19 +484,15 @@ pub(crate) mod tests { let bloom_filter = BloomFilterReaderImpl::new(reader); let metadata = bloom_filter.metadata().await.unwrap(); - assert_eq!(metadata.bloom_filter_segments.len(), 10); + assert_eq!(metadata.segment_count, 10); for i in 0..5 { - let bf = bloom_filter - .bloom_filter(&metadata.bloom_filter_segments[i]) - .await - .unwrap(); + let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize]; + let bf = bloom_filter.bloom_filter(loc).await.unwrap(); assert!(bf.contains(b"tag1")); } for i in 5..10 { - let bf = bloom_filter - .bloom_filter(&metadata.bloom_filter_segments[i]) - .await - .unwrap(); + let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize]; + let bf = bloom_filter.bloom_filter(loc).await.unwrap(); assert!(bf.contains(b"tag2")); } } @@ -513,12 +509,11 @@ pub(crate) mod tests { let bloom_filter = BloomFilterReaderImpl::new(reader); let metadata = bloom_filter.metadata().await.unwrap(); - assert_eq!(metadata.bloom_filter_segments.len(), 5); + assert_eq!(metadata.segment_count, 5); for i in 0u64..20 { - let bf = bloom_filter - .bloom_filter(&metadata.bloom_filter_segments[i as usize / 4]) - .await - .unwrap(); + let idx = i as usize / 4; + let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize]; + let bf = bloom_filter.bloom_filter(loc).await.unwrap(); let mut buf = vec![]; IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf) .unwrap();
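
The size win comes from run-length deduplication at creation time: `FinalizedBloomFilterStorage::add` reuses the previous filter when the incoming segment is identical, so only the cheap `segment_indices` vector grows once per segment. A standalone sketch of that scheme under illustrative names (not code from this patch):

    /// Collapses consecutive identical payloads into one stored blob plus one
    /// small index per segment, mirroring the `in_memory.last() == Some(&fbf)`
    /// check in `add` above.
    fn dedup_consecutive(segments: &[Vec<u8>]) -> (Vec<usize>, Vec<Vec<u8>>) {
        let mut blobs: Vec<Vec<u8>> = Vec::new();
        let mut indices = Vec::with_capacity(segments.len());
        for seg in segments {
            // Only a payload that differs from its predecessor is stored.
            if blobs.last() != Some(seg) {
                blobs.push(seg.clone());
            }
            indices.push(blobs.len() - 1);
        }
        (indices, blobs)
    }

    fn main() {
        // Four segments, two distinct payloads: one index per segment,
        // but only two blobs stored.
        let segs = vec![b"a".to_vec(), b"a".to_vec(), b"a".to_vec(), b"b".to_vec()];
        let (indices, blobs) = dedup_consecutive(&segs);
        assert_eq!(indices, vec![0, 0, 0, 1]);
        assert_eq!(blobs.len(), 2);
    }

This mirrors why the tests above expect 4 bloom filters to serve 10 segments, and why a fully duplicated input collapses to a single stored filter with all indices equal to 0.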