From 55b7656956711106fbe62ad8bc9f649ab839481e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 31 Dec 2024 11:28:02 +0800 Subject: [PATCH] feat: override `__sequence` on creating SST to save space and CPU (#5252) * override memtable sequence Signed-off-by: Ruihang Xia * override sst sequence Signed-off-by: Ruihang Xia * chore: changes per CR comments Signed-off-by: Ruihang Xia * use correct sequence number Signed-off-by: Ruihang Xia * wrap a method to get max sequence Signed-off-by: Ruihang Xia * fix typo Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/mito2/src/access_layer.rs | 6 ++- src/mito2/src/cache/write_cache.rs | 6 ++- src/mito2/src/compaction/compactor.rs | 9 +++++ src/mito2/src/compaction/test_util.rs | 2 + src/mito2/src/compaction/twcs.rs | 1 + src/mito2/src/engine/basic_test.rs | 2 +- src/mito2/src/flush.rs | 4 ++ src/mito2/src/manifest/tests/checkpoint.rs | 1 + src/mito2/src/memtable.rs | 9 ++++- src/mito2/src/memtable/key_values.rs | 19 ++++++++++ src/mito2/src/memtable/partition_tree.rs | 22 ++++++++++- src/mito2/src/memtable/time_series.rs | 21 ++++++++++- src/mito2/src/sst/file.rs | 7 ++++ src/mito2/src/sst/file_purger.rs | 4 ++ src/mito2/src/sst/parquet.rs | 12 +++--- src/mito2/src/sst/parquet/format.rs | 43 +++++++++++++++++++++- src/mito2/src/sst/parquet/writer.rs | 5 ++- src/mito2/src/test_util/sst_util.rs | 2 + src/mito2/src/test_util/version_util.rs | 3 ++ 19 files changed, 163 insertions(+), 15 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 0d6204d024..16d1480a61 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -19,6 +19,7 @@ use object_store::util::{join_dir, with_instrument_layers}; use object_store::ObjectStore; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; +use store_api::storage::SequenceNumber; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; @@ -164,7 +165,9 @@ impl
AccessLayer { request.metadata, indexer, ); - writer.write_all(request.source, write_opts).await? + writer + .write_all(request.source, request.max_sequence, write_opts) + .await? }; // Put parquet metadata to cache manager. @@ -194,6 +197,7 @@ pub(crate) struct SstWriteRequest { pub(crate) cache_manager: CacheManagerRef, #[allow(dead_code)] pub(crate) storage: Option, + pub(crate) max_sequence: Option, /// Configs for index pub(crate) index_options: IndexOptions, diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 681355b373..1e9dfb5400 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -138,7 +138,9 @@ impl WriteCache { indexer, ); - let sst_info = writer.write_all(write_request.source, write_opts).await?; + let sst_info = writer + .write_all(write_request.source, write_request.max_sequence, write_opts) + .await?; timer.stop_and_record(); @@ -375,6 +377,7 @@ mod tests { metadata, source, storage: None, + max_sequence: None, cache_manager: Default::default(), index_options: IndexOptions::default(), inverted_index_config: Default::default(), @@ -468,6 +471,7 @@ mod tests { metadata, source, storage: None, + max_sequence: None, cache_manager: cache_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: Default::default(), diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index c07a333eda..ceeb509bc1 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::num::NonZero; use std::sync::Arc; use std::time::Duration; @@ -303,6 +304,12 @@ impl Compactor for DefaultCompactor { let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone(); + let max_sequence = output + .inputs + .iter() + .map(|f| f.meta_ref().sequence) + .max() + .flatten(); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -325,6 +332,7 @@ impl Compactor for DefaultCompactor { source: Source::Reader(reader), cache_manager, storage, + max_sequence: max_sequence.map(NonZero::get), index_options, inverted_index_config, fulltext_index_config, @@ -343,6 +351,7 @@ impl Compactor for DefaultCompactor { index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, + sequence: max_sequence, }); Ok(file_meta_opt) }); diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index 1df462004f..4baa6a9db5 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -39,6 +39,7 @@ pub fn new_file_handle( index_file_size: 0, num_rows: 0, num_row_groups: 0, + sequence: None, }, file_purger, ) @@ -63,6 +64,7 @@ pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec 0); // Chief says this assert can ensure the size is counted. assert_eq!(region_stat.num_rows, 10); // region total usage diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index dd844a7d53..a0400deb5b 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -15,6 +15,7 @@ //! Flush related utilities and structs. 
use std::collections::HashMap; +use std::num::NonZeroU64; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -345,6 +346,7 @@ impl RegionFlushTask { continue; } + let max_sequence = mem.stats().max_sequence(); let file_id = FileId::random(); let iter = mem.iter(None, None)?; let source = Source::Iter(iter); @@ -357,6 +359,7 @@ impl RegionFlushTask { source, cache_manager: self.cache_manager.clone(), storage: version.options.storage.clone(), + max_sequence: Some(max_sequence), index_options: self.index_options.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), fulltext_index_config: self.engine_config.fulltext_index.clone(), @@ -382,6 +385,7 @@ impl RegionFlushTask { index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, + sequence: NonZeroU64::new(max_sequence), }; file_metas.push(file_meta); } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 6f2c92bc5e..7e20bc2ced 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -225,6 +225,7 @@ async fn checkpoint_with_different_compression_types() { index_file_size: 0, num_rows: 0, num_row_groups: 0, + sequence: None, }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 942d77a209..7d00b6bde8 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -23,7 +23,7 @@ pub use bulk::part::BulkPart; use common_time::Timestamp; use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; use crate::config::MitoConfig; @@ -77,6 +77,8 @@ pub struct MemtableStats { num_rows: usize, /// Total number of ranges in the 
memtable. num_ranges: usize, + /// The maximum sequence number in the memtable. + max_sequence: SequenceNumber, } impl MemtableStats { @@ -106,6 +108,11 @@ impl MemtableStats { pub fn num_ranges(&self) -> usize { self.num_ranges } + + /// Returns the maximum sequence number in the memtable. + pub fn max_sequence(&self) -> SequenceNumber { + self.max_sequence + } } pub type BoxedBatchIterator = Box> + Send>; diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index a98826bdb1..73013920e6 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -63,6 +63,25 @@ impl KeyValues { // Safety: rows is not None. self.mutation.rows.as_ref().unwrap().rows.len() } + + /// Returns if this container is empty + pub fn is_empty(&self) -> bool { + self.mutation.rows.is_none() + } + + /// Return the max sequence in this container. + /// + /// When the mutation has no rows, the sequence is the same as the mutation sequence. + pub fn max_sequence(&self) -> SequenceNumber { + let mut sequence = self.mutation.sequence; + let num_rows = self.mutation.rows.as_ref().unwrap().rows.len() as u64; + sequence += num_rows; + if num_rows > 0 { + sequence -= 1; + } + + sequence + } } /// Key value view of a mutation. diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 1376f92331..d386232083 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -24,7 +24,7 @@ mod shard_builder; mod tree; use std::fmt; -use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use common_base::readable_size::ReadableSize; @@ -113,6 +113,7 @@ pub struct PartitionTreeMemtable { alloc_tracker: AllocTracker, max_timestamp: AtomicI64, min_timestamp: AtomicI64, + max_sequence: AtomicU64, /// Total written rows in memtable. 
This also includes deleted and duplicated rows. num_rows: AtomicUsize, } @@ -131,6 +132,10 @@ impl Memtable for PartitionTreeMemtable { } fn write(&self, kvs: &KeyValues) -> Result<()> { + if kvs.is_empty() { + return Ok(()); + } + // TODO(yingwen): Validate schema while inserting rows. let mut metrics = WriteMetrics::default(); @@ -140,6 +145,12 @@ impl Memtable for PartitionTreeMemtable { self.update_stats(&metrics); + // update max_sequence + if res.is_ok() { + let sequence = kvs.max_sequence(); + self.max_sequence.fetch_max(sequence, Ordering::Relaxed); + } + self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed); res } @@ -152,6 +163,12 @@ impl Memtable for PartitionTreeMemtable { self.update_stats(&metrics); + // update max_sequence + if res.is_ok() { + self.max_sequence + .fetch_max(key_value.sequence(), Ordering::Relaxed); + } + self.num_rows.fetch_add(1, Ordering::Relaxed); res } @@ -210,6 +227,7 @@ impl Memtable for PartitionTreeMemtable { time_range: None, num_rows: 0, num_ranges: 0, + max_sequence: 0, }; } @@ -229,6 +247,7 @@ impl Memtable for PartitionTreeMemtable { time_range: Some((min_timestamp, max_timestamp)), num_rows: self.num_rows.load(Ordering::Relaxed), num_ranges: 1, + max_sequence: self.max_sequence.load(Ordering::Relaxed), } } @@ -267,6 +286,7 @@ impl PartitionTreeMemtable { max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), num_rows: AtomicUsize::new(0), + max_sequence: AtomicU64::new(0), } } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 8ef6f44121..23452d783a 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -15,7 +15,7 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, Bound, HashSet}; use std::fmt::{Debug, Formatter}; -use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use 
std::time::{Duration, Instant}; @@ -100,6 +100,7 @@ pub struct TimeSeriesMemtable { alloc_tracker: AllocTracker, max_timestamp: AtomicI64, min_timestamp: AtomicI64, + max_sequence: AtomicU64, dedup: bool, merge_mode: MergeMode, /// Total written rows in memtable. This also includes deleted and duplicated rows. @@ -134,6 +135,7 @@ impl TimeSeriesMemtable { alloc_tracker: AllocTracker::new(write_buffer_manager), max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), + max_sequence: AtomicU64::new(0), dedup, merge_mode, num_rows: Default::default(), @@ -186,6 +188,10 @@ impl Memtable for TimeSeriesMemtable { } fn write(&self, kvs: &KeyValues) -> Result<()> { + if kvs.is_empty() { + return Ok(()); + } + let mut local_stats = WriteMetrics::default(); for kv in kvs.iter() { @@ -199,6 +205,10 @@ impl Memtable for TimeSeriesMemtable { // so that we can ensure writing to memtable will succeed. self.update_stats(local_stats); + // update max_sequence + let sequence = kvs.max_sequence(); + self.max_sequence.fetch_max(sequence, Ordering::Relaxed); + self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed); Ok(()) } @@ -209,6 +219,13 @@ impl Memtable for TimeSeriesMemtable { metrics.value_bytes += std::mem::size_of::() + std::mem::size_of::(); self.update_stats(metrics); + + // update max_sequence + if res.is_ok() { + self.max_sequence + .fetch_max(key_value.sequence(), Ordering::Relaxed); + } + self.num_rows.fetch_add(1, Ordering::Relaxed); res } @@ -294,6 +311,7 @@ impl Memtable for TimeSeriesMemtable { time_range: None, num_rows: 0, num_ranges: 0, + max_sequence: 0, }; } let ts_type = self @@ -311,6 +329,7 @@ impl Memtable for TimeSeriesMemtable { time_range: Some((min_timestamp, max_timestamp)), num_rows: self.num_rows.load(Ordering::Relaxed), num_ranges: 1, + max_sequence: self.max_sequence.load(Ordering::Relaxed), } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 33166d99cd..e9959ae562 100644 --- 
a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -15,6 +15,7 @@ //! Structures to describe metadata of files. use std::fmt; +use std::num::NonZeroU64; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -134,6 +135,11 @@ pub struct FileMeta { /// the default value `0` doesn't means the file doesn't contains any rows, /// but instead means the number of rows is unknown. pub num_row_groups: u64, + /// Sequence in this file. + /// + /// This sequence is the only sequence in this file. And it's retrieved from the max + /// sequence of the rows on generating this file. + pub sequence: Option, } /// Type of index. @@ -343,6 +349,7 @@ mod tests { index_file_size: 0, num_rows: 0, num_row_groups: 0, + sequence: None, } } diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 81251c91a5..7d81445c67 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -119,6 +119,8 @@ impl FilePurger for LocalFilePurger { #[cfg(test)] mod tests { + use std::num::NonZeroU64; + use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; use object_store::ObjectStore; @@ -176,6 +178,7 @@ mod tests { index_file_size: 0, num_rows: 0, num_row_groups: 0, + sequence: None, }, file_purger, ); @@ -238,6 +241,7 @@ mod tests { index_file_size: 4096, num_rows: 1024, num_row_groups: 1, + sequence: NonZeroU64::new(4096), }, file_purger, ); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index af677c28d1..05dafb0edf 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -134,7 +134,7 @@ mod tests { ); let info = writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); @@ -189,7 +189,7 @@ mod tests { ); writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); @@ -258,7 +258,7 @@ mod tests { ); let sst_info = writer - 
.write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .expect("write_all should return sst info"); @@ -297,7 +297,7 @@ mod tests { Indexer::default(), ); writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); @@ -350,7 +350,7 @@ mod tests { Indexer::default(), ); writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); @@ -386,7 +386,7 @@ mod tests { ); writer - .write_all(source, &write_opts) + .write_all(source, None, &write_opts) .await .unwrap() .unwrap(); diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index f2d6c7614b..1256210445 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -42,7 +42,7 @@ use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::statistics::Statistics; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result, @@ -65,6 +65,7 @@ pub(crate) struct WriteFormat { metadata: RegionMetadataRef, /// SST file schema. arrow_schema: SchemaRef, + override_sequence: Option, } impl WriteFormat { @@ -74,9 +75,19 @@ impl WriteFormat { WriteFormat { metadata, arrow_schema, + override_sequence: None, } } + /// Set override sequence. + pub(crate) fn with_override_sequence( + mut self, + override_sequence: Option, + ) -> Self { + self.override_sequence = override_sequence; + self + } + /// Gets the arrow schema to store in parquet. pub(crate) fn arrow_schema(&self) -> &SchemaRef { &self.arrow_schema @@ -107,7 +118,14 @@ impl WriteFormat { columns.push(batch.timestamps().to_arrow_array()); // Add internal columns: primary key, sequences, op types. 
columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows())); - columns.push(batch.sequences().to_arrow_array()); + + if let Some(override_sequence) = self.override_sequence { + let sequence_array = + Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()])); + columns.push(sequence_array); + } else { + columns.push(batch.sequences().to_arrow_array()); + } columns.push(batch.op_types().to_arrow_array()); RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu) @@ -756,6 +774,27 @@ mod tests { assert_eq!(expect_record, actual); } + #[test] + fn test_convert_batch_with_override_sequence() { + let metadata = build_test_region_metadata(); + let write_format = WriteFormat::new(metadata).with_override_sequence(Some(415411)); + + let num_rows = 4; + let batch = new_batch(b"test", 1, 2, num_rows); + let columns: Vec = vec![ + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key + Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ]; + let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap(); + + let actual = write_format.convert_batch(&batch).unwrap(); + assert_eq!(expect_record, actual); + } + #[test] fn test_projection_indices() { let metadata = build_test_region_metadata(); diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 1d63f5e3d0..13f7cfb3ec 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -31,6 +31,7 @@ use parquet::schema::types::ColumnPath; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; +use store_api::storage::SequenceNumber; use 
tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; @@ -112,9 +113,11 @@ where pub async fn write_all( &mut self, mut source: Source, + override_sequence: Option, // override the `sequence` field from `Source` opts: &WriteOptions, ) -> Result> { - let write_format = WriteFormat::new(self.metadata.clone()); + let write_format = + WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); let mut stats = SourceStats::default(); while let Some(res) = self diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 674c33969d..63c3fc09d6 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -14,6 +14,7 @@ //! Utilities for testing SSTs. +use std::num::NonZeroU64; use std::sync::Arc; use api::v1::{OpType, SemanticType}; @@ -116,6 +117,7 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { index_file_size: 0, num_rows: 0, num_row_groups: 0, + sequence: None, }, file_purger, ) diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index d4a17ffe47..68534d34ee 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -15,6 +15,7 @@ //! Utilities to mock version. use std::collections::HashMap; +use std::num::NonZeroU64; use std::sync::Arc; use api::v1::value::ValueData; @@ -103,6 +104,7 @@ impl VersionControlBuilder { index_file_size: 0, num_rows: 0, num_row_groups: 0, + sequence: None, }, ); self @@ -194,6 +196,7 @@ pub(crate) fn apply_edit( index_file_size: 0, num_rows: 0, num_row_groups: 0, + sequence: None, } }) .collect();