diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 0c55993111..3aa7eb4334 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -134,7 +134,7 @@ create_on_compaction = "auto" apply_on_query = "auto" # Memory threshold for performing an external sort during index creation. # Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. -mem_threshold_on_create = "64MB" +mem_threshold_on_create = "64M" # File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). intermediate_path = "" diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index e43b65bfce..cea2820c75 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::mem; use std::num::NonZeroUsize; use std::ops::RangeInclusive; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use async_trait::async_trait; @@ -48,6 +49,14 @@ pub struct ExternalSorter { /// In-memory buffer to hold values and their corresponding bitmaps until memory threshold is exceeded values_buffer: BTreeMap, + /// Count of rows in the last dumped buffer, used to streamline memory usage of `values_buffer`. + /// + /// After data is dumped to external files, `last_dump_row_count` is updated to reflect the new starting point + /// for `BitVec` indexing. This means each `BitVec` in `values_buffer` thereafter encodes positions relative to + /// this count, not from 0. This mechanism effectively shrinks the memory footprint of each `BitVec`, helping manage + /// memory use more efficiently by focusing only on newly ingested data post-dump. + last_dump_row_count: usize, + /// Count of all rows ingested so far total_row_count: usize, @@ -58,9 +67,20 @@ pub struct ExternalSorter { /// Tracks memory usage of the buffer current_memory_usage: usize, - /// The memory usage threshold at which the buffer should be dumped to an external file. - /// `None` indicates that the buffer should never be dumped. - memory_usage_threshold: Option, + /// The threshold of current memory usage below which the buffer is not dumped, even if the global memory + /// usage exceeds `global_memory_usage_sort_limit`. This allows for smaller buffers to remain in memory, + /// providing a buffer against unnecessary dumps to external files, which can be costly in terms of performance. + /// `None` indicates that only the global memory usage threshold is considered for dumping the buffer. + current_memory_usage_threshold: Option, + + /// Tracks the global memory usage of all sorters + global_memory_usage: Arc, + + /// The memory usage limit that, when exceeded by the global memory consumption of all sorters, necessitates + /// a reassessment of buffer retention. Surpassing this limit signals that there is a high overall memory pressure, + /// potentially requiring buffer dumping to external storage for memory relief. + /// `None` value indicates that no specific global memory usage threshold is established for triggering buffer dumps. + global_memory_usage_sort_limit: Option, } #[async_trait] @@ -72,7 +92,7 @@ impl Sorter for ExternalSorter { return Ok(()); } - let segment_index_range = self.segment_index_range(n); + let segment_index_range = self.segment_index_range(n, value.is_none()); self.total_row_count += n; if let Some(value) = value { @@ -92,8 +112,15 @@ impl Sorter for ExternalSorter { // TODO(zhongzc): k-way merge instead of 2-way merge let mut tree_nodes: VecDeque = VecDeque::with_capacity(readers.len() + 1); + let leading_zeros = self.last_dump_row_count / self.segment_row_count; tree_nodes.push_back(Box::new(stream::iter( - mem::take(&mut self.values_buffer).into_iter().map(Ok), + mem::take(&mut self.values_buffer) + .into_iter() + .map(move |(value, mut bitmap)| { + bitmap.resize(bitmap.len() + leading_zeros, false); + bitmap.shift_right(leading_zeros); + Ok((value, bitmap)) + }), ))); for reader in readers { tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?); @@ -121,7 +148,9 @@ impl ExternalSorter { index_name: String, temp_file_provider: Arc, segment_row_count: NonZeroUsize, - memory_usage_threshold: Option, + current_memory_usage_threshold: Option, + global_memory_usage: Arc, + global_memory_usage_sort_limit: Option, ) -> Self { Self { index_name, @@ -131,24 +160,31 @@ impl ExternalSorter { values_buffer: BTreeMap::new(), total_row_count: 0, + last_dump_row_count: 0, segment_row_count, current_memory_usage: 0, - memory_usage_threshold, + current_memory_usage_threshold, + global_memory_usage, + global_memory_usage_sort_limit, } } /// Generates a factory function that creates new `ExternalSorter` instances pub fn factory( temp_file_provider: Arc, - memory_usage_threshold: Option, + current_memory_usage_threshold: Option, + global_memory_usage: Arc, + global_memory_usage_sort_limit: Option, ) -> SorterFactory { Box::new(move |index_name, segment_row_count| { Box::new(Self::new( index_name, temp_file_provider.clone(), segment_row_count, - memory_usage_threshold, + current_memory_usage_threshold, + global_memory_usage.clone(), + global_memory_usage_sort_limit, )) }) } @@ -183,22 +219,41 @@ impl ExternalSorter { /// Checks if the in-memory buffer exceeds the threshold and offloads it to external storage if necessary async fn may_dump_buffer(&mut self, memory_diff: usize) -> Result<()> { self.current_memory_usage += memory_diff; - if self.memory_usage_threshold.is_none() - || self.current_memory_usage < self.memory_usage_threshold.unwrap() + let memory_usage = self.current_memory_usage; + self.global_memory_usage + .fetch_add(memory_diff, Ordering::Relaxed); + + if self.global_memory_usage_sort_limit.is_none() { + return Ok(()); + } + + if self.global_memory_usage.load(Ordering::Relaxed) + < self.global_memory_usage_sort_limit.unwrap() { return Ok(()); } + if let Some(current_threshold) = self.current_memory_usage_threshold { + if memory_usage < current_threshold { + return Ok(()); + } + } + let file_id = &format!("{:012}", self.total_row_count); let index_name = &self.index_name; let writer = self.temp_file_provider.create(index_name, file_id).await?; - let memory_usage = self.current_memory_usage; let values = mem::take(&mut self.values_buffer); + self.global_memory_usage + .fetch_sub(memory_usage, Ordering::Relaxed); self.current_memory_usage = 0; + let bitmap_leading_zeros = self.last_dump_row_count / self.segment_row_count; + self.last_dump_row_count = + self.total_row_count - self.total_row_count % self.segment_row_count; // align to segment + let entries = values.len(); - IntermediateWriter::new(writer).write_all(values).await.inspect(|_| + IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_| logging::debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}") ).inspect_err(|e| logging::error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. Error: {e}") @@ -206,10 +261,16 @@ impl ExternalSorter { } /// Determines the segment index range for the row index range - /// `[self.total_row_count, self.total_row_count + n - 1]` - fn segment_index_range(&self, n: usize) -> RangeInclusive { - let start = self.segment_index(self.total_row_count); - let end = self.segment_index(self.total_row_count + n - 1); + /// `[row_begin, row_begin + n - 1]` + fn segment_index_range(&self, n: usize, is_null: bool) -> RangeInclusive { + let row_begin = if is_null { + self.total_row_count + } else { + self.total_row_count - self.last_dump_row_count + }; + + let start = self.segment_index(row_begin); + let end = self.segment_index(row_begin + n - 1); start..=end } @@ -244,7 +305,8 @@ mod tests { use crate::inverted_index::create::sort::external_provider::MockExternalTempFileProvider; async fn test_external_sorter( - memory_usage_threshold: Option, + current_memory_usage_threshold: Option, + global_memory_usage_sort_limit: Option, segment_row_count: usize, row_count: usize, batch_push: bool, @@ -278,7 +340,9 @@ mod tests { "test".to_owned(), Arc::new(mock_provider), NonZeroUsize::new(segment_row_count).unwrap(), - memory_usage_threshold, + current_memory_usage_threshold, + Arc::new(AtomicUsize::new(0)), + global_memory_usage_sort_limit, ); let mut sorted_result = if batch_push { @@ -321,7 +385,8 @@ mod tests { #[tokio::test] async fn test_external_sorter_pure_in_memory() { - let memory_usage_threshold = None; + let current_memory_usage_threshold = None; + let global_memory_usage_sort_limit = None; let total_row_count_cases = vec![0, 100, 1000, 10000]; let segment_row_count_cases = vec![1, 10, 100, 1000]; let batch_push_cases = vec![false, true]; @@ -330,7 +395,8 @@ mod tests { for segment_row_count in &segment_row_count_cases { for batch_push in &batch_push_cases { test_external_sorter( - memory_usage_threshold, + current_memory_usage_threshold, + global_memory_usage_sort_limit, *segment_row_count, total_row_count, *batch_push, @@ -343,7 +409,8 @@ mod tests { #[tokio::test] async fn test_external_sorter_pure_external() { - let memory_usage_threshold = Some(0); + let current_memory_usage_threshold = None; + let global_memory_usage_sort_limit = Some(0); let total_row_count_cases = vec![0, 100, 1000, 10000]; let segment_row_count_cases = vec![1, 10, 100, 1000]; let batch_push_cases = vec![false, true]; @@ -352,7 +419,8 @@ mod tests { for segment_row_count in &segment_row_count_cases { for batch_push in &batch_push_cases { test_external_sorter( - memory_usage_threshold, + current_memory_usage_threshold, + global_memory_usage_sort_limit, *segment_row_count, total_row_count, *batch_push, @@ -365,7 +433,8 @@ mod tests { #[tokio::test] async fn test_external_sorter_mixed() { - let memory_usage_threshold = Some(1024); + let current_memory_usage_threshold = vec![None, Some(2048)]; + let global_memory_usage_sort_limit = Some(1024); let total_row_count_cases = vec![0, 100, 1000, 10000]; let segment_row_count_cases = vec![1, 10, 100, 1000]; let batch_push_cases = vec![false, true]; @@ -373,13 +442,16 @@ mod tests { for total_row_count in total_row_count_cases { for segment_row_count in &segment_row_count_cases { for batch_push in &batch_push_cases { - test_external_sorter( - memory_usage_threshold, - *segment_row_count, - total_row_count, - *batch_push, - ) - .await; + for current_memory_usage_threshold in ¤t_memory_usage_threshold { + test_external_sorter( + *current_memory_usage_threshold, + global_memory_usage_sort_limit, + *segment_row_count, + total_row_count, + *batch_push, + ) + .await; + } } } } diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw.rs b/src/index/src/inverted_index/create/sort/intermediate_rw.rs index 3a4f15c0de..dbadd5498b 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw.rs @@ -12,6 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Intermediate codec for external sorting. +//! +//! This module provides serialization and deserialization logic for +//! handling intermediate data during the sorting process. +//! The serialization format is as follows: +//! +//! ```text +//! [magic][bitmap leading zeros][item][item]...[item] +//! [4] [4] [?] +//! +//! Each [item] is structured as: +//! [value len][value][bitmap len][bitmap] +//! [8] [?] [8] [?] +//! ``` +//! +//! The format starts with a 4-byte magic identifier, followed by a 4-byte +//! bitmap leading zeros count, indicating how many leading zeros are in the +//! fixed-size region of the bitmap. Following that, each item represents +//! a value and its associated bitmap, serialized with their lengths for +//! easier deserialization. + mod codec_v1; use std::collections::BTreeMap; @@ -39,14 +60,26 @@ impl IntermediateWriter { } /// Serializes and writes all provided values to the wrapped writer - pub async fn write_all(mut self, values: BTreeMap) -> Result<()> { - let (codec_magic, encoder) = (codec_v1::CODEC_V1_MAGIC, codec_v1::IntermediateCodecV1); + pub async fn write_all( + mut self, + values: BTreeMap, + bitmap_leading_zeros: u32, + ) -> Result<()> { + let (codec_magic, encoder) = ( + codec_v1::CODEC_V1_MAGIC, + codec_v1::IntermediateItemEncoderV1, + ); self.writer .write_all(codec_magic) .await .context(WriteSnafu)?; + self.writer + .write_all(&bitmap_leading_zeros.to_be_bytes()) + .await + .context(WriteSnafu)?; + let value_stream = stream::iter(values.into_iter().map(Ok)); let frame_write = FramedWrite::new(&mut self.writer, encoder); // `forward()` will flush and close the writer when the stream ends @@ -79,7 +112,17 @@ impl IntermediateReader { .context(ReadSnafu)?; let decoder = match &magic { - codec_v1::CODEC_V1_MAGIC => codec_v1::IntermediateCodecV1, + codec_v1::CODEC_V1_MAGIC => { + let bitmap_leading_zeros = { + let mut buf = [0u8; 4]; + self.reader.read_exact(&mut buf).await.context(ReadSnafu)?; + u32::from_be_bytes(buf) + }; + + codec_v1::IntermediateItemDecoderV1 { + bitmap_leading_zeros, + } + } _ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(), }; @@ -110,7 +153,7 @@ mod tests { ]); let writer = IntermediateWriter::new(buf_w); - writer.write_all(values.clone()).await.unwrap(); + writer.write_all(values.clone(), 0).await.unwrap(); // reset the handle buf_r.seek(SeekFrom::Start(0)).unwrap(); @@ -124,6 +167,45 @@ mod tests { assert!(stream.next().await.is_none()); } + #[tokio::test] + async fn test_intermediate_read_write_with_prefix_zeros() { + let file_r = tempfile().unwrap(); + let file_w = file_r.try_clone().unwrap(); + let mut buf_r = AllowStdIo::new(file_r); + let buf_w = AllowStdIo::new(file_w); + + let values = BTreeMap::from_iter([ + (Bytes::from("a"), BitVec::from_slice(&[0b10101010])), + (Bytes::from("b"), BitVec::from_slice(&[0b01010101])), + ]); + + let writer = IntermediateWriter::new(buf_w); + writer.write_all(values.clone(), 8).await.unwrap(); + // reset the handle + buf_r.seek(SeekFrom::Start(0)).unwrap(); + + let reader = IntermediateReader::new(buf_r); + let mut stream = reader.into_stream().await.unwrap(); + + let a = stream.next().await.unwrap().unwrap(); + assert_eq!( + a, + ( + Bytes::from("a"), + BitVec::from_slice(&[0b00000000, 0b10101010]) + ) + ); + let b = stream.next().await.unwrap().unwrap(); + assert_eq!( + b, + ( + Bytes::from("b"), + BitVec::from_slice(&[0b00000000, 0b01010101]) + ) + ); + assert!(stream.next().await.is_none()); + } + #[tokio::test] async fn test_intermediate_read_write_empty() { let mut buf = vec![]; @@ -131,7 +213,7 @@ mod tests { let values = BTreeMap::new(); let writer = IntermediateWriter::new(&mut buf); - writer.write_all(values.clone()).await.unwrap(); + writer.write_all(values.clone(), 0).await.unwrap(); let reader = IntermediateReader::new(Cursor::new(buf)); let mut stream = reader.into_stream().await.unwrap(); diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs index 14d1dcb9d3..bb1781a743 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs @@ -27,17 +27,11 @@ const U64_LENGTH: usize = std::mem::size_of::(); /// Magic bytes for this intermediate codec version pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01"; -/// Codec for serializing and deserializing intermediate data for external sorting. -/// -/// Binary format serialization. The item is laid out as follows: -/// ```text -/// [value len][value][bitmap len][bitmap] -/// [8] [?] [8] [?] -/// ``` -pub struct IntermediateCodecV1; +/// Serializes items of external sorting intermediate files. +pub struct IntermediateItemEncoderV1; /// [`FramedWrite`] requires the [`Encoder`] trait to be implemented. -impl Encoder for IntermediateCodecV1 { +impl Encoder for IntermediateItemEncoderV1 { type Item<'a> = (Bytes, BitVec); type Error = Error; @@ -54,8 +48,13 @@ impl Encoder for IntermediateCodecV1 { } } +/// Deserializes items of external sorting intermediate files. +pub struct IntermediateItemDecoderV1 { + pub(crate) bitmap_leading_zeros: u32, +} + /// [`FramedRead`] requires the [`Decoder`] trait to be implemented. -impl Decoder for IntermediateCodecV1 { +impl Decoder for IntermediateItemDecoderV1 { type Item = (Bytes, BitVec); type Error = Error; @@ -92,9 +91,11 @@ impl Decoder for IntermediateCodecV1 { if buf.len() < bitmap_len { return Ok(None); } - let bitmap_bytes = &buf[..bitmap_len]; - let item = (value_bytes.to_vec(), BitVec::from_slice(bitmap_bytes)); + let mut bitmap = BitVec::repeat(false, self.bitmap_leading_zeros as _); + bitmap.extend_from_raw_slice(&buf[..bitmap_len]); + + let item = (value_bytes.to_vec(), bitmap); src.advance(U64_LENGTH * 2 + value_len + bitmap_len); Ok(Some(item)) @@ -113,54 +114,88 @@ impl From for Error { #[cfg(test)] mod tests { + use common_base::bit_vec::prelude::{bitvec, Lsb0}; + use super::*; #[test] fn test_intermediate_codec_basic() { - let mut codec = IntermediateCodecV1; + let mut encoder = IntermediateItemEncoderV1; let mut buf = BytesMut::new(); let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010])); - codec.encode(item.clone(), &mut buf).unwrap(); - assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item); - assert_eq!(codec.decode(&mut buf).unwrap(), None); + encoder.encode(item.clone(), &mut buf).unwrap(); + + let mut decoder = IntermediateItemDecoderV1 { + bitmap_leading_zeros: 0, + }; + assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item); + assert_eq!(decoder.decode(&mut buf).unwrap(), None); let item1 = (b"world".to_vec(), BitVec::from_slice(&[0b01010101])); - codec.encode(item.clone(), &mut buf).unwrap(); - codec.encode(item1.clone(), &mut buf).unwrap(); - assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item); - assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item1); - assert_eq!(codec.decode(&mut buf).unwrap(), None); + encoder.encode(item.clone(), &mut buf).unwrap(); + encoder.encode(item1.clone(), &mut buf).unwrap(); + assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item); + assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item1); + assert_eq!(decoder.decode(&mut buf).unwrap(), None); assert!(buf.is_empty()); } #[test] fn test_intermediate_codec_empty_item() { - let mut codec = IntermediateCodecV1; + let mut encoder = IntermediateItemEncoderV1; let mut buf = BytesMut::new(); let item = (b"".to_vec(), BitVec::from_slice(&[])); - codec.encode(item.clone(), &mut buf).unwrap(); - assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item); - assert_eq!(codec.decode(&mut buf).unwrap(), None); + encoder.encode(item.clone(), &mut buf).unwrap(); + + let mut decoder = IntermediateItemDecoderV1 { + bitmap_leading_zeros: 0, + }; + assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item); + assert_eq!(decoder.decode(&mut buf).unwrap(), None); assert!(buf.is_empty()); } #[test] fn test_intermediate_codec_partial() { - let mut codec = IntermediateCodecV1; + let mut encoder = IntermediateItemEncoderV1; let mut buf = BytesMut::new(); let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010])); - codec.encode(item.clone(), &mut buf).unwrap(); + encoder.encode(item.clone(), &mut buf).unwrap(); let partial_length = U64_LENGTH + 3; let mut partial_bytes = buf.split_to(partial_length); - assert_eq!(codec.decode(&mut partial_bytes).unwrap(), None); // not enough data + let mut decoder = IntermediateItemDecoderV1 { + bitmap_leading_zeros: 0, + }; + assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None); // not enough data partial_bytes.extend_from_slice(&buf[..]); - assert_eq!(codec.decode(&mut partial_bytes).unwrap().unwrap(), item); - assert_eq!(codec.decode(&mut partial_bytes).unwrap(), None); + assert_eq!(decoder.decode(&mut partial_bytes).unwrap().unwrap(), item); + assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None); assert!(partial_bytes.is_empty()); } + + #[test] + fn test_intermediate_codec_prefix_zeros() { + let mut encoder = IntermediateItemEncoderV1; + let mut buf = BytesMut::new(); + + let item = (b"hello".to_vec(), bitvec![u8, Lsb0; 1, 0, 1, 0, 1, 0, 1, 0]); + encoder.encode(item.clone(), &mut buf).unwrap(); + + let mut decoder = IntermediateItemDecoderV1 { + bitmap_leading_zeros: 3, + }; + let decoded_item = decoder.decode(&mut buf).unwrap().unwrap(); + assert_eq!(decoded_item.0, b"hello"); + assert_eq!( + decoded_item.1, + bitvec![u8, Lsb0; 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0] + ); + assert_eq!(decoder.decode(&mut buf).unwrap(), None); + assert!(buf.is_empty()); + } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 83d8f87016..1c345d055c 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -186,7 +186,8 @@ lazy_static! { pub static ref INDEX_CREATE_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_index_create_elapsed", "index create elapsed", - &[STAGE_LABEL] + &[STAGE_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); /// Counter of rows indexed. @@ -201,7 +202,11 @@ lazy_static! { "index create bytes total", ) .unwrap(); - + /// Gauge of index create memory usage. + pub static ref INDEX_CREATE_MEMORY_USAGE: IntGauge = register_int_gauge!( + "greptime_index_create_memory_usage", + "index create memory usage", + ).unwrap(); /// Counter of r/w bytes on index related IO operations. pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( "greptime_index_io_bytes_total", diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 6a1b0a6e71..cb10e7fc91 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -26,6 +26,7 @@ use object_store::ObjectStore; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; +use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; use crate::sst::file::FileId; @@ -33,19 +34,13 @@ use crate::sst::index::intermediate::IntermediateManager; const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; -// TODO(zhongzc): how to determine this value? -/// The minimum memory usage threshold for a column to qualify for external sorting during index creation. -const MIN_MEMORY_USAGE_THRESHOLD: usize = 8192; - -/// The buffer size for the pipe used to send index data to the puffin blob. -const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; - /// The index creator that hides the error handling details. #[derive(Default)] pub struct Indexer { file_id: FileId, region_id: RegionId, inner: Option, + last_memory_usage: usize, } impl Indexer { @@ -69,6 +64,15 @@ impl Indexer { self.inner = None; } } + + if let Some(creator) = self.inner.as_ref() { + let memory_usage = creator.memory_usage(); + INDEX_CREATE_MEMORY_USAGE.add(memory_usage as i64 - self.last_memory_usage as i64); + self.last_memory_usage = memory_usage; + } else { + INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64); + self.last_memory_usage = 0; + } } /// Finish the index creation. @@ -81,6 +85,9 @@ impl Indexer { "Create index successfully, region_id: {}, file_id: {}, bytes: {}, rows: {}", self.region_id, self.file_id, byte_count, row_count ); + + INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64); + self.last_memory_usage = 0; return Some(byte_count); } Err(err) => { @@ -99,6 +106,8 @@ impl Indexer { } } + INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64); + self.last_memory_usage = 0; None } @@ -119,6 +128,8 @@ impl Indexer { } } } + INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64); + self.last_memory_usage = 0; } } @@ -201,6 +212,7 @@ impl<'a> IndexerBuilder<'a> { file_id: self.file_id, region_id: self.metadata.region_id, inner: Some(creator), + last_memory_usage: 0, } } } diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index ab8de9d1d9..0e6fdc6125 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -17,6 +17,7 @@ mod temp_provider; use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use common_telemetry::warn; @@ -45,9 +46,13 @@ use crate::sst::index::creator::statistics::Statistics; use crate::sst::index::creator::temp_provider::TempFileProvider; use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; use crate::sst::index::store::InstrumentedStore; -use crate::sst::index::{ - INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB, -}; +use crate::sst::index::INDEX_BLOB_TYPE; + +/// The minimum memory usage threshold for one column. +const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB + +/// The buffer size for the pipe used to send index data to the puffin blob. +const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; type ByteCount = usize; type RowCount = usize; @@ -75,6 +80,9 @@ pub struct SstIndexCreator { /// Ignore column IDs for index creation. ignore_column_ids: HashSet, + + /// The memory usage of the index creator. + memory_usage: Arc, } impl SstIndexCreator { @@ -89,16 +97,19 @@ impl SstIndexCreator { memory_usage_threshold: Option, segment_row_count: NonZeroUsize, ) -> Self { - // `memory_usage_threshold` is the total memory usage threshold of the index creation, - // so we need to divide it by the number of columns - let memory_threshold = memory_usage_threshold.map(|threshold| { - (threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD) - }); let temp_file_provider = Arc::new(TempFileProvider::new( IntermediateLocation::new(&metadata.region_id, &sst_file_id), intermediate_manager, )); - let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold); + + let memory_usage = Arc::new(AtomicUsize::new(0)); + + let sorter = ExternalSorter::factory( + temp_file_provider.clone() as _, + Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN), + memory_usage.clone(), + memory_usage_threshold, + ); let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count)); let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns()); @@ -115,6 +126,7 @@ impl SstIndexCreator { aborted: false, ignore_column_ids: HashSet::default(), + memory_usage, } } @@ -294,6 +306,10 @@ impl SstIndexCreator { self.temp_file_provider.cleanup().await } + + pub fn memory_usage(&self) -> usize { + self.memory_usage.load(std::sync::atomic::Ordering::Relaxed) + } } #[cfg(test)] diff --git a/src/mito2/src/sst/index/creator/temp_provider.rs b/src/mito2/src/sst/index/creator/temp_provider.rs index d938b236c8..ee80aaa0a6 100644 --- a/src/mito2/src/sst/index/creator/temp_provider.rs +++ b/src/mito2/src/sst/index/creator/temp_provider.rs @@ -110,7 +110,7 @@ impl TempFileProvider { pub async fn cleanup(&self) -> Result<()> { self.manager .store() - .remove_all(self.location.root_path()) + .remove_all(self.location.dir_to_cleanup()) .await } } @@ -172,7 +172,7 @@ mod tests { assert!(provider .manager .store() - .list(location.root_path()) + .list(location.dir_to_cleanup()) .await .unwrap() .is_empty()); diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs index 66177ded73..cf48c9e6eb 100644 --- a/src/mito2/src/sst/index/intermediate.rs +++ b/src/mito2/src/sst/index/intermediate.rs @@ -68,7 +68,8 @@ impl IntermediateManager { /// during external sorting. #[derive(Debug, Clone)] pub struct IntermediateLocation { - root_path: String, + files_dir: String, + sst_dir: String, } impl IntermediateLocation { @@ -80,19 +81,20 @@ impl IntermediateLocation { let region_id = region_id.as_u64(); let uuid = Uuid::new_v4(); Self { - root_path: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"), + files_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"), + sst_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/"), } } - /// Returns the root directory of the intermediate files - pub fn root_path(&self) -> &str { - &self.root_path + /// Returns the directory to clean up when the sorting is done + pub fn dir_to_cleanup(&self) -> &str { + &self.sst_dir } /// Returns the path of the directory for intermediate files associated with a column: /// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/` pub fn column_path(&self, column_id: &str) -> String { - util::join_path(&self.root_path, &format!("{column_id}/")) + util::join_path(&self.files_dir, &format!("{column_id}/")) } /// Returns the path of the intermediate file with the given id for a column: @@ -135,14 +137,19 @@ mod tests { let sst_file_id = FileId::random(); let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id); + assert_eq!( + location.dir_to_cleanup(), + format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/") + ); + let re = Regex::new(&format!( "{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/", r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}" )) .unwrap(); - assert!(re.is_match(location.root_path())); + assert!(re.is_match(&location.files_dir)); - let uuid = location.root_path().split('/').nth(3).unwrap(); + let uuid = location.files_dir.split('/').nth(3).unwrap(); let column_id = "1"; assert_eq!(