feat(index): measure memory usage in global instead of single-column and add metrics (#3383)

* feat(index): measure memory usage in global instead of single-column and add metrics

* feat: add leading zeros to streamline memory usage

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove println

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-02-28 14:49:24 +08:00
committed by GitHub
parent a8cbec824c
commit c3c80b92c8
9 changed files with 324 additions and 95 deletions

View File

@@ -134,7 +134,7 @@ create_on_compaction = "auto"
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
mem_threshold_on_create = "64M"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

View File

@@ -16,6 +16,7 @@ use std::collections::{BTreeMap, VecDeque};
use std::mem;
use std::num::NonZeroUsize;
use std::ops::RangeInclusive;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@@ -48,6 +49,14 @@ pub struct ExternalSorter {
/// In-memory buffer to hold values and their corresponding bitmaps until memory threshold is exceeded
values_buffer: BTreeMap<Bytes, BitVec>,
/// Count of rows in the last dumped buffer, used to streamline memory usage of `values_buffer`.
///
/// After data is dumped to external files, `last_dump_row_count` is updated to reflect the new starting point
/// for `BitVec` indexing. This means each `BitVec` in `values_buffer` thereafter encodes positions relative to
/// this count, not from 0. This mechanism effectively shrinks the memory footprint of each `BitVec`, helping manage
/// memory use more efficiently by focusing only on newly ingested data post-dump.
last_dump_row_count: usize,
/// Count of all rows ingested so far
total_row_count: usize,
@@ -58,9 +67,20 @@ pub struct ExternalSorter {
/// Tracks memory usage of the buffer
current_memory_usage: usize,
/// The memory usage threshold at which the buffer should be dumped to an external file.
/// `None` indicates that the buffer should never be dumped.
memory_usage_threshold: Option<usize>,
/// The threshold of current memory usage below which the buffer is not dumped, even if the global memory
/// usage exceeds `global_memory_usage_sort_limit`. This allows for smaller buffers to remain in memory,
/// providing a buffer against unnecessary dumps to external files, which can be costly in terms of performance.
/// `None` indicates that only the global memory usage threshold is considered for dumping the buffer.
current_memory_usage_threshold: Option<usize>,
/// Tracks the global memory usage of all sorters
global_memory_usage: Arc<AtomicUsize>,
/// The memory usage limit that, when exceeded by the global memory consumption of all sorters, necessitates
/// a reassessment of buffer retention. Surpassing this limit signals that there is a high overall memory pressure,
/// potentially requiring buffer dumping to external storage for memory relief.
/// `None` value indicates that no specific global memory usage threshold is established for triggering buffer dumps.
global_memory_usage_sort_limit: Option<usize>,
}
#[async_trait]
@@ -72,7 +92,7 @@ impl Sorter for ExternalSorter {
return Ok(());
}
let segment_index_range = self.segment_index_range(n);
let segment_index_range = self.segment_index_range(n, value.is_none());
self.total_row_count += n;
if let Some(value) = value {
@@ -92,8 +112,15 @@ impl Sorter for ExternalSorter {
// TODO(zhongzc): k-way merge instead of 2-way merge
let mut tree_nodes: VecDeque<SortedStream> = VecDeque::with_capacity(readers.len() + 1);
let leading_zeros = self.last_dump_row_count / self.segment_row_count;
tree_nodes.push_back(Box::new(stream::iter(
mem::take(&mut self.values_buffer).into_iter().map(Ok),
mem::take(&mut self.values_buffer)
.into_iter()
.map(move |(value, mut bitmap)| {
bitmap.resize(bitmap.len() + leading_zeros, false);
bitmap.shift_right(leading_zeros);
Ok((value, bitmap))
}),
)));
for reader in readers {
tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?);
@@ -121,7 +148,9 @@ impl ExternalSorter {
index_name: String,
temp_file_provider: Arc<dyn ExternalTempFileProvider>,
segment_row_count: NonZeroUsize,
memory_usage_threshold: Option<usize>,
current_memory_usage_threshold: Option<usize>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_sort_limit: Option<usize>,
) -> Self {
Self {
index_name,
@@ -131,24 +160,31 @@ impl ExternalSorter {
values_buffer: BTreeMap::new(),
total_row_count: 0,
last_dump_row_count: 0,
segment_row_count,
current_memory_usage: 0,
memory_usage_threshold,
current_memory_usage_threshold,
global_memory_usage,
global_memory_usage_sort_limit,
}
}
/// Generates a factory function that creates new `ExternalSorter` instances
pub fn factory(
temp_file_provider: Arc<dyn ExternalTempFileProvider>,
memory_usage_threshold: Option<usize>,
current_memory_usage_threshold: Option<usize>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_sort_limit: Option<usize>,
) -> SorterFactory {
Box::new(move |index_name, segment_row_count| {
Box::new(Self::new(
index_name,
temp_file_provider.clone(),
segment_row_count,
memory_usage_threshold,
current_memory_usage_threshold,
global_memory_usage.clone(),
global_memory_usage_sort_limit,
))
})
}
@@ -183,22 +219,41 @@ impl ExternalSorter {
/// Checks if the in-memory buffer exceeds the threshold and offloads it to external storage if necessary
async fn may_dump_buffer(&mut self, memory_diff: usize) -> Result<()> {
self.current_memory_usage += memory_diff;
if self.memory_usage_threshold.is_none()
|| self.current_memory_usage < self.memory_usage_threshold.unwrap()
let memory_usage = self.current_memory_usage;
self.global_memory_usage
.fetch_add(memory_diff, Ordering::Relaxed);
if self.global_memory_usage_sort_limit.is_none() {
return Ok(());
}
if self.global_memory_usage.load(Ordering::Relaxed)
< self.global_memory_usage_sort_limit.unwrap()
{
return Ok(());
}
if let Some(current_threshold) = self.current_memory_usage_threshold {
if memory_usage < current_threshold {
return Ok(());
}
}
let file_id = &format!("{:012}", self.total_row_count);
let index_name = &self.index_name;
let writer = self.temp_file_provider.create(index_name, file_id).await?;
let memory_usage = self.current_memory_usage;
let values = mem::take(&mut self.values_buffer);
self.global_memory_usage
.fetch_sub(memory_usage, Ordering::Relaxed);
self.current_memory_usage = 0;
let bitmap_leading_zeros = self.last_dump_row_count / self.segment_row_count;
self.last_dump_row_count =
self.total_row_count - self.total_row_count % self.segment_row_count; // align to segment
let entries = values.len();
IntermediateWriter::new(writer).write_all(values).await.inspect(|_|
IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_|
logging::debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
).inspect_err(|e|
logging::error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. Error: {e}")
@@ -206,10 +261,16 @@ impl ExternalSorter {
}
/// Determines the segment index range for the row index range
/// `[self.total_row_count, self.total_row_count + n - 1]`
fn segment_index_range(&self, n: usize) -> RangeInclusive<usize> {
let start = self.segment_index(self.total_row_count);
let end = self.segment_index(self.total_row_count + n - 1);
/// `[row_begin, row_begin + n - 1]`
fn segment_index_range(&self, n: usize, is_null: bool) -> RangeInclusive<usize> {
let row_begin = if is_null {
self.total_row_count
} else {
self.total_row_count - self.last_dump_row_count
};
let start = self.segment_index(row_begin);
let end = self.segment_index(row_begin + n - 1);
start..=end
}
@@ -244,7 +305,8 @@ mod tests {
use crate::inverted_index::create::sort::external_provider::MockExternalTempFileProvider;
async fn test_external_sorter(
memory_usage_threshold: Option<usize>,
current_memory_usage_threshold: Option<usize>,
global_memory_usage_sort_limit: Option<usize>,
segment_row_count: usize,
row_count: usize,
batch_push: bool,
@@ -278,7 +340,9 @@ mod tests {
"test".to_owned(),
Arc::new(mock_provider),
NonZeroUsize::new(segment_row_count).unwrap(),
memory_usage_threshold,
current_memory_usage_threshold,
Arc::new(AtomicUsize::new(0)),
global_memory_usage_sort_limit,
);
let mut sorted_result = if batch_push {
@@ -321,7 +385,8 @@ mod tests {
#[tokio::test]
async fn test_external_sorter_pure_in_memory() {
let memory_usage_threshold = None;
let current_memory_usage_threshold = None;
let global_memory_usage_sort_limit = None;
let total_row_count_cases = vec![0, 100, 1000, 10000];
let segment_row_count_cases = vec![1, 10, 100, 1000];
let batch_push_cases = vec![false, true];
@@ -330,7 +395,8 @@ mod tests {
for segment_row_count in &segment_row_count_cases {
for batch_push in &batch_push_cases {
test_external_sorter(
memory_usage_threshold,
current_memory_usage_threshold,
global_memory_usage_sort_limit,
*segment_row_count,
total_row_count,
*batch_push,
@@ -343,7 +409,8 @@ mod tests {
#[tokio::test]
async fn test_external_sorter_pure_external() {
let memory_usage_threshold = Some(0);
let current_memory_usage_threshold = None;
let global_memory_usage_sort_limit = Some(0);
let total_row_count_cases = vec![0, 100, 1000, 10000];
let segment_row_count_cases = vec![1, 10, 100, 1000];
let batch_push_cases = vec![false, true];
@@ -352,7 +419,8 @@ mod tests {
for segment_row_count in &segment_row_count_cases {
for batch_push in &batch_push_cases {
test_external_sorter(
memory_usage_threshold,
current_memory_usage_threshold,
global_memory_usage_sort_limit,
*segment_row_count,
total_row_count,
*batch_push,
@@ -365,7 +433,8 @@ mod tests {
#[tokio::test]
async fn test_external_sorter_mixed() {
let memory_usage_threshold = Some(1024);
let current_memory_usage_threshold = vec![None, Some(2048)];
let global_memory_usage_sort_limit = Some(1024);
let total_row_count_cases = vec![0, 100, 1000, 10000];
let segment_row_count_cases = vec![1, 10, 100, 1000];
let batch_push_cases = vec![false, true];
@@ -373,13 +442,16 @@ mod tests {
for total_row_count in total_row_count_cases {
for segment_row_count in &segment_row_count_cases {
for batch_push in &batch_push_cases {
test_external_sorter(
memory_usage_threshold,
*segment_row_count,
total_row_count,
*batch_push,
)
.await;
for current_memory_usage_threshold in &current_memory_usage_threshold {
test_external_sorter(
*current_memory_usage_threshold,
global_memory_usage_sort_limit,
*segment_row_count,
total_row_count,
*batch_push,
)
.await;
}
}
}
}

View File

@@ -12,6 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Intermediate codec for external sorting.
//!
//! This module provides serialization and deserialization logic for
//! handling intermediate data during the sorting process.
//! The serialization format is as follows:
//!
//! ```text
//! [magic][bitmap leading zeros][item][item]...[item]
//! [4] [4] [?]
//!
//! Each [item] is structured as:
//! [value len][value][bitmap len][bitmap]
//! [8] [?] [8] [?]
//! ```
//!
//! The format starts with a 4-byte magic identifier, followed by a 4-byte
//! bitmap leading zeros count, indicating how many leading zeros are in the
//! fixed-size region of the bitmap. Following that, each item represents
//! a value and its associated bitmap, serialized with their lengths for
//! easier deserialization.
mod codec_v1;
use std::collections::BTreeMap;
@@ -39,14 +60,26 @@ impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
}
/// Serializes and writes all provided values to the wrapped writer
pub async fn write_all(mut self, values: BTreeMap<Bytes, BitVec>) -> Result<()> {
let (codec_magic, encoder) = (codec_v1::CODEC_V1_MAGIC, codec_v1::IntermediateCodecV1);
pub async fn write_all(
mut self,
values: BTreeMap<Bytes, BitVec>,
bitmap_leading_zeros: u32,
) -> Result<()> {
let (codec_magic, encoder) = (
codec_v1::CODEC_V1_MAGIC,
codec_v1::IntermediateItemEncoderV1,
);
self.writer
.write_all(codec_magic)
.await
.context(WriteSnafu)?;
self.writer
.write_all(&bitmap_leading_zeros.to_be_bytes())
.await
.context(WriteSnafu)?;
let value_stream = stream::iter(values.into_iter().map(Ok));
let frame_write = FramedWrite::new(&mut self.writer, encoder);
// `forward()` will flush and close the writer when the stream ends
@@ -79,7 +112,17 @@ impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {
.context(ReadSnafu)?;
let decoder = match &magic {
codec_v1::CODEC_V1_MAGIC => codec_v1::IntermediateCodecV1,
codec_v1::CODEC_V1_MAGIC => {
let bitmap_leading_zeros = {
let mut buf = [0u8; 4];
self.reader.read_exact(&mut buf).await.context(ReadSnafu)?;
u32::from_be_bytes(buf)
};
codec_v1::IntermediateItemDecoderV1 {
bitmap_leading_zeros,
}
}
_ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(),
};
@@ -110,7 +153,7 @@ mod tests {
]);
let writer = IntermediateWriter::new(buf_w);
writer.write_all(values.clone()).await.unwrap();
writer.write_all(values.clone(), 0).await.unwrap();
// reset the handle
buf_r.seek(SeekFrom::Start(0)).unwrap();
@@ -124,6 +167,45 @@ mod tests {
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn test_intermediate_read_write_with_prefix_zeros() {
let file_r = tempfile().unwrap();
let file_w = file_r.try_clone().unwrap();
let mut buf_r = AllowStdIo::new(file_r);
let buf_w = AllowStdIo::new(file_w);
let values = BTreeMap::from_iter([
(Bytes::from("a"), BitVec::from_slice(&[0b10101010])),
(Bytes::from("b"), BitVec::from_slice(&[0b01010101])),
]);
let writer = IntermediateWriter::new(buf_w);
writer.write_all(values.clone(), 8).await.unwrap();
// reset the handle
buf_r.seek(SeekFrom::Start(0)).unwrap();
let reader = IntermediateReader::new(buf_r);
let mut stream = reader.into_stream().await.unwrap();
let a = stream.next().await.unwrap().unwrap();
assert_eq!(
a,
(
Bytes::from("a"),
BitVec::from_slice(&[0b00000000, 0b10101010])
)
);
let b = stream.next().await.unwrap().unwrap();
assert_eq!(
b,
(
Bytes::from("b"),
BitVec::from_slice(&[0b00000000, 0b01010101])
)
);
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn test_intermediate_read_write_empty() {
let mut buf = vec![];
@@ -131,7 +213,7 @@ mod tests {
let values = BTreeMap::new();
let writer = IntermediateWriter::new(&mut buf);
writer.write_all(values.clone()).await.unwrap();
writer.write_all(values.clone(), 0).await.unwrap();
let reader = IntermediateReader::new(Cursor::new(buf));
let mut stream = reader.into_stream().await.unwrap();

View File

@@ -27,17 +27,11 @@ const U64_LENGTH: usize = std::mem::size_of::<u64>();
/// Magic bytes for this intermediate codec version
pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01";
/// Codec for serializing and deserializing intermediate data for external sorting.
///
/// Binary format serialization. The item is laid out as follows:
/// ```text
/// [value len][value][bitmap len][bitmap]
/// [8] [?] [8] [?]
/// ```
pub struct IntermediateCodecV1;
/// Serializes items of external sorting intermediate files.
pub struct IntermediateItemEncoderV1;
/// [`FramedWrite`] requires the [`Encoder`] trait to be implemented.
impl Encoder for IntermediateCodecV1 {
impl Encoder for IntermediateItemEncoderV1 {
type Item<'a> = (Bytes, BitVec);
type Error = Error;
@@ -54,8 +48,13 @@ impl Encoder for IntermediateCodecV1 {
}
}
/// Deserializes items of external sorting intermediate files.
pub struct IntermediateItemDecoderV1 {
pub(crate) bitmap_leading_zeros: u32,
}
/// [`FramedRead`] requires the [`Decoder`] trait to be implemented.
impl Decoder for IntermediateCodecV1 {
impl Decoder for IntermediateItemDecoderV1 {
type Item = (Bytes, BitVec);
type Error = Error;
@@ -92,9 +91,11 @@ impl Decoder for IntermediateCodecV1 {
if buf.len() < bitmap_len {
return Ok(None);
}
let bitmap_bytes = &buf[..bitmap_len];
let item = (value_bytes.to_vec(), BitVec::from_slice(bitmap_bytes));
let mut bitmap = BitVec::repeat(false, self.bitmap_leading_zeros as _);
bitmap.extend_from_raw_slice(&buf[..bitmap_len]);
let item = (value_bytes.to_vec(), bitmap);
src.advance(U64_LENGTH * 2 + value_len + bitmap_len);
Ok(Some(item))
@@ -113,54 +114,88 @@ impl From<io::Error> for Error {
#[cfg(test)]
mod tests {
use common_base::bit_vec::prelude::{bitvec, Lsb0};
use super::*;
#[test]
fn test_intermediate_codec_basic() {
let mut codec = IntermediateCodecV1;
let mut encoder = IntermediateItemEncoderV1;
let mut buf = BytesMut::new();
let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010]));
codec.encode(item.clone(), &mut buf).unwrap();
assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item);
assert_eq!(codec.decode(&mut buf).unwrap(), None);
encoder.encode(item.clone(), &mut buf).unwrap();
let mut decoder = IntermediateItemDecoderV1 {
bitmap_leading_zeros: 0,
};
assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
assert_eq!(decoder.decode(&mut buf).unwrap(), None);
let item1 = (b"world".to_vec(), BitVec::from_slice(&[0b01010101]));
codec.encode(item.clone(), &mut buf).unwrap();
codec.encode(item1.clone(), &mut buf).unwrap();
assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item);
assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item1);
assert_eq!(codec.decode(&mut buf).unwrap(), None);
encoder.encode(item.clone(), &mut buf).unwrap();
encoder.encode(item1.clone(), &mut buf).unwrap();
assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item1);
assert_eq!(decoder.decode(&mut buf).unwrap(), None);
assert!(buf.is_empty());
}
#[test]
fn test_intermediate_codec_empty_item() {
let mut codec = IntermediateCodecV1;
let mut encoder = IntermediateItemEncoderV1;
let mut buf = BytesMut::new();
let item = (b"".to_vec(), BitVec::from_slice(&[]));
codec.encode(item.clone(), &mut buf).unwrap();
assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item);
assert_eq!(codec.decode(&mut buf).unwrap(), None);
encoder.encode(item.clone(), &mut buf).unwrap();
let mut decoder = IntermediateItemDecoderV1 {
bitmap_leading_zeros: 0,
};
assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
assert_eq!(decoder.decode(&mut buf).unwrap(), None);
assert!(buf.is_empty());
}
#[test]
fn test_intermediate_codec_partial() {
let mut codec = IntermediateCodecV1;
let mut encoder = IntermediateItemEncoderV1;
let mut buf = BytesMut::new();
let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010]));
codec.encode(item.clone(), &mut buf).unwrap();
encoder.encode(item.clone(), &mut buf).unwrap();
let partial_length = U64_LENGTH + 3;
let mut partial_bytes = buf.split_to(partial_length);
assert_eq!(codec.decode(&mut partial_bytes).unwrap(), None); // not enough data
let mut decoder = IntermediateItemDecoderV1 {
bitmap_leading_zeros: 0,
};
assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None); // not enough data
partial_bytes.extend_from_slice(&buf[..]);
assert_eq!(codec.decode(&mut partial_bytes).unwrap().unwrap(), item);
assert_eq!(codec.decode(&mut partial_bytes).unwrap(), None);
assert_eq!(decoder.decode(&mut partial_bytes).unwrap().unwrap(), item);
assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None);
assert!(partial_bytes.is_empty());
}
#[test]
fn test_intermediate_codec_prefix_zeros() {
let mut encoder = IntermediateItemEncoderV1;
let mut buf = BytesMut::new();
let item = (b"hello".to_vec(), bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]);
encoder.encode(item.clone(), &mut buf).unwrap();
let mut decoder = IntermediateItemDecoderV1 {
bitmap_leading_zeros: 3,
};
let decoded_item = decoder.decode(&mut buf).unwrap().unwrap();
assert_eq!(decoded_item.0, b"hello");
assert_eq!(
decoded_item.1,
bitvec![u8, Lsb0; 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0]
);
assert_eq!(decoder.decode(&mut buf).unwrap(), None);
assert!(buf.is_empty());
}
}

View File

@@ -186,7 +186,8 @@ lazy_static! {
pub static ref INDEX_CREATE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_index_create_elapsed",
"index create elapsed",
&[STAGE_LABEL]
&[STAGE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
)
.unwrap();
/// Counter of rows indexed.
@@ -201,7 +202,11 @@ lazy_static! {
"index create bytes total",
)
.unwrap();
/// Gauge of index create memory usage.
pub static ref INDEX_CREATE_MEMORY_USAGE: IntGauge = register_int_gauge!(
"greptime_index_create_memory_usage",
"index create memory usage",
).unwrap();
/// Counter of r/w bytes on index related IO operations.
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_index_io_bytes_total",

View File

@@ -26,6 +26,7 @@ use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::Batch;
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
@@ -33,19 +34,13 @@ use crate::sst::index::intermediate::IntermediateManager;
const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";
// TODO(zhongzc): how to determine this value?
/// The minimum memory usage threshold for a column to qualify for external sorting during index creation.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 8192;
/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
/// The index creator that hides the error handling details.
#[derive(Default)]
pub struct Indexer {
file_id: FileId,
region_id: RegionId,
inner: Option<SstIndexCreator>,
last_memory_usage: usize,
}
impl Indexer {
@@ -69,6 +64,15 @@ impl Indexer {
self.inner = None;
}
}
if let Some(creator) = self.inner.as_ref() {
let memory_usage = creator.memory_usage();
INDEX_CREATE_MEMORY_USAGE.add(memory_usage as i64 - self.last_memory_usage as i64);
self.last_memory_usage = memory_usage;
} else {
INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64);
self.last_memory_usage = 0;
}
}
/// Finish the index creation.
@@ -81,6 +85,9 @@ impl Indexer {
"Create index successfully, region_id: {}, file_id: {}, bytes: {}, rows: {}",
self.region_id, self.file_id, byte_count, row_count
);
INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64);
self.last_memory_usage = 0;
return Some(byte_count);
}
Err(err) => {
@@ -99,6 +106,8 @@ impl Indexer {
}
}
INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64);
self.last_memory_usage = 0;
None
}
@@ -119,6 +128,8 @@ impl Indexer {
}
}
}
INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64);
self.last_memory_usage = 0;
}
}
@@ -201,6 +212,7 @@ impl<'a> IndexerBuilder<'a> {
file_id: self.file_id,
region_id: self.metadata.region_id,
inner: Some(creator),
last_memory_usage: 0,
}
}
}

View File

@@ -17,6 +17,7 @@ mod temp_provider;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use common_telemetry::warn;
@@ -45,9 +46,13 @@ use crate::sst::index::creator::statistics::Statistics;
use crate::sst::index::creator::temp_provider::TempFileProvider;
use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager};
use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::{
INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB,
};
use crate::sst::index::INDEX_BLOB_TYPE;
/// The minimum memory usage threshold for one column.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB
/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
type ByteCount = usize;
type RowCount = usize;
@@ -75,6 +80,9 @@ pub struct SstIndexCreator {
/// Ignore column IDs for index creation.
ignore_column_ids: HashSet<ColumnId>,
/// The memory usage of the index creator.
memory_usage: Arc<AtomicUsize>,
}
impl SstIndexCreator {
@@ -89,16 +97,19 @@ impl SstIndexCreator {
memory_usage_threshold: Option<usize>,
segment_row_count: NonZeroUsize,
) -> Self {
// `memory_usage_threshold` is the total memory usage threshold of the index creation,
// so we need to divide it by the number of columns
let memory_threshold = memory_usage_threshold.map(|threshold| {
(threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD)
});
let temp_file_provider = Arc::new(TempFileProvider::new(
IntermediateLocation::new(&metadata.region_id, &sst_file_id),
intermediate_manager,
));
let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold);
let memory_usage = Arc::new(AtomicUsize::new(0));
let sorter = ExternalSorter::factory(
temp_file_provider.clone() as _,
Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
memory_usage.clone(),
memory_usage_threshold,
);
let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
@@ -115,6 +126,7 @@ impl SstIndexCreator {
aborted: false,
ignore_column_ids: HashSet::default(),
memory_usage,
}
}
@@ -294,6 +306,10 @@ impl SstIndexCreator {
self.temp_file_provider.cleanup().await
}
pub fn memory_usage(&self) -> usize {
self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
}
}
#[cfg(test)]

View File

@@ -110,7 +110,7 @@ impl TempFileProvider {
pub async fn cleanup(&self) -> Result<()> {
self.manager
.store()
.remove_all(self.location.root_path())
.remove_all(self.location.dir_to_cleanup())
.await
}
}
@@ -172,7 +172,7 @@ mod tests {
assert!(provider
.manager
.store()
.list(location.root_path())
.list(location.dir_to_cleanup())
.await
.unwrap()
.is_empty());

View File

@@ -68,7 +68,8 @@ impl IntermediateManager {
/// during external sorting.
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
root_path: String,
files_dir: String,
sst_dir: String,
}
impl IntermediateLocation {
@@ -80,19 +81,20 @@ impl IntermediateLocation {
let region_id = region_id.as_u64();
let uuid = Uuid::new_v4();
Self {
root_path: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
files_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
sst_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/"),
}
}
/// Returns the root directory of the intermediate files
pub fn root_path(&self) -> &str {
&self.root_path
/// Returns the directory to clean up when the sorting is done
pub fn dir_to_cleanup(&self) -> &str {
&self.sst_dir
}
/// Returns the path of the directory for intermediate files associated with a column:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/`
pub fn column_path(&self, column_id: &str) -> String {
util::join_path(&self.root_path, &format!("{column_id}/"))
util::join_path(&self.files_dir, &format!("{column_id}/"))
}
/// Returns the path of the intermediate file with the given id for a column:
@@ -135,14 +137,19 @@ mod tests {
let sst_file_id = FileId::random();
let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id);
assert_eq!(
location.dir_to_cleanup(),
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/")
);
let re = Regex::new(&format!(
"{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/",
r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
))
.unwrap();
assert!(re.is_match(location.root_path()));
assert!(re.is_match(&location.files_dir));
let uuid = location.root_path().split('/').nth(3).unwrap();
let uuid = location.files_dir.split('/').nth(3).unwrap();
let column_id = "1";
assert_eq!(