From 2df8143ad5ee2b52d5b568fbf789416d2d9703aa Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 22 Feb 2023 16:56:20 +0800 Subject: [PATCH] feat: support table ttl (#1052) * feat: purge expired sst on compaction * chore: add more log * fix: clippy * fix: mark expired ssts as compacting before picking candidates * fix: some CR comments * fix: remove useless result * fix: cr comments --- src/cmd/src/standalone.rs | 7 +- src/common/time/src/error.rs | 13 +++ src/common/time/src/timestamp.rs | 119 +++++++++++++++++++---- src/storage/src/compaction/picker.rs | 43 +++++++- src/storage/src/compaction/scheduler.rs | 2 + src/storage/src/compaction/task.rs | 21 ++-- src/storage/src/engine.rs | 14 ++- src/storage/src/error.rs | 7 ++ src/storage/src/region.rs | 4 + src/storage/src/region/writer.rs | 21 +++- src/storage/src/sst.rs | 20 ++++ src/storage/src/test_util/config_util.rs | 1 + 12 files changed, 231 insertions(+), 41 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index f3f1ea6ae5..060c7e1739 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use clap::Parser; use common_base::Plugins; use common_telemetry::info; -use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig}; +use datanode::datanode::{ + CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig, +}; use datanode::instance::InstanceRef; use frontend::frontend::{Frontend, FrontendOptions}; use frontend::grpc::GrpcOptions; @@ -77,6 +79,7 @@ pub struct StandaloneOptions { pub mode: Mode, pub wal: WalConfig, pub storage: ObjectStoreConfig, + pub compaction: CompactionConfig, pub enable_memory_catalog: bool, } @@ -94,6 +97,7 @@ impl Default for StandaloneOptions { mode: Mode::Standalone, wal: WalConfig::default(), storage: ObjectStoreConfig::default(), + compaction: CompactionConfig::default(), enable_memory_catalog: false, } } @@ -120,6 +124,7 @@ impl StandaloneOptions { wal: self.wal, storage: self.storage, enable_memory_catalog: self.enable_memory_catalog, + compaction: self.compaction, ..Default::default() } } diff --git a/src/common/time/src/error.rs b/src/common/time/src/error.rs index b99af96803..c6603ef80f 100644 --- a/src/common/time/src/error.rs +++ b/src/common/time/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::num::TryFromIntError; use chrono::ParseError; use common_error::ext::ErrorExt; @@ -24,8 +25,18 @@ use snafu::{Backtrace, ErrorCompat, Snafu}; pub enum Error { #[snafu(display("Failed to parse string to date, raw: {}, source: {}", raw, source))] ParseDateStr { raw: String, source: ParseError }, + #[snafu(display("Failed to parse a string into Timestamp, raw string: {}", raw))] ParseTimestamp { raw: String, backtrace: Backtrace }, + + #[snafu(display("Current timestamp overflow, source: {}", source))] + TimestampOverflow { + source: TryFromIntError, + backtrace: Backtrace, + }, + + #[snafu(display("Timestamp arithmetic overflow, msg: {}", msg))] + ArithmeticOverflow { msg: String, backtrace: Backtrace }, } impl ErrorExt for Error { @@ -34,6 +45,8 @@ impl ErrorExt for Error { Error::ParseDateStr { .. } | Error::ParseTimestamp { .. } => { StatusCode::InvalidArguments } + Error::TimestampOverflow { .. } => StatusCode::Internal, + Error::ArithmeticOverflow { .. } => StatusCode::InvalidArguments, } } diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index e34f35e6ef..6eaf1f0e32 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -17,12 +17,15 @@ use std::cmp::Ordering; use std::fmt::{Display, Formatter}; use std::hash::{Hash, Hasher}; use std::str::FromStr; +use std::time::Duration; use chrono::offset::Local; use chrono::{DateTime, LocalResult, NaiveDateTime, TimeZone, Utc}; use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; -use crate::error::{Error, ParseTimestampSnafu}; +use crate::error; +use crate::error::{ArithmeticOverflowSnafu, Error, ParseTimestampSnafu, TimestampOverflowSnafu}; #[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)] pub struct Timestamp { @@ -31,6 +34,50 @@ pub struct Timestamp { } impl Timestamp { + /// Creates current timestamp in millisecond. + pub fn current_millis() -> Self { + Self { + value: crate::util::current_time_millis(), + unit: TimeUnit::Millisecond, + } + } + + /// Subtracts a duration from timestamp. + /// # Note + /// The result time unit remains unchanged even if `duration` has a different unit with `self`. + /// For example, a timestamp with value 1 and time unit second, subtracted by 1 millisecond + /// and the result is still 1 second. + pub fn sub(&self, duration: Duration) -> error::Result { + let duration: i64 = match self.unit { + TimeUnit::Second => { + i64::try_from(duration.as_secs()).context(TimestampOverflowSnafu)? + } + TimeUnit::Millisecond => { + i64::try_from(duration.as_millis()).context(TimestampOverflowSnafu)? + } + TimeUnit::Microsecond => { + i64::try_from(duration.as_micros()).context(TimestampOverflowSnafu)? + } + TimeUnit::Nanosecond => { + i64::try_from(duration.as_nanos()).context(TimestampOverflowSnafu)? + } + }; + + let value = self + .value + .checked_sub(duration) + .with_context(|| ArithmeticOverflowSnafu { + msg: format!( + "Try to subtract timestamp: {:?} with duration: {:?}", + self, duration + ), + })?; + Ok(Timestamp { + value, + unit: self.unit, + }) + } + pub fn new(value: i64, unit: TimeUnit) -> Self { Self { unit, value } } @@ -77,11 +124,11 @@ impl Timestamp { pub fn convert_to(&self, unit: TimeUnit) -> Option { if self.unit().factor() >= unit.factor() { let mul = self.unit().factor() / unit.factor(); - let value = self.value.checked_mul(mul)?; + let value = self.value.checked_mul(mul as i64)?; Some(Timestamp::new(value, unit)) } else { let mul = unit.factor() / self.unit().factor(); - Some(Timestamp::new(self.value.div_euclid(mul), unit)) + Some(Timestamp::new(self.value.div_euclid(mul as i64), unit)) } } @@ -92,23 +139,25 @@ impl Timestamp { pub fn convert_to_ceil(&self, unit: TimeUnit) -> Option { if self.unit().factor() >= unit.factor() { let mul = self.unit().factor() / unit.factor(); - let value = self.value.checked_mul(mul)?; + let value = self.value.checked_mul(mul as i64)?; Some(Timestamp::new(value, unit)) } else { let mul = unit.factor() / self.unit().factor(); - Some(Timestamp::new(self.value.div_ceil(mul), unit)) + Some(Timestamp::new(self.value.div_ceil(mul as i64), unit)) } } /// Split a [Timestamp] into seconds part and nanoseconds part. /// Notice the seconds part of split result is always rounded down to floor. - fn split(&self) -> (i64, i64) { - let sec_mul = TimeUnit::Second.factor() / self.unit.factor(); - let nsec_mul = self.unit.factor() / TimeUnit::Nanosecond.factor(); + fn split(&self) -> (i64, u32) { + let sec_mul = (TimeUnit::Second.factor() / self.unit.factor()) as i64; + let nsec_mul = (self.unit.factor() / TimeUnit::Nanosecond.factor()) as i64; let sec_div = self.value.div_euclid(sec_mul); let sec_mod = self.value.rem_euclid(sec_mul); - (sec_div, sec_mod * nsec_mul) + // safety: the max possible value of `sec_mod` is 999,999,999 + let nsec = u32::try_from(sec_mod * nsec_mul).unwrap(); + (sec_div, nsec) } /// Format timestamp to ISO8601 string. If the timestamp exceeds what chrono timestamp can @@ -122,15 +171,8 @@ impl Timestamp { } pub fn to_chrono_datetime(&self) -> LocalResult> { - let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor(); - let (mut secs, mut nsecs) = self.split(); - - if nsecs < 0 { - secs -= 1; - nsecs += nano_factor; - } - - Utc.timestamp_opt(secs, nsecs as u32) + let (sec, nsec) = self.split(); + Utc.timestamp_opt(sec, nsec) } } @@ -252,7 +294,7 @@ impl Display for TimeUnit { } impl TimeUnit { - pub fn factor(&self) -> i64 { + pub fn factor(&self) -> u32 { match self { TimeUnit::Second => 1_000_000_000, TimeUnit::Millisecond => 1_000_000, @@ -300,7 +342,7 @@ impl Hash for Timestamp { fn hash(&self, state: &mut H) { let (sec, nsec) = self.split(); state.write_i64(sec); - state.write_i64(nsec); + state.write_u32(nsec); } } @@ -789,4 +831,41 @@ mod tests { Timestamp::new(1, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond) ); } + + #[test] + fn test_split_overflow() { + Timestamp::new(i64::MAX, TimeUnit::Second).split(); + Timestamp::new(i64::MIN, TimeUnit::Second).split(); + Timestamp::new(i64::MAX, TimeUnit::Millisecond).split(); + Timestamp::new(i64::MIN, TimeUnit::Millisecond).split(); + Timestamp::new(i64::MAX, TimeUnit::Microsecond).split(); + Timestamp::new(i64::MIN, TimeUnit::Microsecond).split(); + Timestamp::new(i64::MAX, TimeUnit::Nanosecond).split(); + Timestamp::new(i64::MIN, TimeUnit::Nanosecond).split(); + let (sec, nsec) = Timestamp::new(i64::MIN, TimeUnit::Nanosecond).split(); + let time = NaiveDateTime::from_timestamp_opt(sec, nsec).unwrap(); + assert_eq!(sec, time.timestamp()); + assert_eq!(nsec, time.timestamp_subsec_nanos()); + } + + #[test] + fn test_timestamp_sub() { + let res = Timestamp::new(1, TimeUnit::Second) + .sub(Duration::from_secs(1)) + .unwrap(); + assert_eq!(0, res.value); + assert_eq!(TimeUnit::Second, res.unit); + + let res = Timestamp::new(0, TimeUnit::Second) + .sub(Duration::from_secs(1)) + .unwrap(); + assert_eq!(-1, res.value); + assert_eq!(TimeUnit::Second, res.unit); + + let res = Timestamp::new(1, TimeUnit::Second) + .sub(Duration::from_millis(1)) + .unwrap(); + assert_eq!(1, res.value); + assert_eq!(TimeUnit::Second, res.unit); + } } diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 5e1a38814d..966e4ed5af 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -14,14 +14,20 @@ use std::marker::PhantomData; use std::sync::Arc; +use std::time::Duration; -use common_telemetry::debug; +use common_telemetry::{debug, error, info}; +use common_time::Timestamp; +use snafu::ResultExt; use store_api::logstore::LogStore; use crate::compaction::scheduler::CompactionRequestImpl; use crate::compaction::strategy::{SimpleTimeWindowStrategy, StrategyRef}; use crate::compaction::task::{CompactionTask, CompactionTaskImpl}; +use crate::error::TtlCalculationSnafu; use crate::scheduler::Request; +use crate::sst::{FileHandle, Level}; +use crate::version::LevelMetasRef; /// Picker picks input SST files and builds the compaction task. /// Different compaction strategy may implement different pickers. @@ -57,6 +63,24 @@ impl SimplePicker { _phantom_data: Default::default(), } } + + fn get_expired_ssts( + &self, + levels: &LevelMetasRef, + ttl: Option, + ) -> crate::error::Result> { + let Some(ttl) = ttl else { return Ok(vec![]); }; + + let expire_time = Timestamp::current_millis() + .sub(ttl) + .context(TtlCalculationSnafu)?; + + let mut expired_ssts = vec![]; + for level in 0..levels.level_num() { + expired_ssts.extend(levels.level(level as Level).get_expired_files(&expire_time)); + } + Ok(expired_ssts) + } } impl Picker for SimplePicker { @@ -69,6 +93,22 @@ impl Picker for SimplePicker { req: &CompactionRequestImpl, ) -> crate::error::Result>> { let levels = &req.levels(); + let expired_ssts = self + .get_expired_ssts(levels, req.ttl) + .map_err(|e| { + error!(e;"Failed to get region expired SST files, region: {}, ttl: {:?}", req.region_id, req.ttl); + e + }) + .unwrap_or_default(); + + if !expired_ssts.is_empty() { + info!( + "Expired SSTs in region {}: {:?}", + req.region_id, expired_ssts + ); + // here we mark expired SSTs as compacting to avoid them being picked. + expired_ssts.iter().for_each(|f| f.mark_compacting(true)); + } for level_num in 0..levels.level_num() { let level = levels.level(level_num as u8); @@ -91,6 +131,7 @@ impl Picker for SimplePicker { shared_data: req.shared.clone(), wal: req.wal.clone(), manifest: req.manifest.clone(), + expired_ssts, })); } diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index 921e0db360..22bf002d18 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::time::Duration; use common_telemetry::{debug, error, info}; use store_api::logstore::LogStore; @@ -48,6 +49,7 @@ pub struct CompactionRequestImpl { pub shared: SharedDataRef, pub manifest: RegionManifest, pub wal: Wal, + pub ttl: Option, } impl CompactionRequestImpl { diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 501c0a3314..3cee9ea049 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -42,6 +42,7 @@ pub struct CompactionTaskImpl { pub shared_data: SharedDataRef, pub wal: Wal, pub manifest: RegionManifest, + pub expired_ssts: Vec, } impl Debug for CompactionTaskImpl { @@ -60,19 +61,14 @@ impl Drop for CompactionTaskImpl { impl CompactionTaskImpl { /// Compacts inputs SSTs, returns `(output file, compacted input file)`. - async fn merge_ssts(&mut self) -> Result<(Vec, Vec)> { + async fn merge_ssts(&mut self) -> Result<(HashSet, HashSet)> { let mut futs = Vec::with_capacity(self.outputs.len()); let mut compacted_inputs = HashSet::new(); let region_id = self.shared_data.id(); for output in self.outputs.drain(..) { let schema = self.schema.clone(); let sst_layer = self.sst_layer.clone(); - compacted_inputs.extend(output.inputs.iter().map(|f| FileMeta { - region_id, - file_name: f.file_name().to_string(), - time_range: *f.time_range(), - level: f.level(), - })); + compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta)); // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism. futs.push(async move { @@ -94,8 +90,8 @@ impl CompactionTaskImpl { /// Writes updated SST info into manifest. async fn write_manifest_and_apply( &self, - output: Vec, - input: Vec, + output: HashSet, + input: HashSet, ) -> Result<()> { let version = &self.shared_data.version_control; let region_version = version.metadata().version(); @@ -103,8 +99,8 @@ impl CompactionTaskImpl { let edit = RegionEdit { region_version, flushed_sequence: None, - files_to_add: output, - files_to_remove: input, + files_to_add: Vec::from_iter(output.into_iter()), + files_to_remove: Vec::from_iter(input.into_iter()), }; info!( "Compacted region: {}, region edit: {:?}", @@ -131,10 +127,11 @@ impl CompactionTask for CompactionTaskImpl { async fn run(mut self) -> Result<()> { self.mark_files_compacting(true); - let (output, compacted) = self.merge_ssts().await.map_err(|e| { + let (output, mut compacted) = self.merge_ssts().await.map_err(|e| { error!(e; "Failed to compact region: {}", self.shared_data.name()); e })?; + compacted.extend(self.expired_ssts.iter().map(FileHandle::meta)); self.write_manifest_and_apply(output, compacted) .await .map_err(|e| { diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 6edfbc8336..b11beed5b2 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use std::time::Duration; use async_trait::async_trait; use common_telemetry::logging::info; @@ -289,7 +290,8 @@ impl EngineInner { let mut guard = SlotGuard::new(name, &self.regions); - let store_config = self.region_store_config(&opts.parent_dir, opts.write_buffer_size, name); + let store_config = + self.region_store_config(&opts.parent_dir, opts.write_buffer_size, name, opts.ttl); let region = match RegionImpl::open(name.to_string(), store_config, opts).await? { None => return Ok(None), @@ -319,8 +321,12 @@ impl EngineInner { .context(error::InvalidRegionDescSnafu { region: ®ion_name, })?; - let store_config = - self.region_store_config(&opts.parent_dir, opts.write_buffer_size, ®ion_name); + let store_config = self.region_store_config( + &opts.parent_dir, + opts.write_buffer_size, + ®ion_name, + opts.ttl, + ); let region = RegionImpl::create(metadata, store_config).await?; @@ -341,6 +347,7 @@ impl EngineInner { parent_dir: &str, write_buffer_size: Option, region_name: &str, + ttl: Option, ) -> StoreConfig { let parent_dir = util::normalize_dir(parent_dir); @@ -363,6 +370,7 @@ impl EngineInner { compaction_scheduler: self.compaction_scheduler.clone(), engine_config: self.config.clone(), file_purger: self.file_purger.clone(), + ttl, } } } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 339711a4c6..e08ce9a2d0 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -434,6 +434,12 @@ pub enum Error { source: object_store::Error, backtrace: Backtrace, }, + + #[snafu(display("Failed to calculate SST expire time, source: {}", source))] + TtlCalculation { + #[snafu(backtrace)] + source: common_time::error::Error, + }, } pub type Result = std::result::Result; @@ -508,6 +514,7 @@ impl ErrorExt for Error { StopScheduler { .. } => StatusCode::Internal, DeleteSst { .. } => StatusCode::StorageUnavailable, IllegalSchedulerState { .. } => StatusCode::Unexpected, + TtlCalculation { source, .. } => source.status_code(), } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 77d0531d04..d91fad2de0 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -19,6 +19,7 @@ mod writer; use std::collections::BTreeMap; use std::fmt; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use common_telemetry::logging; @@ -140,6 +141,7 @@ pub struct StoreConfig { pub compaction_scheduler: CompactionSchedulerRef, pub engine_config: Arc, pub file_purger: FilePurgerRef, + pub ttl: Option, } pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata)); @@ -198,6 +200,7 @@ impl RegionImpl { writer: Arc::new(RegionWriter::new( store_config.memtable_builder, store_config.engine_config.clone(), + store_config.ttl, )), wal, flush_strategy: store_config.flush_strategy, @@ -277,6 +280,7 @@ impl RegionImpl { let writer = Arc::new(RegionWriter::new( store_config.memtable_builder, store_config.engine_config.clone(), + store_config.ttl, )); let writer_ctx = WriterContext { shared: &shared, diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 9a4d2a574c..56af329c04 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::time::Duration; use common_error::prelude::BoxedError; use common_telemetry::tracing::log::info; @@ -60,9 +61,13 @@ pub struct RegionWriter { } impl RegionWriter { - pub fn new(memtable_builder: MemtableBuilderRef, config: Arc) -> RegionWriter { + pub fn new( + memtable_builder: MemtableBuilderRef, + config: Arc, + ttl: Option, + ) -> RegionWriter { RegionWriter { - inner: Mutex::new(WriterInner::new(memtable_builder, config)), + inner: Mutex::new(WriterInner::new(memtable_builder, config, ttl)), version_mutex: Mutex::new(()), } } @@ -324,15 +329,21 @@ struct WriterInner { /// It should protected by upper mutex closed: bool, engine_config: Arc, + ttl: Option, } impl WriterInner { - fn new(memtable_builder: MemtableBuilderRef, engine_config: Arc) -> WriterInner { + fn new( + memtable_builder: MemtableBuilderRef, + engine_config: Arc, + ttl: Option, + ) -> WriterInner { WriterInner { memtable_builder, flush_handle: None, engine_config, closed: false, + ttl, } } @@ -596,7 +607,7 @@ impl WriterInner { return Ok(()); } - let cb = Self::build_flush_callback(¤t_version, ctx, &self.engine_config); + let cb = Self::build_flush_callback(¤t_version, ctx, &self.engine_config, self.ttl); let flush_req = FlushJob { max_memtable_id: max_memtable_id.unwrap(), @@ -624,6 +635,7 @@ impl WriterInner { version: &VersionRef, ctx: &WriterContext, config: &Arc, + ttl: Option, ) -> Option { let region_id = version.metadata().id(); let compaction_request = CompactionRequestImpl { @@ -633,6 +645,7 @@ impl WriterInner { shared: ctx.shared.clone(), manifest: ctx.manifest.clone(), wal: ctx.wal.clone(), + ttl, }; let compaction_scheduler = ctx.compaction_scheduler.clone(); let shared_data = ctx.shared.clone(); diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 25edf74e93..f7ebdc1af2 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -145,6 +145,21 @@ impl LevelMeta { self.files.len() } + /// Returns expired SSTs from current level. + pub fn get_expired_files(&self, expire_time: &Timestamp) -> Vec { + self.files + .iter() + .filter_map(|(_, v)| { + let Some((_, end)) = v.time_range() else { return None; }; + if end < expire_time { + Some(v.clone()) + } else { + None + } + }) + .collect() + } + pub fn files(&self) -> impl Iterator { self.files.values() } @@ -212,6 +227,11 @@ impl FileHandle { pub fn mark_deleted(&self) { self.inner.deleted.store(true, Ordering::Relaxed); } + + #[inline] + pub fn meta(&self) -> FileMeta { + self.inner.meta.clone() + } } /// Actually data of [FileHandle]. diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 261b7d3606..31459cecee 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -69,5 +69,6 @@ pub async fn new_store_config( compaction_scheduler, engine_config: Default::default(), file_purger, + ttl: None, } }