feat(bloom-filter): add memory control for creator (#5185)

* feat(bloom-filter): add memory control for creator

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: remove meaningless buf

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add codec for intermediate

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-12-20 14:59:44 +08:00
committed by Yingwen
parent 556bd796d8
commit ee72ae8bd0
14 changed files with 769 additions and 148 deletions

1
Cargo.lock generated
View File

@@ -5270,6 +5270,7 @@ dependencies = [
name = "index"
version = "0.11.1"
dependencies = [
"async-stream",
"async-trait",
"asynchronous-codec",
"bytemuck",

View File

@@ -8,6 +8,7 @@ license.workspace = true
workspace = true
[dependencies]
async-stream.workspace = true
async-trait.workspace = true
asynchronous-codec = "0.7.0"
bytemuck.workspace = true

View File

@@ -12,21 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
mod finalize_segment;
mod intermediate_codec;
use fastbloom::BloomFilter;
use futures::{AsyncWrite, AsyncWriteExt};
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use finalize_segment::FinalizedBloomFilterStorage;
use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
use snafu::ResultExt;
use super::error::{IoSnafu, SerdeJsonSnafu};
use crate::bloom_filter::error::Result;
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};
use crate::external_provider::ExternalTempFileProvider;
/// The seed used for the Bloom filter.
const SEED: u128 = 42;
pub const SEED: u128 = 42;
/// The false positive rate of the Bloom filter.
const FALSE_POSITIVE_RATE: f64 = 0.01;
pub const FALSE_POSITIVE_RATE: f64 = 0.01;
/// `BloomFilterCreator` is responsible for creating and managing bloom filters
/// for a set of elements. It divides the rows into segments and creates
@@ -58,6 +64,9 @@ pub struct BloomFilterCreator {
/// Storage for finalized Bloom filters.
finalized_bloom_filters: FinalizedBloomFilterStorage,
/// Global memory usage of the bloom filter creator.
global_memory_usage: Arc<AtomicUsize>,
}
impl BloomFilterCreator {
@@ -66,7 +75,12 @@ impl BloomFilterCreator {
/// # PANICS
///
/// `rows_per_segment` <= 0
pub fn new(rows_per_segment: usize) -> Self {
pub fn new(
rows_per_segment: usize,
intermediate_provider: Box<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
assert!(
rows_per_segment > 0,
"rows_per_segment must be greater than 0"
@@ -77,54 +91,67 @@ impl BloomFilterCreator {
accumulated_row_count: 0,
cur_seg_distinct_elems: HashSet::default(),
cur_seg_distinct_elems_mem_usage: 0,
finalized_bloom_filters: FinalizedBloomFilterStorage::default(),
global_memory_usage: global_memory_usage.clone(),
finalized_bloom_filters: FinalizedBloomFilterStorage::new(
intermediate_provider,
global_memory_usage,
global_memory_usage_threshold,
),
}
}
/// Adds a row of elements to the bloom filter. If the number of accumulated rows
/// reaches `rows_per_segment`, it finalizes the current segment.
pub fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) {
pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
self.accumulated_row_count += 1;
let mut mem_diff = 0;
for elem in elems.into_iter() {
let len = elem.len();
let is_new = self.cur_seg_distinct_elems.insert(elem);
if is_new {
self.cur_seg_distinct_elems_mem_usage += len;
mem_diff += len;
}
}
self.cur_seg_distinct_elems_mem_usage += mem_diff;
self.global_memory_usage
.fetch_add(mem_diff, Ordering::Relaxed);
if self.accumulated_row_count % self.rows_per_segment == 0 {
self.finalize_segment();
self.finalize_segment().await?;
}
Ok(())
}
/// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
if !self.cur_seg_distinct_elems.is_empty() {
self.finalize_segment();
self.finalize_segment().await?;
}
let mut meta = BloomFilterMeta {
rows_per_segment: self.rows_per_segment,
seg_count: self.finalized_bloom_filters.len(),
row_count: self.accumulated_row_count,
..Default::default()
};
let mut buf = Vec::new();
for segment in self.finalized_bloom_filters.drain() {
let slice = segment.bloom_filter.as_slice();
buf.clear();
write_u64_slice(&mut buf, slice);
writer.write_all(&buf).await.context(IoSnafu)?;
let mut segs = self.finalized_bloom_filters.drain().await?;
while let Some(segment) = segs.next().await {
let segment = segment?;
writer
.write_all(&segment.bloom_filter_bytes)
.await
.context(IoSnafu)?;
let size = buf.len();
let size = segment.bloom_filter_bytes.len();
meta.bloom_filter_segments.push(BloomFilterSegmentLocation {
offset: meta.bloom_filter_segments_size as _,
size: size as _,
elem_count: segment.element_count,
});
meta.bloom_filter_segments_size += size;
meta.seg_count += 1;
}
let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?;
@@ -145,91 +172,29 @@ impl BloomFilterCreator {
self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
}
fn finalize_segment(&mut self) {
async fn finalize_segment(&mut self) -> Result<()> {
let elem_count = self.cur_seg_distinct_elems.len();
self.finalized_bloom_filters
.add(self.cur_seg_distinct_elems.drain(), elem_count);
.add(self.cur_seg_distinct_elems.drain(), elem_count)
.await?;
self.global_memory_usage
.fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
self.cur_seg_distinct_elems_mem_usage = 0;
}
}
/// Storage for finalized Bloom filters.
///
/// TODO(zhongzc): Add support for storing intermediate bloom filters on disk to control memory usage.
#[derive(Debug, Default)]
struct FinalizedBloomFilterStorage {
/// Bloom filters that are stored in memory.
in_memory: Vec<FinalizedBloomFilterSegment>,
}
impl FinalizedBloomFilterStorage {
fn memory_usage(&self) -> usize {
self.in_memory.iter().map(|s| s.size).sum()
}
/// Adds a new finalized Bloom filter to the storage.
///
/// TODO(zhongzc): Add support for flushing to disk.
fn add(&mut self, elems: impl IntoIterator<Item = Bytes>, elem_count: usize) {
let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
.seed(&SEED)
.expected_items(elem_count);
for elem in elems.into_iter() {
bf.insert(&elem);
}
let cbf = FinalizedBloomFilterSegment::new(bf, elem_count);
self.in_memory.push(cbf);
}
fn len(&self) -> usize {
self.in_memory.len()
}
fn drain(&mut self) -> impl Iterator<Item = FinalizedBloomFilterSegment> + '_ {
self.in_memory.drain(..)
}
}
/// A finalized Bloom filter segment.
#[derive(Debug)]
struct FinalizedBloomFilterSegment {
/// The underlying Bloom filter.
bloom_filter: BloomFilter,
/// The number of elements in the Bloom filter.
element_count: usize,
/// The occupied memory size of the Bloom filter.
size: usize,
}
impl FinalizedBloomFilterSegment {
fn new(bloom_filter: BloomFilter, elem_count: usize) -> Self {
let memory_usage = std::mem::size_of_val(bloom_filter.as_slice());
Self {
bloom_filter,
element_count: elem_count,
size: memory_usage,
}
}
}
/// Writes a slice of `u64` to the buffer in little-endian order.
fn write_u64_slice(buf: &mut Vec<u8>, slice: &[u64]) {
buf.reserve(std::mem::size_of_val(slice));
for &x in slice {
buf.extend_from_slice(&x.to_le_bytes());
Ok(())
}
}
#[cfg(test)]
mod tests {
use fastbloom::BloomFilter;
use futures::io::Cursor;
use super::*;
use crate::external_provider::MockExternalTempFileProvider;
fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
/// Converts a slice of bytes to a vector of `u64`.
pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
bytes
.chunks_exact(std::mem::size_of::<u64>())
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
@@ -239,18 +204,32 @@ mod tests {
#[tokio::test]
async fn test_bloom_filter_creator() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(2);
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
creator.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()]);
creator
.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
.await
.unwrap();
assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
assert!(creator.memory_usage() > 0);
creator.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()]);
creator
.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
.await
.unwrap();
// Finalize the first segment
assert!(creator.cur_seg_distinct_elems_mem_usage == 0);
assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
assert!(creator.memory_usage() > 0);
creator.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()]);
creator
.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
.await
.unwrap();
assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
assert!(creator.memory_usage() > 0);

View File

@@ -0,0 +1,293 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use asynchronous_codec::{FramedRead, FramedWrite};
use fastbloom::BloomFilter;
use futures::stream::StreamExt;
use futures::{stream, AsyncWriteExt, Stream};
use snafu::ResultExt;
use super::intermediate_codec::IntermediateBloomFilterCodecV1;
use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED};
use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
use crate::bloom_filter::Bytes;
use crate::external_provider::ExternalTempFileProvider;
/// The minimum memory usage threshold for flushing in-memory Bloom filters to disk.
///
/// `FinalizedBloomFilterStorage::add` returns before even looking at the global
/// threshold while the storage's own in-memory usage (in bytes) is below this value.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; // 1MB
/// Storage for finalized Bloom filters.
///
/// Finalized segments are kept in memory; `add` may flush all of them into one
/// intermediate file obtained from the provider. `drain` yields the segments read
/// back from the intermediate files (files ordered by file ID) followed by the
/// segments that are still in memory.
pub struct FinalizedBloomFilterStorage {
/// Bloom filters that are stored in memory.
in_memory: Vec<FinalizedBloomFilterSegment>,
/// Used to generate unique file IDs for intermediate Bloom filters.
///
/// Incremented on every flush attempt; `drain` treats zero as "nothing on disk".
intermediate_file_id_counter: usize,
/// Prefix for intermediate Bloom filter files.
///
/// Built in `new` as `intm-bloom-filters-{uuid}` and passed to the provider as
/// the file group.
intermediate_prefix: String,
/// The provider for intermediate Bloom filter files.
intermediate_provider: Box<dyn ExternalTempFileProvider>,
/// The memory usage of the in-memory Bloom filters.
///
/// Counted in bytes as the sum of `bloom_filter_bytes.len()` of the segments added
/// since the last flush.
memory_usage: usize,
/// The global memory usage provided by the user to track the
/// total memory usage of the creating Bloom filters.
global_memory_usage: Arc<AtomicUsize>,
/// The threshold of the global memory usage of the creating Bloom filters.
///
/// `None` disables flushing to disk.
global_memory_usage_threshold: Option<usize>,
}
impl FinalizedBloomFilterStorage {
    /// Creates a new `FinalizedBloomFilterStorage`.
    ///
    /// - `intermediate_provider`: creates and reads back the intermediate files used
    ///   when in-memory segments are flushed.
    /// - `global_memory_usage`: shared counter; this storage adds the size of every
    ///   segment it holds in memory and subtracts it again when the segments are flushed.
    /// - `global_memory_usage_threshold`: when `Some`, segments are flushed once the
    ///   global counter exceeds it; when `None`, nothing is ever flushed.
    ///
    /// Every storage uses its own file group, `intm-bloom-filters-{uuid}`.
    pub fn new(
        intermediate_provider: Box<dyn ExternalTempFileProvider>,
        global_memory_usage: Arc<AtomicUsize>,
        global_memory_usage_threshold: Option<usize>,
    ) -> Self {
        Self {
            in_memory: Vec::new(),
            intermediate_file_id_counter: 0,
            intermediate_prefix: format!("intm-bloom-filters-{}", uuid::Uuid::new_v4()),
            intermediate_provider,
            memory_usage: 0,
            global_memory_usage,
            global_memory_usage_threshold,
        }
    }

