Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-26 08:00:01 +00:00
feat!: use indirect indices for bloom filter to reduce size (#5377)
* feat!(bloom-filter): use indirect indices to reduce size
* fix format
* update proto
* nit
* upgrade proto

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
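In short: consecutive segments that produce byte-identical Bloom filters now share a single stored filter, and the metadata keeps one location entry per distinct filter plus a per-segment index into that list. A minimal standalone sketch of the dedup idea (names here are illustrative, not the crate's API):

    fn main() {
        // Four segments; the first three happen to produce identical filter bytes.
        let segment_filters: Vec<Vec<u8>> = vec![
            vec![0xAA; 8],
            vec![0xAA; 8],
            vec![0xAA; 8],
            vec![0xBB; 8],
        ];

        let mut stored: Vec<Vec<u8>> = Vec::new(); // deduplicated filters, written once each
        let mut indices: Vec<usize> = Vec::new(); // per-segment index into `stored`

        for bytes in segment_filters {
            // Reuse the last stored filter when the new one is byte-identical.
            if stored.last() != Some(&bytes) {
                stored.push(bytes);
            }
            indices.push(stored.len() - 1);
        }

        assert_eq!(stored.len(), 2); // two distinct filters serve four segments
        assert_eq!(indices, vec![0, 0, 0, 1]);
    }

For indexes over low-cardinality or sorted columns, long runs of identical segments collapse into one stored filter, which is where the size reduction comes from.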
Cargo.lock (generated): 3 changes
@@ -4449,7 +4449,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4a173785b3376267c4d62b6e0b0a54ca040822aa#4a173785b3376267c4d62b6e0b0a54ca040822aa"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec801a91aa22f9666063d02805f1f60f7c93458a#ec801a91aa22f9666063d02805f1f60f7c93458a"
 dependencies = [
  "prost 0.12.6",
  "serde",
@@ -5299,6 +5299,7 @@ dependencies = [
  "fst",
  "futures",
+ "greptime-proto",
  "itertools 0.10.5",
  "mockall",
  "pin-project",
  "prost 0.12.6",
@@ -124,7 +124,7 @@ etcd-client = "0.13"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a173785b3376267c4d62b6e0b0a54ca040822aa" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec801a91aa22f9666063d02805f1f60f7c93458a" }
 hex = "0.4"
 http = "0.2"
 humantime = "2.1"
@@ -21,6 +21,7 @@ fastbloom = "0.8"
 fst.workspace = true
 futures.workspace = true
+greptime-proto.workspace = true
 itertools.workspace = true
 mockall.workspace = true
 pin-project.workspace = true
 prost.workspace = true
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use serde::{Deserialize, Serialize};
-
 pub mod applier;
 pub mod creator;
 pub mod error;
@@ -24,35 +22,3 @@ pub type BytesRef<'a> = &'a [u8];
 
 /// The seed used for the Bloom filter.
 pub const SEED: u128 = 42;
-
-/// The Meta information of the bloom filter stored in the file.
-#[derive(Debug, Default, Serialize, Deserialize, Clone)]
-pub struct BloomFilterMeta {
-    /// The number of rows per segment.
-    pub rows_per_segment: usize,
-
-    /// The number of segments.
-    pub seg_count: usize,
-
-    /// The number of total rows.
-    pub row_count: usize,
-
-    /// The size of the bloom filter excluding the meta information.
-    pub bloom_filter_segments_size: usize,
-
-    /// Offset and size of bloom filters in the file.
-    pub bloom_filter_segments: Vec<BloomFilterSegmentLocation>,
-}
-
-/// The location of the bloom filter segment in the file.
-#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)]
-pub struct BloomFilterSegmentLocation {
-    /// The offset of the bloom filter segment in the file.
-    pub offset: u64,
-
-    /// The size of the bloom filter segment in the file.
-    pub size: u64,
-
-    /// The number of elements in the bloom filter segment.
-    pub elem_count: usize,
-}
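The removed serde structs are superseded by protobuf messages defined in greptime-proto (hence the dependency bump above). Judging purely from the fields used elsewhere in this diff, the generated replacements look roughly like the sketch below; the authoritative definitions live in the greptime-proto crate:

    // Sketch of the prost-generated types as inferred from usage in this diff.
    #[derive(Debug, Default, Clone)]
    pub struct BloomFilterMeta {
        pub rows_per_segment: u64,
        pub segment_count: u64,
        pub row_count: u64,
        /// Total size of all stored bloom filters, excluding the meta itself.
        pub bloom_filter_size: u64,
        /// Deduplicated filter locations.
        pub bloom_filter_locs: Vec<BloomFilterLoc>,
        /// Per-segment index into `bloom_filter_locs`.
        pub segment_loc_indices: Vec<u64>,
    }

    #[derive(Debug, Default, Clone)]
    pub struct BloomFilterLoc {
        pub offset: u64,
        pub size: u64,
        pub element_count: u64,
    }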
@@ -15,9 +15,12 @@
 use std::collections::HashSet;
 use std::ops::Range;
 
+use greptime_proto::v1::index::BloomFilterMeta;
+use itertools::Itertools;
+
 use crate::bloom_filter::error::Result;
 use crate::bloom_filter::reader::BloomFilterReader;
-use crate::bloom_filter::{BloomFilterMeta, Bytes};
+use crate::bloom_filter::Bytes;
 
 pub struct BloomFilterApplier {
     reader: Box<dyn BloomFilterReader + Send>,
@@ -37,27 +40,42 @@ impl BloomFilterApplier {
         probes: &HashSet<Bytes>,
         search_range: Range<usize>,
     ) -> Result<Vec<Range<usize>>> {
-        let rows_per_segment = self.meta.rows_per_segment;
+        let rows_per_segment = self.meta.rows_per_segment as usize;
         let start_seg = search_range.start / rows_per_segment;
         let end_seg = search_range.end.div_ceil(rows_per_segment);
 
-        let locs = &self.meta.bloom_filter_segments[start_seg..end_seg];
-        let bfs = self.reader.bloom_filter_vec(locs).await?;
+        let locs = &self.meta.segment_loc_indices[start_seg..end_seg];
 
-        let mut ranges: Vec<Range<usize>> = Vec::with_capacity(end_seg - start_seg);
-        for (seg_id, bloom) in (start_seg..end_seg).zip(bfs) {
-            let start = seg_id * rows_per_segment;
+        // dedup locs
+        let deduped_locs = locs
+            .iter()
+            .dedup()
+            .map(|i| self.meta.bloom_filter_locs[*i as usize].clone())
+            .collect::<Vec<_>>();
+        let bfs = self.reader.bloom_filter_vec(&deduped_locs).await?;
+
+        let mut ranges: Vec<Range<usize>> = Vec::with_capacity(bfs.len());
+        for ((_, mut group), bloom) in locs
+            .iter()
+            .zip(start_seg..end_seg)
+            .group_by(|(x, _)| **x)
+            .into_iter()
+            .zip(bfs.iter())
+        {
+            let start = group.next().unwrap().1 * rows_per_segment; // SAFETY: group is not empty
+            let end = group.last().map_or(start + rows_per_segment, |(_, end)| {
+                (end + 1) * rows_per_segment
+            });
+            let actual_start = start.max(search_range.start);
+            let actual_end = end.min(search_range.end);
             for probe in probes {
                 if bloom.contains(probe) {
-                    let end = (start + rows_per_segment).min(search_range.end);
-                    let start = start.max(search_range.start);
-
                     match ranges.last_mut() {
-                        Some(last) if last.end == start => {
-                            last.end = end;
+                        Some(last) if last.end == actual_start => {
+                            last.end = actual_end;
                         }
                         _ => {
-                            ranges.push(start..end);
+                            ranges.push(actual_start..actual_end);
                         }
                     }
                     break;
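The rewritten search first dedups consecutive equal indices so each distinct filter is fetched from the reader only once, then merges each run of segments pointing at the same filter into a single row range before probing. A std-only sketch of the run-merging step (the real code uses itertools' `dedup` and `group_by`):

    fn main() {
        let rows_per_segment = 4usize;
        // Per-segment indices into the deduplicated filter list:
        // segments 0..3 share filter 0, segment 3 uses filter 1.
        let segment_loc_indices = [0usize, 0, 0, 1];

        // Collapse runs of equal indices into (filter_index, row_range) groups,
        // mirroring what `group_by` does in the applier.
        let mut groups: Vec<(usize, std::ops::Range<usize>)> = Vec::new();
        for (seg, &idx) in segment_loc_indices.iter().enumerate() {
            match groups.last_mut() {
                Some((last_idx, range)) if *last_idx == idx => {
                    range.end = (seg + 1) * rows_per_segment;
                }
                _ => groups.push((idx, seg * rows_per_segment..(seg + 1) * rows_per_segment)),
            }
        }

        assert_eq!(groups, vec![(0, 0..12), (1, 12..16)]);
    }

Merging runs up front means a probe hit over a run of identical segments yields one wide range instead of several adjacent ones.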
@@ -93,46 +111,73 @@ mod tests {
         );
 
         let rows = vec![
             // seg 0
             vec![b"row00".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
             vec![b"row01".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
             vec![b"row02".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
             vec![b"row03".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
             // seg 1
             vec![b"row04".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
             vec![b"row05".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
             vec![b"row06".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
             vec![b"row07".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
             // seg 2
             vec![b"row08".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
             vec![b"row09".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
             vec![b"row10".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
             vec![b"row11".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
+            // duplicate rows
+            // seg 3
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            // seg 4
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            // seg 5
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            // seg 6
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
+            vec![b"dup".to_vec()],
         ];
 
         let cases = vec![
-            (vec![b"row00".to_vec()], 0..12, vec![0..4]), // search one row in full range
+            (vec![b"row00".to_vec()], 0..28, vec![0..4]), // search one row in full range
             (vec![b"row05".to_vec()], 4..8, vec![4..8]), // search one row in partial range
             (vec![b"row03".to_vec()], 4..8, vec![]), // search for a row that doesn't exist in the partial range
             (
                 vec![b"row01".to_vec(), b"row06".to_vec()],
-                0..12,
+                0..28,
                 vec![0..8],
             ), // search multiple rows in multiple ranges
             (
                 vec![b"row01".to_vec(), b"row11".to_vec()],
-                0..12,
+                0..28,
                 vec![0..4, 8..12],
             ), // search multiple rows in multiple ranges
-            (vec![b"row99".to_vec()], 0..12, vec![]), // search for a row that doesn't exist in the full range
+            (vec![b"row99".to_vec()], 0..28, vec![]), // search for a row that doesn't exist in the full range
             (vec![b"row00".to_vec()], 12..12, vec![]), // search in an empty range
             (
                 vec![b"row04".to_vec(), b"row05".to_vec()],
                 0..12,
                 vec![4..8],
             ), // search multiple rows in same segment
-            (vec![b"seg01".to_vec()], 0..12, vec![4..8]), // search rows in a segment
-            (vec![b"seg01".to_vec()], 6..12, vec![6..8]), // search rows in a segment in partial range
-            (vec![b"overl".to_vec()], 0..12, vec![0..8]), // search rows in multiple segments
-            (vec![b"overl".to_vec()], 2..12, vec![2..8]), // search range starts from the middle of a segment
+            (vec![b"seg01".to_vec()], 0..28, vec![4..8]), // search rows in a segment
+            (vec![b"seg01".to_vec()], 6..28, vec![6..8]), // search rows in a segment in partial range
+            (vec![b"overl".to_vec()], 0..28, vec![0..8]), // search rows in multiple segments
+            (vec![b"overl".to_vec()], 2..28, vec![2..8]), // search range starts from the middle of a segment
+            (vec![b"overp".to_vec()], 0..10, vec![4..10]), // search range ends at the middle of a segment
+            (vec![b"dup".to_vec()], 0..12, vec![]), // search for a duplicate row not in the range
+            (vec![b"dup".to_vec()], 0..16, vec![12..16]), // search for a duplicate row in the range
+            (vec![b"dup".to_vec()], 0..28, vec![12..28]), // search for a duplicate row in the full range
         ];
 
         for row in rows {
@@ -21,10 +21,12 @@ use std::sync::Arc;
 
 use finalize_segment::FinalizedBloomFilterStorage;
 use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
+use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
+use prost::Message;
 use snafu::ResultExt;
 
-use crate::bloom_filter::error::{IoSnafu, Result, SerdeJsonSnafu};
-use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes, SEED};
+use crate::bloom_filter::error::{IoSnafu, Result};
+use crate::bloom_filter::{Bytes, SEED};
 use crate::external_provider::ExternalTempFileProvider;
 
 /// The false positive rate of the Bloom filter.
@@ -170,12 +172,15 @@ impl BloomFilterCreator {
         }
 
         let mut meta = BloomFilterMeta {
-            rows_per_segment: self.rows_per_segment,
-            row_count: self.accumulated_row_count,
+            rows_per_segment: self.rows_per_segment as _,
+            row_count: self.accumulated_row_count as _,
             ..Default::default()
         };
 
-        let mut segs = self.finalized_bloom_filters.drain().await?;
+        let (indices, mut segs) = self.finalized_bloom_filters.drain().await?;
+        meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect();
+        meta.segment_count = meta.segment_loc_indices.len() as _;
 
         while let Some(segment) = segs.next().await {
             let segment = segment?;
             writer
@@ -183,17 +188,16 @@ impl BloomFilterCreator {
                 .await
                 .context(IoSnafu)?;
 
-            let size = segment.bloom_filter_bytes.len();
-            meta.bloom_filter_segments.push(BloomFilterSegmentLocation {
-                offset: meta.bloom_filter_segments_size as _,
-                size: size as _,
-                elem_count: segment.element_count,
+            let size = segment.bloom_filter_bytes.len() as u64;
+            meta.bloom_filter_locs.push(BloomFilterLoc {
+                offset: meta.bloom_filter_size as _,
+                size,
+                element_count: segment.element_count as _,
             });
-            meta.bloom_filter_segments_size += size;
-            meta.seg_count += 1;
+            meta.bloom_filter_size += size;
         }
 
-        let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?;
+        let meta_bytes = meta.encode_to_vec();
         writer.write_all(&meta_bytes).await.context(IoSnafu)?;
 
         let meta_size = meta_bytes.len() as u32;
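The on-disk framing is unchanged apart from the meta codec: filter blobs first, then the encoded meta (now protobuf rather than JSON), then the meta length as a little-endian u32. A small sketch of that layout, with plain byte slices standing in for the real encoders:

    fn main() {
        // Stand-ins for the deduplicated filter bytes and the encoded meta.
        let filters: Vec<u8> = vec![0xAA, 0xAA, 0xBB, 0xBB];
        let meta_bytes: Vec<u8> = vec![1, 2, 3]; // would be `meta.encode_to_vec()`

        // Layout: [filters][meta][meta length, u32 LE].
        let mut file = Vec::new();
        file.extend_from_slice(&filters);
        file.extend_from_slice(&meta_bytes);
        file.extend_from_slice(&(meta_bytes.len() as u32).to_le_bytes());

        // Reading it back: the last 4 bytes give the meta length.
        let total = file.len();
        let meta_size = u32::from_le_bytes(file[total - 4..].try_into().unwrap()) as usize;
        let decoded_meta = &file[total - 4 - meta_size..total - 4];
        assert_eq!(decoded_meta, meta_bytes.as_slice());
    }

Keeping the length trailer means readers can locate the meta with a single suffix read, exactly as the tests below do.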
@@ -287,34 +291,38 @@ mod tests {
         let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
 
         let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
-        let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap();
+        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
 
         assert_eq!(meta.rows_per_segment, 2);
-        assert_eq!(meta.seg_count, 2);
+        assert_eq!(meta.segment_count, 2);
         assert_eq!(meta.row_count, 3);
         assert_eq!(
-            meta.bloom_filter_segments_size + meta_bytes.len() + 4,
+            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
             total_size
         );
 
         let mut bfs = Vec::new();
-        for segment in meta.bloom_filter_segments {
+        for segment in meta.bloom_filter_locs {
             let bloom_filter_bytes =
                 &bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
             let v = u64_vec_from_bytes(bloom_filter_bytes);
             let bloom_filter = BloomFilter::from_vec(v)
                 .seed(&SEED)
-                .expected_items(segment.elem_count);
+                .expected_items(segment.element_count as usize);
             bfs.push(bloom_filter);
         }
 
         assert_eq!(bfs.len(), 2);
-        assert!(bfs[0].contains(&b"a"));
-        assert!(bfs[0].contains(&b"b"));
-        assert!(bfs[0].contains(&b"c"));
-        assert!(bfs[0].contains(&b"d"));
-        assert!(bfs[1].contains(&b"e"));
-        assert!(bfs[1].contains(&b"f"));
+        assert_eq!(meta.segment_loc_indices.len(), 2);
+
+        let bf0 = &bfs[meta.segment_loc_indices[0] as usize];
+        assert!(bf0.contains(&b"a"));
+        assert!(bf0.contains(&b"b"));
+        assert!(bf0.contains(&b"c"));
+        assert!(bf0.contains(&b"d"));
+
+        let bf1 = &bfs[meta.segment_loc_indices[1] as usize];
+        assert!(bf1.contains(&b"e"));
+        assert!(bf1.contains(&b"f"));
     }
 
     #[tokio::test]
@@ -356,37 +364,43 @@ mod tests {
         let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
 
         let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
-        let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap();
+        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
 
         assert_eq!(meta.rows_per_segment, 2);
-        assert_eq!(meta.seg_count, 10);
+        assert_eq!(meta.segment_count, 10);
         assert_eq!(meta.row_count, 20);
         assert_eq!(
-            meta.bloom_filter_segments_size + meta_bytes.len() + 4,
+            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
             total_size
         );
 
         let mut bfs = Vec::new();
-        for segment in meta.bloom_filter_segments {
+        for segment in meta.bloom_filter_locs {
             let bloom_filter_bytes =
                 &bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
             let v = u64_vec_from_bytes(bloom_filter_bytes);
             let bloom_filter = BloomFilter::from_vec(v)
                 .seed(&SEED)
-                .expected_items(segment.elem_count);
+                .expected_items(segment.element_count as _);
             bfs.push(bloom_filter);
         }
 
-        assert_eq!(bfs.len(), 10);
-        for bf in bfs.iter().take(3) {
+        // 4 bloom filters to serve 10 segments
+        assert_eq!(bfs.len(), 4);
+        assert_eq!(meta.segment_loc_indices.len(), 10);
+
+        for idx in meta.segment_loc_indices.iter().take(3) {
+            let bf = &bfs[*idx as usize];
             assert!(bf.contains(&b"a"));
             assert!(bf.contains(&b"b"));
         }
-        for bf in bfs.iter().take(5).skip(2) {
+        for idx in meta.segment_loc_indices.iter().take(5).skip(2) {
+            let bf = &bfs[*idx as usize];
            assert!(bf.contains(&b"c"));
            assert!(bf.contains(&b"d"));
         }
-        for bf in bfs.iter().take(10).skip(5) {
+        for idx in meta.segment_loc_indices.iter().take(10).skip(5) {
+            let bf = &bfs[*idx as usize];
             assert!(bf.contains(&b"e"));
             assert!(bf.contains(&b"f"));
         }
@@ -33,6 +33,9 @@ const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; // 1MB
 
 /// Storage for finalized Bloom filters.
 pub struct FinalizedBloomFilterStorage {
+    /// Indices of the segments in the sequence of finalized Bloom filters.
+    segment_indices: Vec<usize>,
+
     /// Bloom filters that are stored in memory.
     in_memory: Vec<FinalizedBloomFilterSegment>,
@@ -54,6 +57,9 @@ pub struct FinalizedBloomFilterStorage {
 
     /// The threshold of the global memory usage of the creating Bloom filters.
     global_memory_usage_threshold: Option<usize>,
+
+    /// Records the number of flushed segments.
+    flushed_seg_count: usize,
 }
 
 impl FinalizedBloomFilterStorage {
@@ -65,6 +71,7 @@ impl FinalizedBloomFilterStorage {
     ) -> Self {
         let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
         Self {
+            segment_indices: Vec::new(),
             in_memory: Vec::new(),
             intermediate_file_id_counter: 0,
             intermediate_prefix: external_prefix,
@@ -72,6 +79,7 @@ impl FinalizedBloomFilterStorage {
             memory_usage: 0,
             global_memory_usage,
             global_memory_usage_threshold,
+            flushed_seg_count: 0,
         }
     }
@@ -97,6 +105,13 @@ impl FinalizedBloomFilterStorage {
 
         let fbf = FinalizedBloomFilterSegment::from(bf, element_count);
 
+        // Reuse the last segment if it is the same as the current one.
+        if self.in_memory.last() == Some(&fbf) {
+            self.segment_indices
+                .push(self.flushed_seg_count + self.in_memory.len() - 1);
+            return Ok(());
+        }
+
         // Update memory usage.
         let memory_diff = fbf.bloom_filter_bytes.len();
         self.memory_usage += memory_diff;
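The pushed index is global across both flushed and in-memory segments, which is why the new `flushed_seg_count` field enters the arithmetic. A tiny sketch of the bookkeeping, using hypothetical counts:

    fn main() {
        // Suppose 7 segments were already flushed to intermediate files and
        // 3 more are currently buffered in memory.
        let flushed_seg_count = 7usize;
        let in_memory_len = 3usize;

        // A segment identical to the last buffered one reuses its global
        // index instead of storing a duplicate filter.
        let reused_index = flushed_seg_count + in_memory_len - 1;
        assert_eq!(reused_index, 9);

        // A brand-new segment is pushed first, then indexed the same way.
        let new_in_memory_len = in_memory_len + 1;
        let new_index = flushed_seg_count + new_in_memory_len - 1;
        assert_eq!(new_index, 10);
    }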
@@ -105,6 +120,8 @@ impl FinalizedBloomFilterStorage {
 
         // Add the finalized Bloom filter to the in-memory storage.
         self.in_memory.push(fbf);
+        self.segment_indices
+            .push(self.flushed_seg_count + self.in_memory.len() - 1);
 
         // Flush to disk if necessary.
@@ -129,13 +146,19 @@ impl FinalizedBloomFilterStorage {
         Ok(())
     }
 
-    /// Drains the storage and returns a stream of finalized Bloom filter segments.
+    /// Drains the storage and returns indices of the segments and a stream of finalized Bloom filters.
     pub async fn drain(
         &mut self,
-    ) -> Result<Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>> {
+    ) -> Result<(
+        Vec<usize>,
+        Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>,
+    )> {
         // FAST PATH: memory only
         if self.intermediate_file_id_counter == 0 {
-            return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))));
+            return Ok((
+                std::mem::take(&mut self.segment_indices),
+                Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))),
+            ));
         }
 
         // SLOW PATH: memory + disk
@@ -151,8 +174,9 @@ impl FinalizedBloomFilterStorage {
             .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
 
         let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);
-        Ok(Box::pin(
-            stream::iter(streams).flatten().chain(in_memory_stream),
+        Ok((
+            std::mem::take(&mut self.segment_indices),
+            Box::pin(stream::iter(streams).flatten().chain(in_memory_stream)),
         ))
     }
@@ -160,6 +184,7 @@ impl FinalizedBloomFilterStorage {
     async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
         let file_id = self.intermediate_file_id_counter;
         self.intermediate_file_id_counter += 1;
+        self.flushed_seg_count += self.in_memory.len();
 
         let file_id = format!("{:08}", file_id);
         let mut writer = self
@@ -266,21 +291,25 @@ mod tests {
 
         let elem_count = 2000;
         let batch = 1000;
+        let dup_batch = 200;
 
-        for i in 0..batch {
+        for i in 0..(batch - dup_batch) {
             let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes());
             storage.add(elems, elem_count).await.unwrap();
         }
+        for _ in 0..dup_batch {
+            storage.add(Some(vec![]), 1).await.unwrap();
+        }
 
         // Flush happens.
         assert!(storage.intermediate_file_id_counter > 0);
 
         // Drain the storage.
-        let mut stream = storage.drain().await.unwrap();
+        let (indices, mut stream) = storage.drain().await.unwrap();
+        assert_eq!(indices.len(), batch);
 
-        let mut i = 0;
-        while let Some(segment) = stream.next().await {
-            let segment = segment.unwrap();
+        for (i, idx) in indices.iter().enumerate().take(batch - dup_batch) {
+            let segment = stream.next().await.unwrap().unwrap();
             assert_eq!(segment.element_count, elem_count);
 
             let v = u64_vec_from_bytes(&segment.bloom_filter_bytes);
@@ -292,9 +321,44 @@ mod tests {
             for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()) {
                 assert!(bf.contains(&elem));
             }
-            i += 1;
+            assert_eq!(i, *idx);
         }
 
-        assert_eq!(i, batch);
+        // Check the correctness of the duplicated segments.
+        let dup_seg = stream.next().await.unwrap().unwrap();
+        assert_eq!(dup_seg.element_count, 1);
+        assert!(stream.next().await.is_none());
+        assert!(indices[(batch - dup_batch)..batch]
+            .iter()
+            .all(|&x| x == batch - dup_batch));
     }
 
+    #[tokio::test]
+    async fn test_finalized_bloom_filter_storage_all_dup() {
+        let mock_provider = MockExternalTempFileProvider::new();
+        let global_memory_usage = Arc::new(AtomicUsize::new(0));
+        let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
+        let provider = Arc::new(mock_provider);
+        let mut storage = FinalizedBloomFilterStorage::new(
+            provider,
+            global_memory_usage.clone(),
+            global_memory_usage_threshold,
+        );
+
+        let batch = 1000;
+        for _ in 0..batch {
+            storage.add(Some(vec![]), 1).await.unwrap();
+        }
+
+        // Drain the storage.
+        let (indices, mut stream) = storage.drain().await.unwrap();
+
+        let bf = stream.next().await.unwrap().unwrap();
+        assert_eq!(bf.element_count, 1);
+
+        assert!(stream.next().await.is_none());
+
+        assert_eq!(indices.len(), batch);
+        assert!(indices.iter().all(|&x| x == 0));
+    }
 }
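Callers of `drain` now receive the per-segment indices alongside the stream of deduplicated segments. A hedged consumption sketch with stand-in types (assumes `tokio` and `futures` are available; not the crate's real signatures):

    use futures::{stream, StreamExt};

    #[tokio::main]
    async fn main() {
        // Stand-in for `storage.drain()`: per-segment indices plus a stream
        // of deduplicated segments (here just their payload sizes).
        let indices: Vec<usize> = vec![0, 0, 1];
        let mut segments = stream::iter(vec![Ok::<u64, std::io::Error>(128), Ok(256)]);

        let mut sizes = Vec::new();
        while let Some(segment) = segments.next().await {
            sizes.push(segment.unwrap());
        }

        // Three segments are served by two stored filters.
        assert_eq!(indices.len(), 3);
        assert_eq!(sizes, vec![128, 256]);
    }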
@@ -31,18 +31,10 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to serde json"))]
-    SerdeJson {
+    #[snafu(display("Failed to decode protobuf"))]
+    DecodeProto {
         #[snafu(source)]
-        error: serde_json::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Failed to deserialize json"))]
-    DeserializeJson {
-        #[snafu(source)]
-        error: serde_json::Error,
+        error: prost::DecodeError,
         #[snafu(implicit)]
         location: Location,
     },
@@ -90,10 +82,9 @@ impl ErrorExt for Error {
 
         match self {
             Io { .. }
-            | SerdeJson { .. }
             | FileSizeTooSmall { .. }
             | UnexpectedMetaSize { .. }
-            | DeserializeJson { .. }
+            | DecodeProto { .. }
             | InvalidIntermediateMagic { .. } => StatusCode::Unexpected,
 
             Intermediate { source, .. } => source.status_code(),
@@ -18,12 +18,14 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use common_base::range_read::RangeReader;
 use fastbloom::BloomFilter;
+use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
+use prost::Message;
 use snafu::{ensure, ResultExt};
 
 use crate::bloom_filter::error::{
-    DeserializeJsonSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
+    DecodeProtoSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
 };
-use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, SEED};
+use crate::bloom_filter::SEED;
 
 /// Minimum size of the bloom filter, which is the size of the length of the bloom filter.
 const BLOOM_META_LEN_SIZE: u64 = 4;
@@ -52,7 +54,7 @@ pub trait BloomFilterReader: Sync {
     async fn metadata(&self) -> Result<BloomFilterMeta>;
 
     /// Reads a bloom filter with the given location.
-    async fn bloom_filter(&self, loc: &BloomFilterSegmentLocation) -> Result<BloomFilter> {
+    async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
         let bytes = self.range_read(loc.offset, loc.size as _).await?;
         let vec = bytes
             .chunks_exact(std::mem::size_of::<u64>())
@@ -60,14 +62,11 @@ pub trait BloomFilterReader: Sync {
             .collect();
         let bm = BloomFilter::from_vec(vec)
             .seed(&SEED)
-            .expected_items(loc.elem_count);
+            .expected_items(loc.element_count as _);
         Ok(bm)
     }
 
-    async fn bloom_filter_vec(
-        &self,
-        locs: &[BloomFilterSegmentLocation],
-    ) -> Result<Vec<BloomFilter>> {
+    async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
         let ranges = locs
             .iter()
             .map(|l| l.offset..l.offset + l.size)
@@ -82,7 +81,7 @@ pub trait BloomFilterReader: Sync {
                 .collect();
             let bm = BloomFilter::from_vec(vec)
                 .seed(&SEED)
-                .expected_items(loc.elem_count);
+                .expected_items(loc.element_count as _);
             result.push(bm);
         }
@@ -173,11 +172,11 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
                 .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
                 .await
                 .context(IoSnafu)?;
-            serde_json::from_slice(&meta).context(DeserializeJsonSnafu)
+            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
         } else {
             let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
             let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
-            serde_json::from_slice(meta).context(DeserializeJsonSnafu)
+            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
         }
     }
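Meta decoding now goes through prost's `Message::decode`, which accepts any `Buf`, so the reader can hand it the tail slice of the file directly. A minimal round-trip sketch with a stand-in message (the real `BloomFilterMeta` is generated in greptime-proto):

    use prost::Message;

    // A stand-in message playing the same framing role as BloomFilterMeta.
    #[derive(Clone, PartialEq, Message)]
    struct MiniMeta {
        #[prost(uint64, tag = "1")]
        row_count: u64,
    }

    fn main() {
        let meta = MiniMeta { row_count: 20 };
        let bytes = meta.encode_to_vec();

        // `decode` reads from any `Buf`, e.g. a byte slice.
        let back = MiniMeta::decode(bytes.as_slice()).unwrap();
        assert_eq!(back.row_count, 20);
    }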
@@ -257,17 +256,17 @@ mod tests {
         let meta = reader.metadata().await.unwrap();
 
         assert_eq!(meta.rows_per_segment, 2);
-        assert_eq!(meta.seg_count, 2);
+        assert_eq!(meta.segment_count, 2);
         assert_eq!(meta.row_count, 3);
-        assert_eq!(meta.bloom_filter_segments.len(), 2);
+        assert_eq!(meta.bloom_filter_locs.len(), 2);
 
-        assert_eq!(meta.bloom_filter_segments[0].offset, 0);
-        assert_eq!(meta.bloom_filter_segments[0].elem_count, 4);
+        assert_eq!(meta.bloom_filter_locs[0].offset, 0);
+        assert_eq!(meta.bloom_filter_locs[0].element_count, 4);
         assert_eq!(
-            meta.bloom_filter_segments[1].offset,
-            meta.bloom_filter_segments[0].size
+            meta.bloom_filter_locs[1].offset,
+            meta.bloom_filter_locs[0].size
         );
-        assert_eq!(meta.bloom_filter_segments[1].elem_count, 2);
+        assert_eq!(meta.bloom_filter_locs[1].element_count, 2);
     }
 }
@@ -278,9 +277,9 @@ mod tests {
         let reader = BloomFilterReaderImpl::new(bytes);
         let meta = reader.metadata().await.unwrap();
 
-        assert_eq!(meta.bloom_filter_segments.len(), 2);
+        assert_eq!(meta.bloom_filter_locs.len(), 2);
         let bf = reader
-            .bloom_filter(&meta.bloom_filter_segments[0])
+            .bloom_filter(&meta.bloom_filter_locs[0])
             .await
             .unwrap();
         assert!(bf.contains(&b"a"));
@@ -289,7 +288,7 @@ mod tests {
         assert!(bf.contains(&b"d"));
 
         let bf = reader
-            .bloom_filter(&meta.bloom_filter_segments[1])
+            .bloom_filter(&meta.bloom_filter_locs[1])
             .await
             .unwrap();
         assert!(bf.contains(&b"e"));
@@ -15,12 +15,12 @@
 use std::ops::Range;
 use std::sync::Arc;
 
+use api::v1::index::BloomFilterMeta;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::try_join_all;
 use index::bloom_filter::error::Result;
 use index::bloom_filter::reader::BloomFilterReader;
-use index::bloom_filter::BloomFilterMeta;
 use store_api::storage::ColumnId;
 
 use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
@@ -484,19 +484,15 @@ pub(crate) mod tests {
         let bloom_filter = BloomFilterReaderImpl::new(reader);
         let metadata = bloom_filter.metadata().await.unwrap();
 
-        assert_eq!(metadata.bloom_filter_segments.len(), 10);
+        assert_eq!(metadata.segment_count, 10);
         for i in 0..5 {
-            let bf = bloom_filter
-                .bloom_filter(&metadata.bloom_filter_segments[i])
-                .await
-                .unwrap();
+            let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
+            let bf = bloom_filter.bloom_filter(loc).await.unwrap();
             assert!(bf.contains(b"tag1"));
         }
         for i in 5..10 {
-            let bf = bloom_filter
-                .bloom_filter(&metadata.bloom_filter_segments[i])
-                .await
-                .unwrap();
+            let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
+            let bf = bloom_filter.bloom_filter(loc).await.unwrap();
             assert!(bf.contains(b"tag2"));
         }
     }
@@ -513,12 +509,11 @@ pub(crate) mod tests {
         let bloom_filter = BloomFilterReaderImpl::new(reader);
         let metadata = bloom_filter.metadata().await.unwrap();
 
-        assert_eq!(metadata.bloom_filter_segments.len(), 5);
+        assert_eq!(metadata.segment_count, 5);
         for i in 0u64..20 {
-            let bf = bloom_filter
-                .bloom_filter(&metadata.bloom_filter_segments[i as usize / 4])
-                .await
-                .unwrap();
+            let idx = i as usize / 4;
+            let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
+            let bf = bloom_filter.bloom_filter(loc).await.unwrap();
             let mut buf = vec![];
             IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
                 .unwrap();