mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 01:40:36 +00:00
feat: initial changes for compaction_time_window field support (#1083)
* feat(compaction_time_window): initial changes for compaction_time_window field support * feat(compaction_time_window): move PickerContext creation * feat(compaction_time_window): update region descriptor, fix formatting * feat(compaction_time_window): add minor enhancements * feat(compaction_time_window): fix failing test * feat(compaction_time_window): return an error instead silently skip for the user provided compaction_time_window * feat(compaction_time_window): add TODO reminder
This commit is contained in:
@@ -393,6 +393,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
.id(region_id)
|
||||
.name(®ion_name)
|
||||
.row_key(row_key.clone())
|
||||
.compaction_time_window(request.table_options.compaction_time_window)
|
||||
.default_cf(default_cf.clone())
|
||||
.build()
|
||||
.context(BuildRegionDescriptorSnafu {
|
||||
@@ -406,6 +407,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
.write_buffer_size
|
||||
.map(|size| size.0 as usize),
|
||||
ttl: request.table_options.ttl,
|
||||
compaction_time_window: request.table_options.compaction_time_window,
|
||||
};
|
||||
|
||||
let region = self
|
||||
@@ -512,6 +514,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
.write_buffer_size
|
||||
.map(|s| s.0 as usize),
|
||||
ttl: table_info.meta.options.ttl,
|
||||
compaction_time_window: table_info.meta.options.compaction_time_window,
|
||||
};
|
||||
|
||||
debug!(
|
||||
|
||||
@@ -163,15 +163,18 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
let table_options = &self.data.request.table_options;
|
||||
let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize);
|
||||
let ttl = table_options.ttl;
|
||||
let compaction_time_window = table_options.compaction_time_window;
|
||||
let open_opts = OpenOptions {
|
||||
parent_dir: table_dir.clone(),
|
||||
write_buffer_size,
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
};
|
||||
let create_opts = CreateOptions {
|
||||
parent_dir: table_dir,
|
||||
write_buffer_size,
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
};
|
||||
|
||||
let primary_key_indices = &self.data.request.primary_key_indices;
|
||||
@@ -216,6 +219,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
.name(region_name.clone())
|
||||
.row_key(row_key.clone())
|
||||
.default_cf(default_cf.clone())
|
||||
.compaction_time_window(compaction_time_window)
|
||||
.build()
|
||||
.context(BuildRegionDescriptorSnafu {
|
||||
table_name: &self.data.request.table_name,
|
||||
|
||||
@@ -67,6 +67,7 @@ impl RegionDescBuilder {
|
||||
row_key: self.key_builder.build().unwrap(),
|
||||
default_cf: self.default_cf_builder.build().unwrap(),
|
||||
extra_cfs: Vec::new(),
|
||||
compaction_time_window: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::marker::PhantomData;
|
||||
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::compaction::{CompactionTask, Picker, PickerContext};
|
||||
use crate::compaction::{CompactionTask, Picker};
|
||||
use crate::error::Result;
|
||||
use crate::scheduler::{Request, Scheduler};
|
||||
|
||||
@@ -49,7 +49,7 @@ impl Picker for NoopCompactionPicker {
|
||||
type Request = NoopCompactionRequest;
|
||||
type Task = NoopCompactionTask;
|
||||
|
||||
fn pick(&self, _ctx: &PickerContext, _req: &Self::Request) -> Result<Option<Self::Task>> {
|
||||
fn pick(&self, _req: &Self::Request) -> Result<Option<Self::Task>> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,14 +35,24 @@ pub trait Picker: Send + 'static {
|
||||
type Request: Request;
|
||||
type Task: CompactionTask;
|
||||
|
||||
fn pick(
|
||||
&self,
|
||||
ctx: &PickerContext,
|
||||
req: &Self::Request,
|
||||
) -> crate::error::Result<Option<Self::Task>>;
|
||||
fn pick(&self, req: &Self::Request) -> crate::error::Result<Option<Self::Task>>;
|
||||
}
|
||||
|
||||
pub struct PickerContext {}
|
||||
pub struct PickerContext {
|
||||
compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl PickerContext {
|
||||
pub fn with(compaction_time_window: Option<i64>) -> Self {
|
||||
Self {
|
||||
compaction_time_window,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compaction_time_window(&self) -> Option<i64> {
|
||||
self.compaction_time_window
|
||||
}
|
||||
}
|
||||
|
||||
/// L0 -> L1 compaction based on time windows.
|
||||
pub struct SimplePicker<S> {
|
||||
@@ -89,7 +99,6 @@ impl<S: LogStore> Picker for SimplePicker<S> {
|
||||
|
||||
fn pick(
|
||||
&self,
|
||||
ctx: &PickerContext,
|
||||
req: &CompactionRequestImpl<S>,
|
||||
) -> crate::error::Result<Option<CompactionTaskImpl<S>>> {
|
||||
let levels = &req.levels();
|
||||
@@ -110,9 +119,10 @@ impl<S: LogStore> Picker for SimplePicker<S> {
|
||||
expired_ssts.iter().for_each(|f| f.mark_compacting(true));
|
||||
}
|
||||
|
||||
let ctx = &PickerContext::with(req.compaction_time_window);
|
||||
for level_num in 0..levels.level_num() {
|
||||
let level = levels.level(level_num as u8);
|
||||
let outputs = self.strategy.pick(ctx, level);
|
||||
let (compaction_time_window, outputs) = self.strategy.pick(ctx, level);
|
||||
|
||||
if outputs.is_empty() {
|
||||
debug!("No SST file can be compacted at level {}", level_num);
|
||||
@@ -133,6 +143,7 @@ impl<S: LogStore> Picker for SimplePicker<S> {
|
||||
manifest: req.manifest.clone(),
|
||||
expired_ssts,
|
||||
sst_write_buffer_size: req.sst_write_buffer_size,
|
||||
compaction_time_window,
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ use store_api::storage::RegionId;
|
||||
use tokio::sync::oneshot::Sender;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::compaction::picker::{Picker, PickerContext};
|
||||
use crate::compaction::picker::Picker;
|
||||
use crate::compaction::task::CompactionTask;
|
||||
use crate::error::Result;
|
||||
use crate::manifest::region::RegionManifest;
|
||||
@@ -59,6 +59,7 @@ pub struct CompactionRequestImpl<S: LogStore> {
|
||||
pub manifest: RegionManifest,
|
||||
pub wal: Wal<S>,
|
||||
pub ttl: Option<Duration>,
|
||||
pub compaction_time_window: Option<i64>,
|
||||
/// Compaction result sender.
|
||||
pub sender: Option<Sender<Result<()>>>,
|
||||
|
||||
@@ -101,7 +102,7 @@ where
|
||||
finish_notifier: Arc<Notify>,
|
||||
) -> Result<()> {
|
||||
let region_id = req.key();
|
||||
let Some(task) = self.picker.pick(&PickerContext {}, &req)? else {
|
||||
let Some(task) = self.picker.pick(&req)? else {
|
||||
info!("No file needs compaction in region: {:?}", region_id);
|
||||
req.complete(Ok(()));
|
||||
return Ok(());
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::sst::{FileHandle, LevelMeta};
|
||||
|
||||
/// Compaction strategy that defines which SSTs need to be compacted at given level.
|
||||
pub trait Strategy {
|
||||
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> Vec<CompactionOutput>;
|
||||
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option<i64>, Vec<CompactionOutput>);
|
||||
}
|
||||
|
||||
pub type StrategyRef = Arc<dyn Strategy + Send + Sync>;
|
||||
@@ -37,29 +37,33 @@ pub type StrategyRef = Arc<dyn Strategy + Send + Sync>;
|
||||
pub struct SimpleTimeWindowStrategy {}
|
||||
|
||||
impl Strategy for SimpleTimeWindowStrategy {
|
||||
fn pick(&self, _ctx: &PickerContext, level: &LevelMeta) -> Vec<CompactionOutput> {
|
||||
fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option<i64>, Vec<CompactionOutput>) {
|
||||
// SimpleTimeWindowStrategy only handles level 0 to level 1 compaction.
|
||||
if level.level() != 0 {
|
||||
return vec![];
|
||||
return (None, vec![]);
|
||||
}
|
||||
let files = find_compactable_files(level);
|
||||
debug!("Compactable files found: {:?}", files);
|
||||
if files.is_empty() {
|
||||
return vec![];
|
||||
return (None, vec![]);
|
||||
}
|
||||
|
||||
let time_bucket = infer_time_bucket(&files);
|
||||
let time_bucket = ctx
|
||||
.compaction_time_window()
|
||||
.unwrap_or_else(|| infer_time_bucket(&files));
|
||||
let buckets = calculate_time_buckets(time_bucket, &files);
|
||||
debug!("File bucket:{}, file groups: {:?}", time_bucket, buckets);
|
||||
buckets
|
||||
.into_iter()
|
||||
.map(|(bound, files)| CompactionOutput {
|
||||
output_level: 1,
|
||||
bucket_bound: bound,
|
||||
bucket: time_bucket,
|
||||
inputs: files,
|
||||
})
|
||||
.collect()
|
||||
(
|
||||
Some(time_bucket),
|
||||
buckets
|
||||
.into_iter()
|
||||
.map(|(bound, files)| CompactionOutput {
|
||||
output_level: 1,
|
||||
bucket_bound: bound,
|
||||
bucket: time_bucket,
|
||||
inputs: files,
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ pub struct CompactionTaskImpl<S: LogStore> {
|
||||
pub manifest: RegionManifest,
|
||||
pub expired_ssts: Vec<FileHandle>,
|
||||
pub sst_write_buffer_size: ReadableSize,
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl<S: LogStore> Debug for CompactionTaskImpl<S> {
|
||||
@@ -101,6 +102,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
|
||||
}
|
||||
|
||||
/// Writes updated SST info into manifest.
|
||||
// TODO(etolbakov): we are not persisting inferred compaction_time_window (#1083)[https://github.com/GreptimeTeam/greptimedb/pull/1083]
|
||||
async fn write_manifest_and_apply(
|
||||
&self,
|
||||
output: HashSet<FileMeta>,
|
||||
|
||||
@@ -298,6 +298,7 @@ impl<S: LogStore> EngineInner<S> {
|
||||
name,
|
||||
&self.config,
|
||||
opts.ttl,
|
||||
opts.compaction_time_window,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -340,6 +341,7 @@ impl<S: LogStore> EngineInner<S> {
|
||||
®ion_name,
|
||||
&self.config,
|
||||
opts.ttl,
|
||||
opts.compaction_time_window,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -368,6 +370,7 @@ impl<S: LogStore> EngineInner<S> {
|
||||
region_name: &str,
|
||||
config: &EngineConfig,
|
||||
ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
) -> Result<StoreConfig<S>> {
|
||||
let parent_dir = util::normalize_dir(parent_dir);
|
||||
|
||||
@@ -397,6 +400,7 @@ impl<S: LogStore> EngineInner<S> {
|
||||
engine_config: self.config.clone(),
|
||||
file_purger: self.file_purger.clone(),
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,8 @@ pub struct RawRegionMetadata {
|
||||
pub columns: RawColumnsMetadata,
|
||||
pub column_families: RawColumnFamiliesMetadata,
|
||||
pub version: VersionNumber,
|
||||
/// Time window for compaction
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata).
|
||||
|
||||
@@ -198,6 +198,8 @@ pub struct RegionMetadata {
|
||||
pub columns: ColumnsMetadataRef,
|
||||
column_families: ColumnFamiliesMetadata,
|
||||
version: VersionNumber,
|
||||
/// Time window for compaction
|
||||
compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl RegionMetadata {
|
||||
@@ -216,6 +218,11 @@ impl RegionMetadata {
|
||||
&self.schema
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn compaction_time_window(&self) -> Option<i64> {
|
||||
self.compaction_time_window
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn user_schema(&self) -> &SchemaRef {
|
||||
self.schema.user_schema()
|
||||
@@ -317,7 +324,8 @@ impl RegionMetadata {
|
||||
let mut builder = RegionDescriptorBuilder::default()
|
||||
.id(self.id)
|
||||
.name(&self.name)
|
||||
.row_key(row_key);
|
||||
.row_key(row_key)
|
||||
.compaction_time_window(self.compaction_time_window);
|
||||
|
||||
for (cf_id, cf) in &self.column_families.id_to_cfs {
|
||||
let mut cf_builder = ColumnFamilyDescriptorBuilder::default()
|
||||
@@ -350,6 +358,7 @@ impl From<&RegionMetadata> for RawRegionMetadata {
|
||||
columns: RawColumnsMetadata::from(&*data.columns),
|
||||
column_families: RawColumnFamiliesMetadata::from(&data.column_families),
|
||||
version: data.version,
|
||||
compaction_time_window: data.compaction_time_window,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -368,6 +377,7 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {
|
||||
columns,
|
||||
column_families: raw.column_families.into(),
|
||||
version: raw.version,
|
||||
compaction_time_window: raw.compaction_time_window,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -635,6 +645,7 @@ impl TryFrom<RegionDescriptor> for RegionMetadataBuilder {
|
||||
.name(desc.name)
|
||||
.id(desc.id)
|
||||
.row_key(desc.row_key)?
|
||||
.compaction_time_window(desc.compaction_time_window)
|
||||
.add_column_family(desc.default_cf)?;
|
||||
for cf in desc.extra_cfs {
|
||||
builder = builder.add_column_family(cf)?;
|
||||
@@ -791,6 +802,7 @@ struct RegionMetadataBuilder {
|
||||
columns_meta_builder: ColumnsMetadataBuilder,
|
||||
cfs_meta_builder: ColumnFamiliesMetadataBuilder,
|
||||
version: VersionNumber,
|
||||
compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl Default for RegionMetadataBuilder {
|
||||
@@ -807,6 +819,7 @@ impl RegionMetadataBuilder {
|
||||
columns_meta_builder: ColumnsMetadataBuilder::default(),
|
||||
cfs_meta_builder: ColumnFamiliesMetadataBuilder::default(),
|
||||
version: Schema::INITIAL_VERSION,
|
||||
compaction_time_window: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -831,6 +844,11 @@ impl RegionMetadataBuilder {
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn compaction_time_window(mut self, compaction_time_window: Option<i64>) -> Self {
|
||||
self.compaction_time_window = compaction_time_window;
|
||||
self
|
||||
}
|
||||
|
||||
fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result<Self> {
|
||||
let column_index_start = self.columns_meta_builder.columns.len();
|
||||
let column_index_end = column_index_start + cf.columns.len();
|
||||
@@ -861,6 +879,7 @@ impl RegionMetadataBuilder {
|
||||
columns,
|
||||
column_families: self.cfs_meta_builder.build(),
|
||||
version: self.version,
|
||||
compaction_time_window: self.compaction_time_window,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1065,6 +1084,7 @@ mod tests {
|
||||
.unwrap();
|
||||
RegionMetadataBuilder::new()
|
||||
.name(TEST_REGION)
|
||||
.compaction_time_window(None)
|
||||
.row_key(row_key)
|
||||
.unwrap()
|
||||
.add_column_family(cf)
|
||||
|
||||
@@ -156,6 +156,7 @@ pub struct StoreConfig<S: LogStore> {
|
||||
pub engine_config: Arc<EngineConfig>,
|
||||
pub file_purger: FilePurgerRef,
|
||||
pub ttl: Option<Duration>,
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
|
||||
@@ -232,6 +233,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
store_config.memtable_builder,
|
||||
store_config.engine_config.clone(),
|
||||
store_config.ttl,
|
||||
store_config.compaction_time_window,
|
||||
)),
|
||||
wal,
|
||||
flush_strategy: store_config.flush_strategy,
|
||||
@@ -250,7 +252,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
pub async fn open(
|
||||
name: String,
|
||||
store_config: StoreConfig<S>,
|
||||
_opts: &OpenOptions,
|
||||
opts: &OpenOptions,
|
||||
) -> Result<Option<RegionImpl<S>>> {
|
||||
// Load version meta data from manifest.
|
||||
let (version, mut recovered_metadata) = match Self::recover_from_manifest(
|
||||
@@ -307,11 +309,14 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
name,
|
||||
version_control,
|
||||
});
|
||||
|
||||
let compaction_time_window = store_config
|
||||
.compaction_time_window
|
||||
.or(opts.compaction_time_window);
|
||||
let writer = Arc::new(RegionWriter::new(
|
||||
store_config.memtable_builder,
|
||||
store_config.engine_config.clone(),
|
||||
store_config.ttl,
|
||||
compaction_time_window,
|
||||
));
|
||||
let writer_ctx = WriterContext {
|
||||
shared: &shared,
|
||||
|
||||
@@ -68,9 +68,15 @@ impl RegionWriter {
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
config: Arc<EngineConfig>,
|
||||
ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
) -> RegionWriter {
|
||||
RegionWriter {
|
||||
inner: Mutex::new(WriterInner::new(memtable_builder, config, ttl)),
|
||||
inner: Mutex::new(WriterInner::new(
|
||||
memtable_builder,
|
||||
config,
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
)),
|
||||
version_mutex: Mutex::new(()),
|
||||
}
|
||||
}
|
||||
@@ -361,6 +367,7 @@ struct WriterInner {
|
||||
closed: bool,
|
||||
engine_config: Arc<EngineConfig>,
|
||||
ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl WriterInner {
|
||||
@@ -368,6 +375,7 @@ impl WriterInner {
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
engine_config: Arc<EngineConfig>,
|
||||
ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
) -> WriterInner {
|
||||
WriterInner {
|
||||
memtable_builder,
|
||||
@@ -375,6 +383,7 @@ impl WriterInner {
|
||||
engine_config,
|
||||
closed: false,
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -638,7 +647,13 @@ impl WriterInner {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let cb = Self::build_flush_callback(¤t_version, ctx, &self.engine_config, self.ttl);
|
||||
let cb = Self::build_flush_callback(
|
||||
¤t_version,
|
||||
ctx,
|
||||
&self.engine_config,
|
||||
self.ttl,
|
||||
self.compaction_time_window,
|
||||
);
|
||||
|
||||
let flush_req = FlushJob {
|
||||
max_memtable_id: max_memtable_id.unwrap(),
|
||||
@@ -678,6 +693,7 @@ impl WriterInner {
|
||||
manifest: writer_ctx.manifest.clone(),
|
||||
wal: writer_ctx.wal.clone(),
|
||||
ttl: self.ttl,
|
||||
compaction_time_window: self.compaction_time_window,
|
||||
sender: None,
|
||||
sst_write_buffer_size,
|
||||
};
|
||||
@@ -725,6 +741,7 @@ impl WriterInner {
|
||||
ctx: &WriterContext<S>,
|
||||
config: &Arc<EngineConfig>,
|
||||
ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
) -> Option<FlushCallback> {
|
||||
let region_id = version.metadata().id();
|
||||
let compaction_request = CompactionRequestImpl {
|
||||
@@ -735,6 +752,7 @@ impl WriterInner {
|
||||
manifest: ctx.manifest.clone(),
|
||||
wal: ctx.wal.clone(),
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
sender: None,
|
||||
sst_write_buffer_size: config.sst_write_buffer_size,
|
||||
};
|
||||
|
||||
@@ -83,5 +83,6 @@ pub async fn new_store_config_with_object_store(
|
||||
engine_config: Default::default(),
|
||||
file_purger,
|
||||
ttl: None,
|
||||
compaction_time_window: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,6 +94,7 @@ impl RegionDescBuilder {
|
||||
row_key: self.key_builder.build().unwrap(),
|
||||
default_cf: self.default_cf_builder.build().unwrap(),
|
||||
extra_cfs: Vec::new(),
|
||||
compaction_time_window: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -149,6 +149,8 @@ pub struct RegionDescriptor {
|
||||
/// Extra column families defined by user.
|
||||
#[builder(default, setter(each(name = "push_extra_column_family")))]
|
||||
pub extra_cfs: Vec<ColumnFamilyDescriptor>,
|
||||
/// Time window for compaction
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl RowKeyDescriptorBuilder {
|
||||
|
||||
@@ -88,6 +88,7 @@ pub struct CreateOptions {
|
||||
pub write_buffer_size: Option<usize>,
|
||||
/// Region SST files TTL
|
||||
pub ttl: Option<Duration>,
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
/// Options to open a region.
|
||||
@@ -99,4 +100,5 @@ pub struct OpenOptions {
|
||||
pub write_buffer_size: Option<usize>,
|
||||
/// Region SST files TTL
|
||||
pub ttl: Option<Duration>,
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
@@ -162,6 +162,7 @@ mod tests {
|
||||
.name("test")
|
||||
.row_key(row_key)
|
||||
.default_cf(default_cf)
|
||||
.compaction_time_window(Some(1677652502))
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -60,10 +60,13 @@ pub struct TableOptions {
|
||||
pub ttl: Option<Duration>,
|
||||
/// Extra options that may not applicable to all table engines.
|
||||
pub extra_options: HashMap<String, String>,
|
||||
/// Time window for compaction
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
|
||||
pub const TTL_KEY: &str = "ttl";
|
||||
pub const COMPACTION_TIME_WINDOW_KEY: &str = "compaction_time_window";
|
||||
|
||||
impl TryFrom<&HashMap<String, String>> for TableOptions {
|
||||
type Error = error::Error;
|
||||
@@ -94,8 +97,20 @@ impl TryFrom<&HashMap<String, String>> for TableOptions {
|
||||
.into();
|
||||
options.ttl = Some(ttl_value);
|
||||
}
|
||||
if let Some(compaction_time_window) = value.get(COMPACTION_TIME_WINDOW_KEY) {
|
||||
options.compaction_time_window = match compaction_time_window.parse::<i64>() {
|
||||
Ok(t) => Some(t),
|
||||
Err(_) => {
|
||||
return ParseTableOptionSnafu {
|
||||
key: COMPACTION_TIME_WINDOW_KEY,
|
||||
value: compaction_time_window,
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
};
|
||||
}
|
||||
options.extra_options = HashMap::from_iter(value.iter().filter_map(|(k, v)| {
|
||||
if k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY {
|
||||
if k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY && k != COMPACTION_TIME_WINDOW_KEY {
|
||||
Some((k.clone(), v.clone()))
|
||||
} else {
|
||||
None
|
||||
@@ -118,6 +133,12 @@ impl From<&TableOptions> for HashMap<String, String> {
|
||||
let ttl_str = humantime::format_duration(ttl).to_string();
|
||||
res.insert(TTL_KEY.to_string(), ttl_str);
|
||||
}
|
||||
if let Some(compaction_time_window) = opts.compaction_time_window {
|
||||
res.insert(
|
||||
COMPACTION_TIME_WINDOW_KEY.to_string(),
|
||||
compaction_time_window.to_string(),
|
||||
);
|
||||
}
|
||||
res.extend(
|
||||
opts.extra_options
|
||||
.iter()
|
||||
@@ -229,6 +250,7 @@ mod tests {
|
||||
write_buffer_size: None,
|
||||
ttl: Some(Duration::from_secs(1000)),
|
||||
extra_options: HashMap::new(),
|
||||
compaction_time_window: Some(1677652502),
|
||||
};
|
||||
let serialized = serde_json::to_string(&options).unwrap();
|
||||
let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
|
||||
@@ -241,6 +263,7 @@ mod tests {
|
||||
write_buffer_size: Some(ReadableSize::mb(128)),
|
||||
ttl: Some(Duration::from_secs(1000)),
|
||||
extra_options: HashMap::new(),
|
||||
compaction_time_window: Some(1677652502),
|
||||
};
|
||||
let serialized_map = HashMap::from(&options);
|
||||
let serialized = TableOptions::try_from(&serialized_map).unwrap();
|
||||
@@ -250,6 +273,7 @@ mod tests {
|
||||
write_buffer_size: None,
|
||||
ttl: None,
|
||||
extra_options: HashMap::new(),
|
||||
compaction_time_window: None,
|
||||
};
|
||||
let serialized_map = HashMap::from(&options);
|
||||
let serialized = TableOptions::try_from(&serialized_map).unwrap();
|
||||
@@ -259,6 +283,7 @@ mod tests {
|
||||
write_buffer_size: Some(ReadableSize::mb(128)),
|
||||
ttl: Some(Duration::from_secs(1000)),
|
||||
extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
|
||||
compaction_time_window: Some(1677652502),
|
||||
};
|
||||
let serialized_map = HashMap::from(&options);
|
||||
let serialized = TableOptions::try_from(&serialized_map).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user