    /// Returns the memory usage of the storage, i.e. the bytes of the segments that
    /// are currently accounted as in-memory.
    pub fn memory_usage(&self) -> usize {
        self.memory_usage
    }

    /// Adds a new finalized Bloom filter to the storage.
    ///
    /// Builds a Bloom filter sized for `element_count` items from `elems`, serializes
    /// it and keeps it in memory. The in-memory segments are then flushed to disk when
    /// all of the following hold:
    ///
    /// - the storage itself holds at least `MIN_MEMORY_USAGE_THRESHOLD` bytes,
    /// - a global threshold is configured, and
    /// - the global memory usage exceeds that threshold.
    ///
    /// # Errors
    ///
    /// Returns an error if creating or writing the intermediate file fails.
    pub async fn add(
        &mut self,
        elems: impl IntoIterator<Item = Bytes>,
        element_count: usize,
    ) -> Result<()> {
        let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
            .seed(&SEED)
            .expected_items(element_count);
        for elem in elems {
            bf.insert(&elem);
        }
        let fbf = FinalizedBloomFilterSegment::from(bf, element_count);

        // Account for the new segment before deciding whether to flush.
        let memory_diff = fbf.bloom_filter_bytes.len();
        self.memory_usage += memory_diff;
        self.global_memory_usage
            .fetch_add(memory_diff, Ordering::Relaxed);
        self.in_memory.push(fbf);

        // Do not flush if this storage holds too little to be worth a file.
        if self.memory_usage < MIN_MEMORY_USAGE_THRESHOLD {
            return Ok(());
        }

        if let Some(threshold) = self.global_memory_usage_threshold {
            if self.global_memory_usage.load(Ordering::Relaxed) > threshold {
                // The flush releases the memory accounting itself, on success and on
                // failure alike.
                self.flush_in_memory_to_disk().await?;
            }
        }

        Ok(())
    }

