feat: L0 to L1 compaction strategy (#964)

* feat: impl simple compaction strategy

* chore: rebase to develop and fix clippy warnings

* chore: simplify time bucket strcut

* chore: some typos
This commit is contained in:
Lei, HUANG
2023-02-11 21:10:24 +08:00
committed by GitHub
parent 7d6f4cd88b
commit e77a7f253c
12 changed files with 544 additions and 100 deletions

View File

@@ -14,6 +14,8 @@
use std::cmp::Ordering;
use crate::Timestamp;
/// Unix timestamp in millisecond resolution.
///
/// Negative timestamp is allowed, which represents timestamp before '1970-01-01T00:00:00'.
@@ -77,22 +79,29 @@ impl PartialOrd<TimestampMillis> for i64 {
}
}
pub trait BucketAligned {
/// Returns the timestamp aligned by `bucket_duration` in milliseconds or
/// `None` if overflow occurred.
pub trait BucketAligned: Sized {
/// Returns the timestamp aligned by `bucket_duration` or `None` if underflow occurred.
///
/// # Panics
/// Panics if `bucket_duration <= 0`.
fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis>;
fn align_by_bucket(self, bucket_duration: i64) -> Option<Self>;
}
impl<T: Into<i64>> BucketAligned for T {
fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis> {
impl BucketAligned for i64 {
fn align_by_bucket(self, bucket_duration: i64) -> Option<Self> {
assert!(bucket_duration > 0, "{}", bucket_duration);
self.into()
.checked_div_euclid(bucket_duration)
self.checked_div_euclid(bucket_duration)
.and_then(|val| val.checked_mul(bucket_duration))
.map(TimestampMillis)
}
}
impl BucketAligned for Timestamp {
fn align_by_bucket(self, bucket_duration: i64) -> Option<Self> {
assert!(bucket_duration > 0, "{}", bucket_duration);
let unit = self.unit();
self.value()
.align_by_bucket(bucket_duration)
.map(|val| Timestamp::new(val, unit))
}
}
@@ -121,24 +130,54 @@ mod tests {
#[test]
fn test_align_by_bucket() {
let bucket = 100;
assert_eq!(0, TimestampMillis::new(0).align_by_bucket(bucket).unwrap());
assert_eq!(0, TimestampMillis::new(1).align_by_bucket(bucket).unwrap());
assert_eq!(0, TimestampMillis::new(99).align_by_bucket(bucket).unwrap());
assert_eq!(
100,
TimestampMillis::new(100).align_by_bucket(bucket).unwrap()
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(0)
.align_by_bucket(bucket)
.unwrap()
);
assert_eq!(
100,
TimestampMillis::new(199).align_by_bucket(bucket).unwrap()
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1)
.align_by_bucket(bucket)
.unwrap()
);
assert_eq!(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(99)
.align_by_bucket(bucket)
.unwrap()
);
assert_eq!(
Timestamp::new_millisecond(100),
Timestamp::new_millisecond(100)
.align_by_bucket(bucket)
.unwrap()
);
assert_eq!(
Timestamp::new_millisecond(100),
Timestamp::new_millisecond(199)
.align_by_bucket(bucket)
.unwrap()
);
assert_eq!(0, TimestampMillis::MAX.align_by_bucket(i64::MAX).unwrap());
assert_eq!(
i64::MAX,
TimestampMillis::INF.align_by_bucket(i64::MAX).unwrap()
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(i64::MAX - 1)
.align_by_bucket(i64::MAX)
.unwrap()
);
assert_eq!(None, TimestampMillis::MIN.align_by_bucket(bucket));
assert_eq!(
Timestamp::new_millisecond(i64::MAX),
Timestamp::new_millisecond(i64::MAX)
.align_by_bucket(i64::MAX)
.unwrap()
);
assert_eq!(
None,
Timestamp::new_millisecond(i64::MIN).align_by_bucket(bucket)
);
}
}

View File

@@ -184,8 +184,8 @@ impl ChunkReaderBuilder {
return true;
}
// end_timestamp of sst file is inclusive.
let file_ts_range =
TimestampRange::new_inclusive(file.start_timestamp(), file.end_timestamp());
let Some((start, end)) = *file.time_range() else { return true; };
let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
file_ts_range.intersects(&predicate)
}
}

View File

@@ -16,4 +16,5 @@ mod dedup_deque;
mod picker;
mod rate_limit;
mod scheduler;
mod strategy;
mod task;

View File

