diff --git a/src/common/time/src/timestamp_millis.rs b/src/common/time/src/timestamp_millis.rs index c06e10a13a..c5f55ffbf4 100644 --- a/src/common/time/src/timestamp_millis.rs +++ b/src/common/time/src/timestamp_millis.rs @@ -14,6 +14,8 @@ use std::cmp::Ordering; +use crate::Timestamp; + /// Unix timestamp in millisecond resolution. /// /// Negative timestamp is allowed, which represents timestamp before '1970-01-01T00:00:00'. @@ -77,22 +79,29 @@ impl PartialOrd for i64 { } } -pub trait BucketAligned { - /// Returns the timestamp aligned by `bucket_duration` in milliseconds or - /// `None` if overflow occurred. +pub trait BucketAligned: Sized { + /// Returns the timestamp aligned by `bucket_duration` or `None` if underflow occurred. /// /// # Panics /// Panics if `bucket_duration <= 0`. - fn align_by_bucket(self, bucket_duration: i64) -> Option; + fn align_by_bucket(self, bucket_duration: i64) -> Option; } -impl> BucketAligned for T { - fn align_by_bucket(self, bucket_duration: i64) -> Option { +impl BucketAligned for i64 { + fn align_by_bucket(self, bucket_duration: i64) -> Option { assert!(bucket_duration > 0, "{}", bucket_duration); - self.into() - .checked_div_euclid(bucket_duration) + self.checked_div_euclid(bucket_duration) .and_then(|val| val.checked_mul(bucket_duration)) - .map(TimestampMillis) + } +} + +impl BucketAligned for Timestamp { + fn align_by_bucket(self, bucket_duration: i64) -> Option { + assert!(bucket_duration > 0, "{}", bucket_duration); + let unit = self.unit(); + self.value() + .align_by_bucket(bucket_duration) + .map(|val| Timestamp::new(val, unit)) } } @@ -121,24 +130,54 @@ mod tests { #[test] fn test_align_by_bucket() { let bucket = 100; - assert_eq!(0, TimestampMillis::new(0).align_by_bucket(bucket).unwrap()); - assert_eq!(0, TimestampMillis::new(1).align_by_bucket(bucket).unwrap()); - assert_eq!(0, TimestampMillis::new(99).align_by_bucket(bucket).unwrap()); assert_eq!( - 100, - TimestampMillis::new(100).align_by_bucket(bucket).unwrap() + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(0) + .align_by_bucket(bucket) + .unwrap() ); assert_eq!( - 100, - TimestampMillis::new(199).align_by_bucket(bucket).unwrap() + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(1) + .align_by_bucket(bucket) + .unwrap() + ); + assert_eq!( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(99) + .align_by_bucket(bucket) + .unwrap() + ); + assert_eq!( + Timestamp::new_millisecond(100), + Timestamp::new_millisecond(100) + .align_by_bucket(bucket) + .unwrap() + ); + assert_eq!( + Timestamp::new_millisecond(100), + Timestamp::new_millisecond(199) + .align_by_bucket(bucket) + .unwrap() ); - assert_eq!(0, TimestampMillis::MAX.align_by_bucket(i64::MAX).unwrap()); assert_eq!( - i64::MAX, - TimestampMillis::INF.align_by_bucket(i64::MAX).unwrap() + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(i64::MAX - 1) + .align_by_bucket(i64::MAX) + .unwrap() ); - assert_eq!(None, TimestampMillis::MIN.align_by_bucket(bucket)); + assert_eq!( + Timestamp::new_millisecond(i64::MAX), + Timestamp::new_millisecond(i64::MAX) + .align_by_bucket(i64::MAX) + .unwrap() + ); + + assert_eq!( + None, + Timestamp::new_millisecond(i64::MIN).align_by_bucket(bucket) + ); } } diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index 62d5e8250e..e88c317ed4 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -184,8 +184,8 @@ impl ChunkReaderBuilder { return true; } // end_timestamp of sst file is inclusive. - let file_ts_range = - TimestampRange::new_inclusive(file.start_timestamp(), file.end_timestamp()); + let Some((start, end)) = *file.time_range() else { return true; }; + let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end)); file_ts_range.intersects(&predicate) } } diff --git a/src/storage/src/compaction.rs b/src/storage/src/compaction.rs index 93a944affe..b4c2490b85 100644 --- a/src/storage/src/compaction.rs +++ b/src/storage/src/compaction.rs @@ -16,4 +16,5 @@ mod dedup_deque; mod picker; mod rate_limit; mod scheduler; +mod strategy; mod task; diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 7b84c593c5..871de4884f 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -12,28 +12,54 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_telemetry::debug; + use crate::compaction::scheduler::CompactionRequestImpl; +use crate::compaction::strategy::StrategyRef; use crate::compaction::task::{CompactionTask, CompactionTaskImpl}; -/// Picker picks input SST files and build the compaction task. +/// Picker picks input SST files and builds the compaction task. /// Different compaction strategy may implement different pickers. pub trait Picker: Send + 'static { - fn pick(&self, req: &R) -> crate::error::Result; + fn pick(&self, ctx: &PickerContext, req: &R) -> crate::error::Result>; } +pub struct PickerContext {} + /// L0 -> L1 all-to-all compaction based on time windows. -pub(crate) struct SimplePicker {} +pub(crate) struct SimplePicker { + strategy: StrategyRef, +} #[allow(unused)] impl SimplePicker { - pub fn new() -> Self { - Self {} + pub fn new(strategy: StrategyRef) -> Self { + Self { strategy } } } impl Picker for SimplePicker { - fn pick(&self, _req: &CompactionRequestImpl) -> crate::error::Result { - todo!() + fn pick( + &self, + ctx: &PickerContext, + req: &CompactionRequestImpl, + ) -> crate::error::Result> { + let levels = req.levels(); + + for level_num in 0..levels.level_num() { + let level = levels.level(level_num as u8); + let outputs = self.strategy.pick(ctx, level); + + if outputs.is_empty() { + debug!("No SST file can be compacted at level {}", level_num); + return Ok(None); + } + + debug!("Found SST files to compact {:?}", outputs); + // TODO(hl): build compaction task + } + + Ok(None) } } @@ -60,8 +86,12 @@ pub mod tests { } impl Picker for MockPicker { - fn pick(&self, _req: &R) -> crate::error::Result { - Ok(NoopCompactionTask::new(self.cbs.clone())) + fn pick( + &self, + _ctx: &PickerContext, + _req: &R, + ) -> crate::error::Result> { + Ok(Some(NoopCompactionTask::new(self.cbs.clone()))) } } } diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index c22d977894..458655fc74 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -24,17 +24,25 @@ use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::compaction::dedup_deque::DedupDeque; -use crate::compaction::picker::Picker; +use crate::compaction::picker::{Picker, PickerContext}; use crate::compaction::rate_limit::{ BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter, RateLimitToken, RateLimiter, }; use crate::compaction::task::CompactionTask; use crate::error::{Result, StopCompactionSchedulerSnafu}; +use crate::version::LevelMetasRef; /// Table compaction request. #[derive(Default)] pub struct CompactionRequestImpl { table_id: TableId, + levels: LevelMetasRef, +} + +impl CompactionRequestImpl { + pub fn levels(&self) -> &LevelMetasRef { + &self.levels + } } impl CompactionRequest for CompactionRequestImpl { @@ -162,7 +170,7 @@ struct CompactionHandler> { } #[allow(unused)] -impl> CompactionHandler { +impl> CompactionHandler { /// Runs table compaction requests dispatch loop. pub async fn run(&self) { let task_notifier = self.task_notifier.clone(); @@ -214,7 +222,11 @@ impl> CompactionHandler { token: BoxedRateLimitToken, ) -> Result<()> { let cloned_notify = self.task_notifier.clone(); - let task = self.build_compaction_task(req).await?; + let table_id = req.table_id(); + let Some(task) = self.build_compaction_task(req).await? else { + info!("No file needs compaction in table: {}", table_id); + return Ok(()); + }; // TODO(hl): we need to keep a track of task handle here to allow task cancellation. common_runtime::spawn_bg(async move { @@ -230,8 +242,9 @@ impl> CompactionHandler { } // TODO(hl): generate compaction task(find SSTs to compact along with the output of compaction) - async fn build_compaction_task(&self, req: R) -> crate::error::Result { - self.picker.pick(&req) + async fn build_compaction_task(&self, req: R) -> crate::error::Result> { + let ctx = PickerContext {}; + self.picker.pick(&ctx, &req) } } diff --git a/src/storage/src/compaction/strategy.rs b/src/storage/src/compaction/strategy.rs new file mode 100644 index 0000000000..34e464ffe3 --- /dev/null +++ b/src/storage/src/compaction/strategy.rs @@ -0,0 +1,334 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use common_telemetry::{debug, warn}; +use common_time::timestamp::TimeUnit; +use common_time::timestamp_millis::BucketAligned; +use common_time::Timestamp; + +use crate::compaction::picker::PickerContext; +use crate::compaction::task::CompactionOutput; +use crate::sst::{FileHandle, LevelMeta}; + +/// Compaction strategy that defines which SSTs need to be compacted at given level. +pub trait Strategy { + fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> Vec; +} + +pub type StrategyRef = Arc; + +/// SimpleTimeWindowStrategy only handles level 0 to level 1 compaction in a time-window tiered +/// manner. It picks all SSTs in level 0 and writes rows in these SSTs to a new file partitioned +/// by a inferred time bucket in level 1. +pub struct SimpleTimeWindowStrategy {} + +impl Strategy for SimpleTimeWindowStrategy { + fn pick(&self, _ctx: &PickerContext, level: &LevelMeta) -> Vec { + // SimpleTimeWindowStrategy only handles level 0 to level 1 compaction. + if level.level() != 0 { + return vec![]; + } + let files = find_compactable_files(level); + if files.is_empty() { + return vec![]; + } + + let time_bucket = infer_time_bucket(&files); + let buckets = calculate_time_buckets(time_bucket, &files); + debug!("File buckets: {:?}", buckets); + buckets + .into_iter() + .map(|(bound, files)| CompactionOutput { + output_level: 1, + bucket_bound: bound, + bucket: time_bucket, + inputs: files, + }) + .collect() + } +} + +/// Finds files that can be compacted in given level. +/// Currently they're files that is not currently under compaction. +#[inline] +fn find_compactable_files(level: &LevelMeta) -> Vec { + level + .files() + .iter() + .filter(|f| !f.compacting()) + .cloned() + .collect() +} + +/// Calculates buckets for files. If file does not contain a time range in metadata, it will be +/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket) +/// so that all files without timestamp can be compacted together. +fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap> { + let mut buckets = HashMap::new(); + + for file in files { + if let Some((start, end)) = file.time_range() { + let bounds = file_time_bucket_span( + start.convert_to(TimeUnit::Second).unwrap().value(), + end.convert_to(TimeUnit::Second).unwrap().value(), + bucket_sec, + ); + for bound in bounds { + buckets + .entry(bound) + .or_insert_with(Vec::new) + .push(file.clone()); + } + } else { + // Files without timestamp range is assign to a special bucket `i64::MAX`, + // so that they can be compacted together. + buckets + .entry(i64::MAX) + .or_insert_with(Vec::new) + .push(file.clone()); + } + } + buckets +} + +/// Calculates timestamp span between start and end timestamp. +fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec { + assert!(start_sec <= end_sec); + + // if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot + // be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow. + let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); + let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); + + let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize); + while start_aligned < end_aligned { + res.push(start_aligned); + start_aligned += bucket_sec; + } + res.push(end_aligned); + res +} + +/// Infers the suitable time bucket duration. +/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span +/// into time bucket. +fn infer_time_bucket(files: &[FileHandle]) -> i64 { + let mut max_ts = &Timestamp::new(i64::MIN, TimeUnit::Second); + let mut min_ts = &Timestamp::new(i64::MAX, TimeUnit::Second); + + for f in files { + if let Some((start, end)) = f.time_range() { + min_ts = min_ts.min(start); + max_ts = max_ts.max(end); + } else { + // we don't expect an SST file without time range, + // it's either a bug or data corruption. + warn!("Found SST file without time range metadata: {f:?}"); + } + } + + // safety: Convert whatever timestamp into seconds will not cause overflow. + let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value(); + let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value(); + + max_sec + .checked_sub(min_sec) + .map(fit_time_bucket) // return the max bucket on subtraction overflow. + .unwrap_or_else(|| *TIME_BUCKETS.last().unwrap()) // safety: TIME_BUCKETS cannot be empty. +} + +/// A set of predefined time buckets. +const TIME_BUCKETS: [i64; 5] = [ + 60 * 60, // one hour + 2 * 60 * 60, // two hours + 12 * 60 * 60, // twelve hours + 24 * 60 * 60, // one day + 7 * 24 * 60 * 60, // one week +]; + +/// Fits a given time span into time bucket by find the minimum bucket that can cover the span. +/// Returns the max bucket if no such bucket can be found. +fn fit_time_bucket(span_sec: i64) -> i64 { + assert!(span_sec >= 0); + for b in TIME_BUCKETS { + if b >= span_sec { + return b; + } + } + *TIME_BUCKETS.last().unwrap() +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + use crate::sst::FileMeta; + + #[test] + fn test_time_bucket_span() { + assert_eq!(vec![0], file_time_bucket_span(1, 9, 10)); + + assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10)); + + assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10)); + + assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10)); + } + + #[test] + fn test_time_bucket_span_large() { + assert_eq!( + vec![ + (i64::MAX - 10).align_by_bucket(10).unwrap(), + i64::MAX.align_by_bucket(10).unwrap(), + ], + file_time_bucket_span(i64::MAX - 10, i64::MAX, 10) + ); + + // magic hmmm? + for bucket in 1..100 { + assert_eq!( + vec![ + i64::MIN, + (i64::MIN + bucket).align_by_bucket(bucket).unwrap() + ], + file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket) + ); + } + } + + #[test] + fn test_time_bucket() { + assert_eq!(TIME_BUCKETS[0], fit_time_bucket(1)); + assert_eq!(TIME_BUCKETS[0], fit_time_bucket(60 * 60)); + assert_eq!(TIME_BUCKETS[1], fit_time_bucket(60 * 60 + 1)); + + assert_eq!(TIME_BUCKETS[2], fit_time_bucket(TIME_BUCKETS[2] - 1)); + assert_eq!(TIME_BUCKETS[2], fit_time_bucket(TIME_BUCKETS[2])); + assert_eq!(TIME_BUCKETS[3], fit_time_bucket(TIME_BUCKETS[3] - 1)); + assert_eq!(TIME_BUCKETS[4], fit_time_bucket(i64::MAX)); + } + + #[test] + fn test_infer_time_buckets() { + assert_eq!( + TIME_BUCKETS[0], + infer_time_bucket(&[ + new_file_handle("a", 0, TIME_BUCKETS[0] * 1000 - 1), + new_file_handle("b", 1, 10_000) + ]) + ); + } + + fn new_file_handle(name: &str, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle { + FileHandle::new(FileMeta { + file_name: name.to_string(), + time_range: Some(( + Timestamp::new_millisecond(start_ts_millis), + Timestamp::new_millisecond(end_ts_millis), + )), + level: 0, + }) + } + + fn new_file_handles(input: &[(&str, i64, i64)]) -> Vec { + input + .iter() + .map(|(name, start, end)| new_file_handle(name, *start, *end)) + .collect() + } + + fn check_bucket_calculation( + bucket_sec: i64, + files: Vec, + expected: &[(i64, &[&str])], + ) { + let res = calculate_time_buckets(bucket_sec, &files); + + let expected = expected + .iter() + .map(|(bucket, file_names)| { + ( + *bucket, + file_names + .iter() + .map(|s| s.to_string()) + .collect::>(), + ) + }) + .collect::>(); + + for (bucket, file_names) in expected { + let actual = res + .get(&bucket) + .unwrap() + .iter() + .map(|f| f.file_name().to_string()) + .collect(); + assert_eq!( + file_names, actual, + "bucket: {bucket}, expected: {file_names:?}, actual: {actual:?}", + ); + } + } + + #[test] + fn test_calculate_time_buckets() { + // simple case, files with disjoint + check_bucket_calculation( + 10, + new_file_handles(&[("a", 0, 9000), ("b", 10000, 19000)]), + &[(0, &["a"]), (10, &["b"])], + ); + + // files across buckets + check_bucket_calculation( + 10, + new_file_handles(&[("a", 0, 10001), ("b", 10000, 19000)]), + &[(0, &["a"]), (10, &["a", "b"])], + ); + check_bucket_calculation( + 10, + new_file_handles(&[("a", 0, 10000)]), + &[(0, &["a"]), (10, &["a"])], + ); + + // files without timestamp are align to a special bucket: i64::MAX + check_bucket_calculation( + 10, + vec![FileHandle::new(FileMeta { + file_name: "a".to_string(), + time_range: None, + level: 0, + })], + &[(i64::MAX, &["a"])], + ); + + // file with an large time range + + let expected = (0..(TIME_BUCKETS[4] / TIME_BUCKETS[0])) + .into_iter() + .map(|b| (b * TIME_BUCKETS[0], &["a"] as _)) + .collect::>(); + check_bucket_calculation( + TIME_BUCKETS[0], + new_file_handles(&[("a", 0, TIME_BUCKETS[4] * 1000)]), + &expected, + ); + } +} diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 7428b6f23b..e80d238f30 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -13,7 +13,7 @@ // limitations under the License. use crate::error::Result; -use crate::sst::FileHandle; +use crate::sst::{FileHandle, Level}; #[async_trait::async_trait] pub trait CompactionTask: Send + Sync + 'static { @@ -40,6 +40,21 @@ pub(crate) struct CompactionInput { file: FileHandle, } +/// Many-to-many compaction can be decomposed to a many-to-one compaction from level n to level n+1 +/// and a many-to-one compaction from level n+1 to level n+1. +#[derive(Debug)] +#[allow(unused)] +pub struct CompactionOutput { + /// Compaction output file level. + pub(crate) output_level: Level, + /// The left bound of time bucket. + pub(crate) bucket_bound: i64, + /// Bucket duration in seconds. + pub(crate) bucket: i64, + /// Compaction input files. + pub(crate) inputs: Vec, +} + #[cfg(test)] pub mod tests { use std::sync::Arc; diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index f952591eb6..abb5f0cd5d 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -185,18 +185,14 @@ impl FlushJob { // TODO(hl): Check if random file name already exists in meta. let iter = m.iter(&iter_ctx)?; futures.push(async move { - let SstInfo { - start_timestamp, - end_timestamp, - } = self + let SstInfo { time_range } = self .sst_layer .write_sst(&file_name, iter, &WriteOptions::default()) .await?; Ok(FileMeta { file_name, - start_timestamp, - end_timestamp, + time_range, level: 0, }) }); diff --git a/src/storage/src/manifest/test_utils.rs b/src/storage/src/manifest/test_utils.rs index e840051ec2..20da0051b8 100644 --- a/src/storage/src/manifest/test_utils.rs +++ b/src/storage/src/manifest/test_utils.rs @@ -30,6 +30,7 @@ pub fn build_region_meta() -> RegionMetadata { desc.try_into().unwrap() } +// TODO(hl): region edit should contain the time range of added files pub fn build_region_edit( sequence: SequenceNumber, files_to_add: &[&str], @@ -42,8 +43,7 @@ pub fn build_region_edit( .iter() .map(|f| FileMeta { file_name: f.to_string(), - start_timestamp: None, - end_timestamp: None, + time_range: None, level: 0, }) .collect(), @@ -51,8 +51,7 @@ pub fn build_region_edit( .iter() .map(|f| FileMeta { file_name: f.to_string(), - start_timestamp: None, - end_timestamp: None, + time_range: None, level: 0, }) .collect(), diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 4d60853cb3..7aca8f7c22 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -14,6 +14,7 @@ mod parquet; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_trait::async_trait; @@ -29,11 +30,13 @@ use crate::schema::ProjectedSchemaRef; use crate::sst::parquet::{ParquetReader, ParquetWriter}; /// Maximum level of SSTs. -pub const MAX_LEVEL: usize = 1; +pub const MAX_LEVEL: u8 = 2; -// We only has fixed number of level, so we array to hold elements. This implement +pub type Level = u8; + +// We only has fixed number of level, so we use array to hold elements. This implementation // detail of LevelMetaVec should not be exposed to the user of [LevelMetas]. -type LevelMetaVec = [LevelMeta; MAX_LEVEL]; +type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize]; /// Visitor to access file in each level. pub trait Visitor { @@ -59,6 +62,17 @@ impl LevelMetas { } } + /// Returns total level number. + #[inline] + pub fn level_num(&self) -> usize { + self.levels.len() + } + + #[inline] + pub fn level(&self, level: Level) -> &LevelMeta { + &self.levels[level as usize] + } + /// Merge `self` with files to add/remove to create a new [LevelMetas]. /// /// # Panics @@ -103,7 +117,7 @@ impl Default for LevelMetas { /// Metadata of files in same SST level. #[derive(Debug, Default, Clone)] pub struct LevelMeta { - level: u8, + level: Level, /// Handles to the files in this level. // TODO(yingwen): Now for simplicity, files are unordered, maybe sort the files by time range // or use another structure to hold them. @@ -111,27 +125,39 @@ pub struct LevelMeta { } impl LevelMeta { + pub fn new_empty(level: Level) -> Self { + Self { + level, + files: vec![], + } + } + fn add_file(&mut self, file: FileHandle) { self.files.push(file); } - fn visit_level(&self, visitor: &mut V) -> Result<()> { + pub fn visit_level(&self, visitor: &mut V) -> Result<()> { visitor.visit(self.level.into(), &self.files) } - #[cfg(test)] + /// Returns the level of level meta. + #[inline] + pub fn level(&self) -> Level { + self.level + } + pub fn files(&self) -> &[FileHandle] { &self.files } } fn new_level_meta_vec() -> LevelMetaVec { - let mut levels = [LevelMeta::default(); MAX_LEVEL]; - for (i, level) in levels.iter_mut().enumerate() { - level.level = i as u8; - } - - levels + (0u8..MAX_LEVEL) + .into_iter() + .map(LevelMeta::new_empty) + .collect::>() + .try_into() + .unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL } /// In-memory handle to a file. @@ -158,16 +184,15 @@ impl FileHandle { &self.inner.meta.file_name } - /// Return the start timestamp of current SST file. #[inline] - pub fn start_timestamp(&self) -> Option { - self.inner.meta.start_timestamp + pub fn time_range(&self) -> &Option<(Timestamp, Timestamp)> { + &self.inner.meta.time_range } - /// Return the end timestamp of current SST file. + /// Returns true if current file is under compaction. #[inline] - pub fn end_timestamp(&self) -> Option { - self.inner.meta.end_timestamp + pub fn compacting(&self) -> bool { + self.inner.compacting.load(Ordering::Relaxed) } } @@ -177,11 +202,15 @@ impl FileHandle { #[derive(Debug)] struct FileHandleInner { meta: FileMeta, + compacting: AtomicBool, } impl FileHandleInner { fn new(meta: FileMeta) -> FileHandleInner { - FileHandleInner { meta } + FileHandleInner { + meta, + compacting: AtomicBool::new(false), + } } } @@ -189,10 +218,9 @@ impl FileHandleInner { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct FileMeta { pub file_name: String, - pub start_timestamp: Option, - pub end_timestamp: Option, + pub time_range: Option<(Timestamp, Timestamp)>, /// SST level of the file. - pub level: u8, + pub level: Level, } #[derive(Debug, Default)] @@ -212,8 +240,7 @@ pub struct ReadOptions { #[derive(Debug)] pub struct SstInfo { - pub start_timestamp: Option, - pub end_timestamp: Option, + pub time_range: Option<(Timestamp, Timestamp)>, } /// SST access layer. diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index bb8523baad..0b6c810981 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -118,23 +118,14 @@ impl<'a> ParquetWriter<'a> { let file_meta = arrow_writer.close().context(WriteParquetSnafu)?; - let (start_timestamp, end_timestamp) = - match decode_timestamp_range(&file_meta, store_schema) { - Ok(Some((start, end))) => (Some(start), Some(end)), - Ok(None) => (None, None), - Err(e) => { - error!(e;"Failed to calculate time range of parquet file"); - (None, None) - } - }; + let time_range = decode_timestamp_range(&file_meta, store_schema) + .ok() + .flatten(); object.write(buf).await.context(WriteObjectSnafu { path: object.path(), })?; - Ok(SstInfo { - start_timestamp, - end_timestamp, - }) + Ok(SstInfo { time_range }) } } @@ -467,20 +458,18 @@ mod tests { let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); - let SstInfo { - start_timestamp, - end_timestamp, - } = writer + let SstInfo { time_range } = writer .write_sst(&sst::WriteOptions::default()) .await .unwrap(); - assert_eq!(Some(Timestamp::new_millisecond(0)), start_timestamp); assert_eq!( - Some(Timestamp::new_millisecond((rows_total - 1) as i64)), - end_timestamp + Some(( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond((rows_total - 1) as i64) + )), + time_range ); - let operator = ObjectStore::new( object_store::backend::fs::Builder::default() .root(dir.path().to_str().unwrap()) @@ -540,17 +529,18 @@ mod tests { let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); - let SstInfo { - start_timestamp, - end_timestamp, - } = writer + let SstInfo { time_range } = writer .write_sst(&sst::WriteOptions::default()) .await .unwrap(); - assert_eq!(Some(Timestamp::new_millisecond(1000)), start_timestamp); - assert_eq!(Some(Timestamp::new_millisecond(2003)), end_timestamp); - + assert_eq!( + Some(( + Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(2003) + )), + time_range + ); let operator = ObjectStore::new( object_store::backend::fs::Builder::default() .root(dir.path().to_str().unwrap()) diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index f0660ed7d3..80dde3bd6d 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -137,7 +137,7 @@ pub struct VersionEdit { pub type VersionControlRef = Arc; pub type VersionRef = Arc; type MemtableVersionRef = Arc; -type LevelMetasRef = Arc; +pub type LevelMetasRef = Arc; /// Version contains metadata and state of region. #[derive(Clone, Debug)]