    /// Drains the storage and returns a stream of finalized Bloom filter segments.
    ///
    /// When nothing has been flushed, the stream only yields the in-memory segments.
    /// Otherwise it yields the segments decoded from every intermediate file, with the
    /// files ordered by file ID, followed by the in-memory segments.
    ///
    /// NOTE(review): neither `memory_usage` nor the global counter is adjusted for the
    /// in-memory segments handed out here — confirm that this is accounted for elsewhere.
    ///
    /// # Errors
    ///
    /// Returns an error if the provider fails to open the intermediate files. Decoding
    /// errors surface as items of the returned stream.
    pub async fn drain(
        &mut self,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + '_>>> {
        // FAST PATH: memory only
        if self.intermediate_file_id_counter == 0 {
            return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))));
        }

        // SLOW PATH: memory + disk
        let mut on_disk = self
            .intermediate_provider
            .read_all(&self.intermediate_prefix)
            .await
            .context(IntermediateSnafu)?;
        // File IDs are zero-padded counters, so ordering by name restores flush order.
        on_disk.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));

        let file_streams = on_disk
            .into_iter()
            .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
        let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);

        Ok(Box::pin(
            stream::iter(file_streams)
                .flatten()
                .chain(in_memory_stream),
        ))
    }

    /// Flushes the in-memory Bloom filters to disk.
    ///
    /// All in-memory segments are written, in order, into one new intermediate file.
    /// Once the file has been created, the segments leave `in_memory` whether or not
    /// the write succeeds, so the local and global memory counters are released in
    /// both cases. If creating the file fails, nothing is drained and the counters are
    /// left untouched.
    ///
    /// # Errors
    ///
    /// Returns the error of creating the file or of writing the segments. A failure
    /// while cleaning up the writer never replaces the original write error.
    async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
        let file_id = format!("{:08}", self.intermediate_file_id_counter);
        self.intermediate_file_id_counter += 1;

        let mut writer = self
            .intermediate_provider
            .create(&self.intermediate_prefix, &file_id)
            .await
            .context(IntermediateSnafu)?;

        let fw = FramedWrite::new(&mut writer, IntermediateBloomFilterCodecV1::default());
        // `forward()` will flush and close the writer when the stream ends
        let forwarded = stream::iter(self.in_memory.drain(..).map(Ok))
            .forward(fw)
            .await;

        // The drain above removed every segment from `in_memory`, even if forwarding
        // stopped early, so the memory is no longer held by this storage.
        self.global_memory_usage
            .fetch_sub(self.memory_usage, Ordering::Relaxed);
        self.memory_usage = 0;

        if let Err(e) = forwarded {
            // Best-effort cleanup: flush first, then close, and keep the root cause.
            let _ = writer.flush().await;
            let _ = writer.close().await;
            return Err(e);
        }

        Ok(())
    }
}
/// A finalized Bloom filter segment.
///
/// This is the unit that is kept in memory, written to intermediate files and
/// yielded when the storage is drained.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FinalizedBloomFilterSegment {
/// The underlying Bloom filter bytes.
///
/// When built from a `BloomFilter`, these are the filter's `u64` words, each
/// serialized in little-endian order.
pub bloom_filter_bytes: Vec<u8>,
/// The number of elements in the Bloom filter.
pub element_count: usize,
}
impl FinalizedBloomFilterSegment {
    /// Builds a segment from `bf` by serializing each of its `u64` words in
    /// little-endian order; `elem_count` is stored as the segment's element count.
    fn from(bf: BloomFilter, elem_count: usize) -> Self {
        let words = bf.as_slice();

        // One allocation sized for the whole filter, then append word by word.
        let mut bloom_filter_bytes = Vec::with_capacity(std::mem::size_of_val(words));
        bloom_filter_bytes.extend(words.iter().flat_map(|word| word.to_le_bytes()));

        Self {
            bloom_filter_bytes,
            element_count: elem_count,
        }
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use futures::AsyncRead;
    use tokio::io::duplex;
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

    use super::*;
    use crate::bloom_filter::creator::tests::u64_vec_from_bytes;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Adds enough segments to force flushes to the (mocked) intermediate files, then
    /// drains the storage and checks that every segment comes back in insertion order
    /// and still contains its elements.
    #[tokio::test]
    async fn test_finalized_bloom_filter_storage() {
        // In-memory stand-in for the intermediate files: file id -> read half of a pipe.
        let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        let mut mock_provider = MockExternalTempFileProvider::new();

        let created_files = Arc::clone(&mock_files);
        mock_provider
            .expect_create()
            .returning(move |file_group, file_id| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = created_files.lock().unwrap();
                let (writer, reader) = duplex(2 * 1024 * 1024);
                files.insert(file_id.to_string(), Box::new(reader.compat()));
                Ok(Box::new(writer.compat_write()))
            });

        let readable_files = Arc::clone(&mock_files);
        mock_provider.expect_read_all().returning(move |file_group| {
            assert!(file_group.starts_with("intm-bloom-filters-"));
            let mut files = readable_files.lock().unwrap();
            Ok(files.drain().collect::<Vec<_>>())
        });

        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        let mut storage = FinalizedBloomFilterStorage::new(
            Box::new(mock_provider),
            global_memory_usage.clone(),
            Some(1024 * 1024), // 1MB
        );

        let elem_count = 2000;
        let batch = 1000;
        // Elements of the `seg`-th segment: the decimal strings of a disjoint range.
        let elems_of = |seg: usize| {
            (elem_count * seg..elem_count * (seg + 1)).map(|x| x.to_string().into_bytes())
        };

        for seg in 0..batch {
            storage.add(elems_of(seg), elem_count).await.unwrap();
        }

        // Flush happens.
        assert!(storage.intermediate_file_id_counter > 0);

        // Drain the storage.
        let mut stream = storage.drain().await.unwrap();

        let mut seen = 0;
        while let Some(segment) = stream.next().await {
            let segment = segment.unwrap();
            assert_eq!(segment.element_count, elem_count);

            // Check the correctness of the Bloom filter.
            let words = u64_vec_from_bytes(&segment.bloom_filter_bytes);
            let bf = BloomFilter::from_vec(words)
                .seed(&SEED)
                .expected_items(segment.element_count);
            for elem in elems_of(seen) {
                assert!(bf.contains(&elem));
            }

            seen += 1;
        }

        assert_eq!(seen, batch);
    }
}