@@ -12,28 +12,54 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::debug;
use crate::compaction::scheduler::CompactionRequestImpl;
use crate::compaction::strategy::StrategyRef;
use crate::compaction::task::{CompactionTask, CompactionTaskImpl};
/// Picker picks input SST files and build the compaction task.
/// Picker picks input SST files and builds the compaction task.
/// Different compaction strategy may implement different pickers.
pub trait Picker<R, T: CompactionTask>: Send + 'static {
fn pick(&self, req: &R) -> crate::error::Result<T>;
fn pick(&self, ctx: &PickerContext, req: &R) -> crate::error::Result<Option<T>>;
}
pub struct PickerContext {}
/// L0 -> L1 all-to-all compaction based on time windows.
pub(crate) struct SimplePicker {}
pub(crate) struct SimplePicker {
strategy: StrategyRef,
}
#[allow(unused)]
impl SimplePicker {
pub fn new() -> Self {
Self {}
pub fn new(strategy: StrategyRef) -> Self {
Self { strategy }
}
}
impl Picker<CompactionRequestImpl, CompactionTaskImpl> for SimplePicker {
fn pick(&self, _req: &CompactionRequestImpl) -> crate::error::Result<CompactionTaskImpl> {
todo!()
fn pick(
&self,
ctx: &PickerContext,
req: &CompactionRequestImpl,
) -> crate::error::Result<Option<CompactionTaskImpl>> {
let levels = req.levels();
for level_num in 0..levels.level_num() {
let level = levels.level(level_num as u8);
let outputs = self.strategy.pick(ctx, level);
if outputs.is_empty() {
debug!("No SST file can be compacted at level {}", level_num);
return Ok(None);
}
debug!("Found SST files to compact {:?}", outputs);
// TODO(hl): build compaction task
}
Ok(None)
}
}
@@ -60,8 +86,12 @@ pub mod tests {
}
impl<R: CompactionRequest> Picker<R, NoopCompactionTask> for MockPicker<R> {
fn pick(&self, _req: &R) -> crate::error::Result<NoopCompactionTask> {
Ok(NoopCompactionTask::new(self.cbs.clone()))
fn pick(
&self,
_ctx: &PickerContext,
_req: &R,
) -> crate::error::Result<Option<NoopCompactionTask>> {
Ok(Some(NoopCompactionTask::new(self.cbs.clone())))
}
}
}

View File

@@ -24,17 +24,25 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::compaction::dedup_deque::DedupDeque;
use crate::compaction::picker::Picker;
use crate::compaction::picker::{Picker, PickerContext};
use crate::compaction::rate_limit::{
BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter, RateLimitToken, RateLimiter,
};
use crate::compaction::task::CompactionTask;
use crate::error::{Result, StopCompactionSchedulerSnafu};
use crate::version::LevelMetasRef;
/// Table compaction request.
#[derive(Default)]
pub struct CompactionRequestImpl {
table_id: TableId,
levels: LevelMetasRef,
}
impl CompactionRequestImpl {
pub fn levels(&self) -> &LevelMetasRef {
&self.levels
}
}
impl CompactionRequest for CompactionRequestImpl {
@@ -162,7 +170,7 @@ struct CompactionHandler<R, T: CompactionTask, P: Picker<R, T>> {
}
#[allow(unused)]
impl<R, T: CompactionTask, P: Picker<R, T>> CompactionHandler<R, T, P> {
impl<R: CompactionRequest, T: CompactionTask, P: Picker<R, T>> CompactionHandler<R, T, P> {
/// Runs table compaction requests dispatch loop.
pub async fn run(&self) {
let task_notifier = self.task_notifier.clone();
@@ -214,7 +222,11 @@ impl<R, T: CompactionTask, P: Picker<R, T>> CompactionHandler<R, T, P> {
token: BoxedRateLimitToken,
) -> Result<()> {
let cloned_notify = self.task_notifier.clone();
let task = self.build_compaction_task(req).await?;
let table_id = req.table_id();
let Some(task) = self.build_compaction_task(req).await? else {
info!("No file needs compaction in table: {}", table_id);
return Ok(());
};
// TODO(hl): we need to keep a track of task handle here to allow task cancellation.
common_runtime::spawn_bg(async move {
@@ -230,8 +242,9 @@ impl<R, T: CompactionTask, P: Picker<R, T>> CompactionHandler<R, T, P> {
}
// TODO(hl): generate compaction task(find SSTs to compact along with the output of compaction)
async fn build_compaction_task(&self, req: R) -> crate::error::Result<T> {
self.picker.pick(&req)
async fn build_compaction_task(&self, req: R) -> crate::error::Result<Option<T>> {
let ctx = PickerContext {};
self.picker.pick(&ctx, &req)
}
}

View File

@@ -0,0 +1,334 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_telemetry::{debug, warn};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use crate::compaction::picker::PickerContext;
use crate::compaction::task::CompactionOutput;
use crate::sst::{FileHandle, LevelMeta};
/// Compaction strategy that defines which SSTs need to be compacted at given level.
pub trait Strategy {
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> Vec<CompactionOutput>;
}
pub type StrategyRef = Arc<dyn Strategy + Send + Sync>;
/// SimpleTimeWindowStrategy only handles level 0 to level 1 compaction in a time-window tiered
/// manner. It picks all SSTs in level 0 and writes rows in these SSTs to a new file partitioned
/// by a inferred time bucket in level 1.
pub struct SimpleTimeWindowStrategy {}
impl Strategy for SimpleTimeWindowStrategy {
fn pick(&self, _ctx: &PickerContext, level: &LevelMeta) -> Vec<CompactionOutput> {
// SimpleTimeWindowStrategy only handles level 0 to level 1 compaction.
if level.level() != 0 {
return vec![];
}
let files = find_compactable_files(level);
if files.is_empty() {
return vec![];
}
let time_bucket = infer_time_bucket(&files);
let buckets = calculate_time_buckets(time_bucket, &files);
debug!("File buckets: {:?}", buckets);
buckets
.into_iter()
.map(|(bound, files)| CompactionOutput {
output_level: 1,
bucket_bound: bound,
bucket: time_bucket,
inputs: files,
})
.collect()
}
}
/// Finds files that can be compacted in given level.
/// Currently they're files that is not currently under compaction.
#[inline]
fn find_compactable_files(level: &LevelMeta) -> Vec<FileHandle> {
level
.files()
.iter()
.filter(|f| !f.compacting())
.cloned()
.collect()
}
/// Calculates buckets for files. If file does not contain a time range in metadata, it will be
/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
/// so that all files without timestamp can be compacted together.
fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap<i64, Vec<FileHandle>> {
let mut buckets = HashMap::new();
for file in files {
if let Some((start, end)) = file.time_range() {
let bounds = file_time_bucket_span(
start.convert_to(TimeUnit::Second).unwrap().value(),
end.convert_to(TimeUnit::Second).unwrap().value(),
bucket_sec,
);
for bound in bounds {
buckets
.entry(bound)
.or_insert_with(Vec::new)
.push(file.clone());
}
} else {
// Files without timestamp range is assign to a special bucket `i64::MAX`,
// so that they can be compacted together.
buckets
.entry(i64::MAX)
.or_insert_with(Vec::new)
.push(file.clone());
}
}
buckets
}
/// Calculates timestamp span between start and end timestamp.
fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<i64> {
assert!(start_sec <= end_sec);
// if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot
// be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow.
let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
while start_aligned < end_aligned {
res.push(start_aligned);
start_aligned += bucket_sec;
}
res.push(end_aligned);
res
}
/// Infers the suitable time bucket duration.
/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span
/// into time bucket.
fn infer_time_bucket(files: &[FileHandle]) -> i64 {
let mut max_ts = &Timestamp::new(i64::MIN, TimeUnit::Second);
let mut min_ts = &Timestamp::new(i64::MAX, TimeUnit::Second);
for f in files {
if let Some((start, end)) = f.time_range() {
min_ts = min_ts.min(start);
max_ts = max_ts.max(end);
} else {
// we don't expect an SST file without time range,
// it's either a bug or data corruption.
warn!("Found SST file without time range metadata: {f:?}");
}
}
// safety: Convert whatever timestamp into seconds will not cause overflow.
let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value();
let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value();
max_sec
.checked_sub(min_sec)
.map(fit_time_bucket) // return the max bucket on subtraction overflow.
.unwrap_or_else(|| *TIME_BUCKETS.last().unwrap()) // safety: TIME_BUCKETS cannot be empty.
}
/// A set of predefined time buckets.
const TIME_BUCKETS: [i64; 5] = [
60 * 60, // one hour
2 * 60 * 60, // two hours
12 * 60 * 60, // twelve hours
24 * 60 * 60, // one day
7 * 24 * 60 * 60, // one week
];
/// Fits a given time span into time bucket by find the minimum bucket that can cover the span.
/// Returns the max bucket if no such bucket can be found.
fn fit_time_bucket(span_sec: i64) -> i64 {
assert!(span_sec >= 0);
for b in TIME_BUCKETS {
if b >= span_sec {
return b;
}
}
*TIME_BUCKETS.last().unwrap()
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use crate::sst::FileMeta;
#[test]
fn test_time_bucket_span() {
assert_eq!(vec![0], file_time_bucket_span(1, 9, 10));
assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10));
assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10));
assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10));
}
#[test]
fn test_time_bucket_span_large() {
assert_eq!(
vec![
(i64::MAX - 10).align_by_bucket(10).unwrap(),
i64::MAX.align_by_bucket(10).unwrap(),
],
file_time_bucket_span(i64::MAX - 10, i64::MAX, 10)
);
// magic hmmm?
for bucket in 1..100 {
assert_eq!(
vec![
i64::MIN,
(i64::MIN + bucket).align_by_bucket(bucket).unwrap()
],
file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket)
);
}
}
#[test]
fn test_time_bucket() {
assert_eq!(TIME_BUCKETS[0], fit_time_bucket(1));
assert_eq!(TIME_BUCKETS[0], fit_time_bucket(60 * 60));
assert_eq!(TIME_BUCKETS[1], fit_time_bucket(60 * 60 + 1));
assert_eq!(TIME_BUCKETS[2], fit_time_bucket(TIME_BUCKETS[2] - 1));
assert_eq!(TIME_BUCKETS[2], fit_time_bucket(TIME_BUCKETS[2]));
assert_eq!(TIME_BUCKETS[3], fit_time_bucket(TIME_BUCKETS[3] - 1));
assert_eq!(TIME_BUCKETS[4], fit_time_bucket(i64::MAX));
}
#[test]
fn test_infer_time_buckets() {
assert_eq!(
TIME_BUCKETS[0],
infer_time_bucket(&[
new_file_handle("a", 0, TIME_BUCKETS[0] * 1000 - 1),
new_file_handle("b", 1, 10_000)
])
);
}
fn new_file_handle(name: &str, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle {
FileHandle::new(FileMeta {
file_name: name.to_string(),
time_range: Some((
Timestamp::new_millisecond(start_ts_millis),
Timestamp::new_millisecond(end_ts_millis),
)),
level: 0,
})
}
fn new_file_handles(input: &[(&str, i64, i64)]) -> Vec<FileHandle> {
input
.iter()
.map(|(name, start, end)| new_file_handle(name, *start, *end))
.collect()
}
fn check_bucket_calculation(
bucket_sec: i64,
files: Vec<FileHandle>,
expected: &[(i64, &[&str])],
) {
let res = calculate_time_buckets(bucket_sec, &files);
let expected = expected
.iter()
.map(|(bucket, file_names)| {
(
*bucket,
file_names
.iter()
.map(|s| s.to_string())
.collect::<HashSet<_>>(),
)
})
.collect::<HashMap<_, _>>();
for (bucket, file_names) in expected {
let actual = res
.get(&bucket)
.unwrap()
.iter()
.map(|f| f.file_name().to_string())
.collect();
assert_eq!(
file_names, actual,
"bucket: {bucket}, expected: {file_names:?}, actual: {actual:?}",
);
}
}
#[test]
fn test_calculate_time_buckets() {
// simple case, files with disjoint
check_bucket_calculation(
10,
new_file_handles(&[("a", 0, 9000), ("b", 10000, 19000)]),
&[(0, &["a"]), (10, &["b"])],
);
// files across buckets
check_bucket_calculation(
10,
new_file_handles(&[("a", 0, 10001), ("b", 10000, 19000)]),
&[(0, &["a"]), (10, &["a", "b"])],
);
check_bucket_calculation(
10,
new_file_handles(&[("a", 0, 10000)]),
&[(0, &["a"]), (10, &["a"])],
);
// files without timestamp are align to a special bucket: i64::MAX
check_bucket_calculation(
10,
vec![FileHandle::new(FileMeta {
file_name: "a".to_string(),
time_range: None,
level: 0,
})],
&[(i64::MAX, &["a"])],
);
// file with an large time range
let expected = (0..(TIME_BUCKETS[4] / TIME_BUCKETS[0]))
.into_iter()
.map(|b| (b * TIME_BUCKETS[0], &["a"] as _))
.collect::<Vec<_>>();
check_bucket_calculation(
TIME_BUCKETS[0],
new_file_handles(&[("a", 0, TIME_BUCKETS[4] * 1000)]),
&expected,
);
}
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use crate::error::Result;
use crate::sst::FileHandle;
use crate::sst::{FileHandle, Level};
#[async_trait::async_trait]
pub trait CompactionTask: Send + Sync + 'static {
@@ -40,6 +40,21 @@ pub(crate) struct CompactionInput {
file: FileHandle,
}
/// Many-to-many compaction can be decomposed to a many-to-one compaction from level n to level n+1
/// and a many-to-one compaction from level n+1 to level n+1.
#[derive(Debug)]
#[allow(unused)]
pub struct CompactionOutput {
/// Compaction output file level.
pub(crate) output_level: Level,
/// The left bound of time bucket.
pub(crate) bucket_bound: i64,
/// Bucket duration in seconds.
pub(crate) bucket: i64,
/// Compaction input files.
pub(crate) inputs: Vec<FileHandle>,
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;

View File

@@ -185,18 +185,14 @@ impl<S: LogStore> FlushJob<S> {
// TODO(hl): Check if random file name already exists in meta.
let iter = m.iter(&iter_ctx)?;
futures.push(async move {
let SstInfo {
start_timestamp,
end_timestamp,
} = self
let SstInfo { time_range } = self
.sst_layer
.write_sst(&file_name, iter, &WriteOptions::default())
.await?;
Ok(FileMeta {
file_name,
start_timestamp,
end_timestamp,
time_range,
level: 0,
})
});

View File

@@ -30,6 +30,7 @@ pub fn build_region_meta() -> RegionMetadata {
desc.try_into().unwrap()
}
// TODO(hl): region edit should contain the time range of added files
pub fn build_region_edit(
sequence: SequenceNumber,
files_to_add: &[&str],
@@ -42,8 +43,7 @@ pub fn build_region_edit(
.iter()
.map(|f| FileMeta {
file_name: f.to_string(),
start_timestamp: None,
end_timestamp: None,
time_range: None,
level: 0,
})
.collect(),
@@ -51,8 +51,7 @@ pub fn build_region_edit(
.iter()
.map(|f| FileMeta {
file_name: f.to_string(),
start_timestamp: None,
end_timestamp: None,
time_range: None,
level: 0,
})
.collect(),

View File

@@ -14,6 +14,7 @@
mod parquet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@@ -29,11 +30,13 @@ use crate::schema::ProjectedSchemaRef;
use crate::sst::parquet::{ParquetReader, ParquetWriter};
/// Maximum level of SSTs.
pub const MAX_LEVEL: usize = 1;
pub const MAX_LEVEL: u8 = 2;
// We only has fixed number of level, so we array to hold elements. This implement
pub type Level = u8;
// We only has fixed number of level, so we use array to hold elements. This implementation
// detail of LevelMetaVec should not be exposed to the user of [LevelMetas].
type LevelMetaVec = [LevelMeta; MAX_LEVEL];
type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize];
/// Visitor to access file in each level.
pub trait Visitor {
@@ -59,6 +62,17 @@ impl LevelMetas {
}
}
/// Returns total level number.
#[inline]
pub fn level_num(&self) -> usize {
self.levels.len()
}
#[inline]
pub fn level(&self, level: Level) -> &LevelMeta {
&self.levels[level as usize]
}
/// Merge `self` with files to add/remove to create a new [LevelMetas].
///
/// # Panics
@@ -103,7 +117,7 @@ impl Default for LevelMetas {
/// Metadata of files in same SST level.
#[derive(Debug, Default, Clone)]
pub struct LevelMeta {
level: u8,
level: Level,
/// Handles to the files in this level.
// TODO(yingwen): Now for simplicity, files are unordered, maybe sort the files by time range
// or use another structure to hold them.
@@ -111,27 +125,39 @@ pub struct LevelMeta {
}
impl LevelMeta {
pub fn new_empty(level: Level) -> Self {
Self {
level,
files: vec![],
}
}
fn add_file(&mut self, file: FileHandle) {
self.files.push(file);
}
fn visit_level<V: Visitor>(&self, visitor: &mut V) -> Result<()> {
pub fn visit_level<V: Visitor>(&self, visitor: &mut V) -> Result<()> {
visitor.visit(self.level.into(), &self.files)
}
#[cfg(test)]
/// Returns the level of level meta.
#[inline]
pub fn level(&self) -> Level {
self.level
}
pub fn files(&self) -> &[FileHandle] {
&self.files
}
}
fn new_level_meta_vec() -> LevelMetaVec {
let mut levels = [LevelMeta::default(); MAX_LEVEL];
for (i, level) in levels.iter_mut().enumerate() {
level.level = i as u8;
}
levels
(0u8..MAX_LEVEL)
.into_iter()
.map(LevelMeta::new_empty)
.collect::<Vec<_>>()
.try_into()
.unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL
}
/// In-memory handle to a file.
@@ -158,16 +184,15 @@ impl FileHandle {
&self.inner.meta.file_name
}
/// Return the start timestamp of current SST file.
#[inline]
pub fn start_timestamp(&self) -> Option<Timestamp> {
self.inner.meta.start_timestamp
pub fn time_range(&self) -> &Option<(Timestamp, Timestamp)> {
&self.inner.meta.time_range
}
/// Return the end timestamp of current SST file.
/// Returns true if current file is under compaction.
#[inline]
pub fn end_timestamp(&self) -> Option<Timestamp> {
self.inner.meta.end_timestamp
pub fn compacting(&self) -> bool {
self.inner.compacting.load(Ordering::Relaxed)
}
}
@@ -177,11 +202,15 @@ impl FileHandle {
#[derive(Debug)]
struct FileHandleInner {
meta: FileMeta,
compacting: AtomicBool,
}
impl FileHandleInner {
fn new(meta: FileMeta) -> FileHandleInner {
FileHandleInner { meta }
FileHandleInner {
meta,
compacting: AtomicBool::new(false),
}
}
}
@@ -189,10 +218,9 @@ impl FileHandleInner {
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileMeta {
pub file_name: String,
pub start_timestamp: Option<Timestamp>,
pub end_timestamp: Option<Timestamp>,
pub time_range: Option<(Timestamp, Timestamp)>,
/// SST level of the file.
pub level: u8,
pub level: Level,
}
#[derive(Debug, Default)]
@@ -212,8 +240,7 @@ pub struct ReadOptions {
#[derive(Debug)]
pub struct SstInfo {
pub start_timestamp: Option<Timestamp>,
pub end_timestamp: Option<Timestamp>,
pub time_range: Option<(Timestamp, Timestamp)>,
}
/// SST access layer.

View File

@@ -118,23 +118,14 @@ impl<'a> ParquetWriter<'a> {
let file_meta = arrow_writer.close().context(WriteParquetSnafu)?;
let (start_timestamp, end_timestamp) =
match decode_timestamp_range(&file_meta, store_schema) {
Ok(Some((start, end))) => (Some(start), Some(end)),
Ok(None) => (None, None),
Err(e) => {
error!(e;"Failed to calculate time range of parquet file");
(None, None)
}
};
let time_range = decode_timestamp_range(&file_meta, store_schema)
.ok()
.flatten();
object.write(buf).await.context(WriteObjectSnafu {
path: object.path(),
})?;
Ok(SstInfo {
start_timestamp,
end_timestamp,
})
Ok(SstInfo { time_range })
}
}
@@ -467,20 +458,18 @@ mod tests {
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());
let SstInfo {
start_timestamp,
end_timestamp,
} = writer
let SstInfo { time_range } = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap();
assert_eq!(Some(Timestamp::new_millisecond(0)), start_timestamp);
assert_eq!(
Some(Timestamp::new_millisecond((rows_total - 1) as i64)),
end_timestamp
Some((
Timestamp::new_millisecond(0),
Timestamp::new_millisecond((rows_total - 1) as i64)
)),
time_range
);
let operator = ObjectStore::new(
object_store::backend::fs::Builder::default()
.root(dir.path().to_str().unwrap())
@@ -540,17 +529,18 @@ mod tests {
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());
let SstInfo {
start_timestamp,
end_timestamp,
} = writer
let SstInfo { time_range } = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap();
assert_eq!(Some(Timestamp::new_millisecond(1000)), start_timestamp);
assert_eq!(Some(Timestamp::new_millisecond(2003)), end_timestamp);
assert_eq!(
Some((
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2003)
)),
time_range
);
let operator = ObjectStore::new(
object_store::backend::fs::Builder::default()
.root(dir.path().to_str().unwrap())

View File

@@ -137,7 +137,7 @@ pub struct VersionEdit {
pub type VersionControlRef = Arc<VersionControl>;
pub type VersionRef = Arc<Version>;
type MemtableVersionRef = Arc<MemtableVersion>;
type LevelMetasRef = Arc<LevelMetas>;
pub type LevelMetasRef = Arc<LevelMetas>;
/// Version contains metadata and state of region.
#[derive(Clone, Debug)]