View File

@@ -0,0 +1,248 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use asynchronous_codec::{BytesMut, Decoder, Encoder};
use bytes::{Buf, BufMut};
use snafu::{ensure, ResultExt};
use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment;
use crate::bloom_filter::error::{Error, InvalidIntermediateMagicSnafu, IoSnafu, Result};
/// The magic number for the codec version 1 of the intermediate bloom filter.
///
/// Written once at the very beginning of an encoded stream and checked once when
/// decoding starts.
const CODEC_V1_MAGIC: &[u8; 4] = b"bi01";
/// Codec of the intermediate finalized bloom filter segment.
///
/// # Format
///
/// ```text
/// [ magic ][ elem count ][ size ][ bloom filter ][ elem count ][ size ][ bloom filter ]...
///    [4]        [8]        [8]        [size]          [8]        [8]        [size]
/// ```
///
/// The numbers in the second row are field widths in bytes. `elem count` and `size`
/// are little-endian `u64` values; `size` is the byte length of the bloom filter
/// that follows it.
#[derive(Debug, Default)]
pub struct IntermediateBloomFilterCodecV1 {
// Set once the header magic has been written (encoding) or validated (decoding),
// so that it is handled only for the first frame.
handled_header_magic: bool,
}
impl Encoder for IntermediateBloomFilterCodecV1 {
type Item<'a> = FinalizedBloomFilterSegment;
type Error = Error;
fn encode(&mut self, item: FinalizedBloomFilterSegment, dst: &mut BytesMut) -> Result<()> {
if !self.handled_header_magic {
dst.extend_from_slice(CODEC_V1_MAGIC);
self.handled_header_magic = true;
}
let segment_bytes = item.bloom_filter_bytes;
let elem_count = item.element_count;
dst.reserve(2 * std::mem::size_of::<u64>() + segment_bytes.len());
dst.put_u64_le(elem_count as u64);
dst.put_u64_le(segment_bytes.len() as u64);
dst.extend_from_slice(&segment_bytes);
Ok(())
}
}
impl Decoder for IntermediateBloomFilterCodecV1 {
type Item = FinalizedBloomFilterSegment;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
if !self.handled_header_magic {
let m_len = CODEC_V1_MAGIC.len();
if src.remaining() < m_len {
return Ok(None);
}
let magic_bytes = &src[..m_len];
ensure!(
magic_bytes == CODEC_V1_MAGIC,
InvalidIntermediateMagicSnafu {
invalid: magic_bytes,
}
);
self.handled_header_magic = true;
src.advance(m_len);
}
let s = &src[..];
let u64_size = std::mem::size_of::<u64>();
let n_size = u64_size * 2;
if s.len() < n_size {
return Ok(None);
}
let element_count = u64::from_le_bytes(s[0..u64_size].try_into().unwrap()) as usize;
let segment_size = u64::from_le_bytes(s[u64_size..n_size].try_into().unwrap()) as usize;
if s.len() < n_size + segment_size {
return Ok(None);
}
let bloom_filter_bytes = s[n_size..n_size + segment_size].to_vec();
src.advance(n_size + segment_size);
Ok(Some(FinalizedBloomFilterSegment {
element_count,
bloom_filter_bytes,
}))
}
}
/// Required for [`Encoder`] and [`Decoder`] implementations.
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Err::<(), std::io::Error>(error)
.context(IoSnafu)
.unwrap_err()
}
}
#[cfg(test)]
mod tests {
    use asynchronous_codec::{FramedRead, FramedWrite};
    use futures::io::Cursor;
    use futures::{SinkExt, StreamExt};

    use super::*;
    use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment;

    /// Three small segments shared by the round-trip tests.
    fn sample_segments() -> Vec<FinalizedBloomFilterSegment> {
        vec![
            FinalizedBloomFilterSegment {
                element_count: 2,
                bloom_filter_bytes: vec![1, 2, 3, 4],
            },
            FinalizedBloomFilterSegment {
                element_count: 3,
                bloom_filter_bytes: vec![5, 6, 7, 8],
            },
            FinalizedBloomFilterSegment {
                element_count: 4,
                bloom_filter_bytes: vec![9, 10, 11, 12],
            },
        ]
    }

    /// Hand-assembles an encoded stream holding `magic` and a single frame.
    fn raw_stream(magic: &[u8], element_count: u64, size: u64, payload: &[u8]) -> Vec<u8> {
        let mut bytes = vec![];
        bytes.extend_from_slice(magic);
        bytes.extend_from_slice(&element_count.to_le_bytes());
        bytes.extend_from_slice(&size.to_le_bytes());
        bytes.extend_from_slice(payload);
        bytes
    }

    /// Decodes `bytes` with a fresh codec and returns the first item of the stream.
    async fn first_frame(bytes: &[u8]) -> Option<Result<FinalizedBloomFilterSegment>> {
        let mut reader = FramedRead::new(bytes, IntermediateBloomFilterCodecV1::default());
        reader.next().await
    }

    #[test]
    fn test_intermediate_bloom_filter_codec_v1_basic() {
        let items = sample_segments();

        let mut encoder = IntermediateBloomFilterCodecV1::default();
        let mut buf = BytesMut::new();
        for item in &items {
            encoder.encode(item.clone(), &mut buf).unwrap();
        }

        let mut buf = buf.freeze().try_into_mut().unwrap();

        let mut decoder = IntermediateBloomFilterCodecV1::default();
        for expected in &items {
            let decoded = decoder.decode(&mut buf).unwrap().unwrap();
            assert_eq!(expected, &decoded);
        }
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write() {
        let items = sample_segments();

        let mut bytes = Cursor::new(vec![]);
        let mut writer = FramedWrite::new(&mut bytes, IntermediateBloomFilterCodecV1::default());
        for item in &items {
            writer.send(item.clone()).await.unwrap();
        }
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let bytes = bytes.into_inner();
        let mut reader =
            FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default());
        for expected in &items {
            let decoded = reader.next().await.unwrap().unwrap();
            assert_eq!(expected, &decoded);
        }
        assert!(reader.next().await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_only_magic() {
        // A stream consisting of the magic alone is valid and simply holds no segment.
        assert!(first_frame(CODEC_V1_MAGIC).await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_magic() {
        let e = first_frame(&CODEC_V1_MAGIC[..3]).await.unwrap();
        assert!(e.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_item() {
        // Complete frame header, but the announced payload is missing entirely.
        let bytes = raw_stream(CODEC_V1_MAGIC, 2, 4, &[]);
        let e = first_frame(&bytes).await.unwrap();
        assert!(e.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_magic() {
        let bytes = raw_stream(b"bi02", 2, 4, &[1, 2, 3, 4]);
        let e = first_frame(&bytes).await.unwrap();
        assert!(e.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_length() {
        // The header announces 4 payload bytes, but only 3 are present.
        let bytes = raw_stream(CODEC_V1_MAGIC, 2, 4, &[1, 2, 3]);
        let e = first_frame(&bytes).await.unwrap();
        assert!(e.is_err());
    }
}

View File

@@ -39,6 +39,20 @@ pub enum Error {
location: Location,
},
#[snafu(display("Intermediate error"))]
Intermediate {
source: crate::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid intermediate magic"))]
InvalidIntermediateMagic {
invalid: Vec<u8>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -52,8 +66,11 @@ impl ErrorExt for Error {
use Error::*;
match self {
Io { .. } | Self::SerdeJson { .. } => StatusCode::Unexpected,
Io { .. } | SerdeJson { .. } | InvalidIntermediateMagic { .. } => {
StatusCode::Unexpected
}
Intermediate { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
}
}

48
src/index/src/error.rs Normal file
View File

@@ -0,0 +1,48 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
/// Crate-level error type.
///
/// It is the error type of the `ExternalTempFileProvider` operations and the
/// `source` of the `Intermediate` variants in the bloom filter and inverted index
/// error enums.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
// Wraps an opaque boxed error; its status code is forwarded unchanged by
// `ErrorExt::status_code`.
#[snafu(display("External error"))]
External {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
External { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -15,25 +15,24 @@
use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite};
use crate::inverted_index::error::Result;
use crate::error::Error;
/// Trait for managing intermediate files during external sorting for a particular index.
pub type Writer = Box<dyn AsyncWrite + Unpin + Send>;
pub type Reader = Box<dyn AsyncRead + Unpin + Send>;
/// Trait for managing intermediate files to control memory usage for a particular index.
#[mockall::automock]
#[async_trait]
pub trait ExternalTempFileProvider: Send + Sync {
/// Creates and opens a new intermediate file associated with a specific index for writing.
/// Creates and opens a new intermediate file associated with a specific `file_group` for writing.
/// The implementation should ensure that the file does not already exist.
///
/// - `index_name`: the name of the index for which the file will be associated
/// - `file_group`: a unique identifier for the group of files
/// - `file_id`: a unique identifier for the new file
async fn create(
&self,
index_name: &str,
file_id: &str,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>>;
async fn create(&self, file_group: &str, file_id: &str) -> Result<Writer, Error>;
/// Retrieves all intermediate files associated with a specific index for an external sorting operation.
/// Retrieves all intermediate files and their associated file identifiers for a specific `file_group`.
///
/// `index_name`: the name of the index to retrieve intermediate files for
async fn read_all(&self, index_name: &str) -> Result<Vec<Box<dyn AsyncRead + Unpin + Send>>>;
/// `file_group` is a unique identifier for the group of files.
async fn read_all(&self, file_group: &str) -> Result<Vec<(String, Reader)>, Error>;
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod external_provider;
pub mod external_sort;
mod intermediate_rw;
mod merge_stream;

View File

@@ -23,15 +23,16 @@ use async_trait::async_trait;
use common_base::BitVec;
use common_telemetry::{debug, error};
use futures::stream;
use snafu::ResultExt;
use crate::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
use crate::external_provider::ExternalTempFileProvider;
use crate::inverted_index::create::sort::intermediate_rw::{
IntermediateReader, IntermediateWriter,
};
use crate::inverted_index::create::sort::merge_stream::MergeSortedStream;
use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter};
use crate::inverted_index::create::sort_create::SorterFactory;
use crate::inverted_index::error::Result;
use crate::inverted_index::error::{IntermediateSnafu, Result};
use crate::inverted_index::{Bytes, BytesRef};
/// `ExternalSorter` manages the sorting of data using both in-memory structures and external files.
@@ -107,7 +108,11 @@ impl Sorter for ExternalSorter {
/// Finalizes the sorting operation, merging data from both in-memory buffer and external files
/// into a sorted stream
async fn output(&mut self) -> Result<SortOutput> {
let readers = self.temp_file_provider.read_all(&self.index_name).await?;
let readers = self
.temp_file_provider
.read_all(&self.index_name)
.await
.context(IntermediateSnafu)?;
// TODO(zhongzc): k-way merge instead of 2-way merge
@@ -122,7 +127,7 @@ impl Sorter for ExternalSorter {
Ok((value, bitmap))
}),
)));
for reader in readers {
for (_, reader) in readers {
tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?);
}
@@ -241,7 +246,11 @@ impl ExternalSorter {
let file_id = &format!("{:012}", self.total_row_count);
let index_name = &self.index_name;
let writer = self.temp_file_provider.create(index_name, file_id).await?;
let writer = self
.temp_file_provider
.create(index_name, file_id)
.await
.context(IntermediateSnafu)?;
let values = mem::take(&mut self.values_buffer);
self.global_memory_usage
@@ -302,7 +311,7 @@ mod tests {
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use super::*;
use crate::inverted_index::create::sort::external_provider::MockExternalTempFileProvider;
use crate::external_provider::MockExternalTempFileProvider;
async fn test_external_sorter(
current_memory_usage_threshold: Option<usize>,
@@ -332,7 +341,7 @@ mod tests {
move |index_name| {
assert_eq!(index_name, "test");
let mut files = files.lock().unwrap();
Ok(files.drain().map(|f| f.1).collect::<Vec<_>>())
Ok(files.drain().collect::<Vec<_>>())
}
});

View File

@@ -213,6 +213,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Intermediate error"))]
Intermediate {
source: crate::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -245,6 +252,7 @@ impl ErrorExt for Error {
| InconsistentRowCount { .. }
| IndexNotFound { .. } => StatusCode::InvalidArguments,
Intermediate { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
}
}

View File

@@ -16,5 +16,7 @@
#![feature(assert_matches)]
pub mod bloom_filter;
pub mod error;
pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;

View File

@@ -104,16 +104,28 @@ impl IntermediateLocation {
&self.files_dir
}
/// Returns the path of the directory for intermediate files associated with a column:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/`
pub fn column_path(&self, column_id: &str) -> String {
util::join_path(&self.files_dir, &format!("{column_id}/"))
/// Returns the path of the directory for intermediate files associated with the `file_group`:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/`
pub fn file_group_path(&self, file_group: &str) -> String {
util::join_path(&self.files_dir, &format!("{file_group}/"))
}
/// Returns the path of the intermediate file with the given id for a column:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im`
pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String {
util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im"))
/// Returns the path of the intermediate file with the given `file_group` and `im_file_id`:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im`
pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String {
util::join_path(
&self.file_group_path(file_group),
&format!("{im_file_id}.im"),
)
}
/// Extracts the intermediate file id from `path`.
///
/// The id is the last `/`-separated component of `path` with its `.im`
/// suffix removed. When that component does not end with `.im`, an empty
/// string is returned.
pub fn im_file_id_from_path(&self, path: &str) -> String {
    // Keep only the file name: the text after the final `/`, or the whole
    // input when it contains no separator at all.
    let file_name = path.rsplit_once('/').map_or(path, |(_, name)| name);

    match file_name.strip_suffix(".im") {
        Some(im_file_id) => im_file_id.to_string(),
        None => String::new(),
    }
}
}
@@ -161,17 +173,20 @@ mod tests {
let uuid = location.files_dir.split('/').nth(3).unwrap();
let column_id = "1";
let file_group = "1";
assert_eq!(
location.column_path(column_id),
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/")
location.file_group_path(file_group),
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/")
);
let im_file_id = "000000000010";
let file_path = location.file_path(file_group, im_file_id);
assert_eq!(
location.file_path(column_id, im_file_id),
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im")
file_path,
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im")
);
assert_eq!(location.im_file_id_from_path(&file_path), im_file_id);
}
#[tokio::test]

View File

@@ -16,9 +16,9 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::warn;
use futures::{AsyncRead, AsyncWrite};
use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
use index::inverted_index::error as index_error;
use index::inverted_index::error::Result as IndexResult;
use index::error as index_error;
use index::error::Result as IndexResult;
use index::external_provider::ExternalTempFileProvider;
use snafu::ResultExt;
use crate::error::Result;
@@ -42,10 +42,10 @@ pub(crate) struct TempFileProvider {
impl ExternalTempFileProvider for TempFileProvider {
async fn create(
&self,
column_id: &str,
file_group: &str,
file_id: &str,
) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
let path = self.location.file_path(column_id, file_id);
let path = self.location.file_path(file_group, file_id);
let writer = self
.manager
.store()
@@ -63,13 +63,13 @@ impl ExternalTempFileProvider for TempFileProvider {
async fn read_all(
&self,
column_id: &str,
) -> IndexResult<Vec<Box<dyn AsyncRead + Unpin + Send>>> {
let column_path = self.location.column_path(column_id);
file_group: &str,
) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> {
let file_group_path = self.location.file_group_path(file_group);
let entries = self
.manager
.store()
.list(&column_path)
.list(&file_group_path)
.await
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
@@ -81,6 +81,8 @@ impl ExternalTempFileProvider for TempFileProvider {
continue;
}
let im_file_id = self.location.im_file_id_from_path(entry.path());
let reader = self
.manager
.store()
@@ -93,7 +95,7 @@ impl ExternalTempFileProvider for TempFileProvider {
.await
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
readers.push(Box::new(reader) as _);
readers.push((im_file_id, Box::new(reader) as _));
}
Ok(readers)
@@ -133,36 +135,36 @@ mod tests {
let store = IntermediateManager::init_fs(path).await.unwrap();
let provider = TempFileProvider::new(location.clone(), store);
let column_name = "tag0";
let file_group = "tag0";
let file_id = "0000000010";
let mut writer = provider.create(column_name, file_id).await.unwrap();
let mut writer = provider.create(file_group, file_id).await.unwrap();
writer.write_all(b"hello").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let file_id = "0000000100";
let mut writer = provider.create(column_name, file_id).await.unwrap();
let mut writer = provider.create(file_group, file_id).await.unwrap();
writer.write_all(b"world").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let column_name = "tag1";
let file_group = "tag1";
let file_id = "0000000010";
let mut writer = provider.create(column_name, file_id).await.unwrap();
let mut writer = provider.create(file_group, file_id).await.unwrap();
writer.write_all(b"foo").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let readers = provider.read_all("tag0").await.unwrap();
assert_eq!(readers.len(), 2);
for mut reader in readers {
for (_, mut reader) in readers {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert!(matches!(buf.as_slice(), b"hello" | b"world"));
}
let readers = provider.read_all("tag1").await.unwrap();
assert_eq!(readers.len(), 1);
let mut reader = readers.into_iter().next().unwrap();
let mut reader = readers.into_iter().map(|x| x.1).next().unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"foo");