feat(mito): make use of options in RegionCreate/OpenRequest (#2436)

* refactor: move RegionOptions to options mod

* refactor: define compaction strategy in region/options.rs

* feat: use duration for time window

* refactor: rename CompactionStrategy to CompactionOptions

* feat: use serde to parse options

* feat: parse options

* feat: set options on creation/opening

* test: test create/open with options

* chore: remove todo

* feat: get compaction ttl and options from RegionOptions

* style: fix clippy

* chore: Remove unused engine_options

* style: fix clippy

* chore: remove todo
This commit is contained in:
Yingwen
2023-09-19 17:06:09 +08:00
committed by GitHub
parent 1fb2d95c5f
commit 7b606ed289
26 changed files with 396 additions and 67 deletions

34
Cargo.lock generated
View File

@@ -2737,6 +2737,9 @@ name = "deranged"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946"
dependencies = [
"serde",
]
[[package]]
name = "derive-new"
@@ -4477,6 +4480,7 @@ checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
dependencies = [
"equivalent",
"hashbrown 0.14.0",
"serde",
]
[[package]]
@@ -5497,6 +5501,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"serde_with",
"smallvec",
"snafu",
"store-api",
@@ -8663,6 +8668,35 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_with"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237"
dependencies = [
"base64 0.21.3",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.0.0",
"serde",
"serde_json",
"serde_with_macros",
"time 0.3.28",
]
[[package]]
name = "serde_with_macros"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e6be15c453eb305019bfa438b1593c731f36a289a7853f7707ee29e870b3b3c"
dependencies = [
"darling 0.20.3",
"proc-macro2",
"quote",
"syn 2.0.29",
]
[[package]]
name = "serde_yaml"
version = "0.9.25"

View File

@@ -120,7 +120,6 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
created_on: chrono::DateTime::default(),
primary_key_indices: vec![],
next_column_id: columns as u32 + 1,
engine_options: Default::default(),
value_indices: vec![],
options: Default::default(),
region_numbers: (1..=100).collect(),

View File

@@ -275,7 +275,6 @@ mod tests {
created_on: chrono::DateTime::default(),
primary_key_indices: vec![0, 1],
next_column_id: 3,
engine_options: Default::default(),
value_indices: vec![2, 3],
options: Default::default(),
region_numbers: vec![1],

View File

@@ -100,7 +100,6 @@ pub mod mock {
#[cfg(test)]
pub mod test_data {
use std::collections::HashMap;
use std::sync::Arc;
use chrono::DateTime;
@@ -178,7 +177,6 @@ pub mod test_data {
engine: MITO2_ENGINE.to_string(),
next_column_id: 3,
region_numbers: vec![1, 2, 3],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],

View File

@@ -70,8 +70,6 @@ pub(crate) async fn fetch_tables(
#[cfg(test)]
pub(crate) mod tests {
use std::collections::HashMap;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::TableMetadataManagerRef;
@@ -103,7 +101,6 @@ pub(crate) mod tests {
engine: MITO_ENGINE.to_string(),
next_column_id: 1,
region_numbers: vec![1, 2, 3, 4],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],

View File

@@ -46,6 +46,7 @@ prost.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
serde_with = "3"
smallvec.workspace = true
snafu.workspace = true
store-api = { workspace = true }

View File

@@ -20,12 +20,11 @@ mod twcs;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{debug, error};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
@@ -33,6 +32,7 @@ use crate::compaction::twcs::TwcsPicker;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
@@ -42,7 +42,6 @@ use crate::sst::file_purger::FilePurgerRef;
pub struct CompactionRequest {
pub(crate) current_version: VersionRef,
pub(crate) access_layer: AccessLayerRef,
pub(crate) ttl: Option<Duration>,
pub(crate) compaction_time_window: Option<i64>,
/// Sender to send notification to the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
@@ -64,13 +63,13 @@ impl CompactionRequest {
}
}
/// Builds compaction picker according to [CompactionStrategy].
pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> CompactionPickerRef {
/// Builds compaction picker according to [CompactionOptions].
pub fn compaction_options_to_picker(strategy: &CompactionOptions) -> CompactionPickerRef {
match strategy {
CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds,
twcs_opts.time_window_seconds(),
)) as Arc<_>,
}
}
@@ -175,9 +174,7 @@ impl CompactionScheduler {
///
/// If the region has nothing to compact, it removes the region from the status map.
fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> {
// TODO(hl): build picker according to region options.
let picker =
compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default()));
let picker = compaction_options_to_picker(&request.current_version.options.compaction);
let region_id = request.region_id();
debug!(
"Pick compaction strategy {:?} for region: {}",
@@ -309,8 +306,6 @@ impl CompactionStatus {
let mut req = CompactionRequest {
current_version,
access_layer: self.access_layer.clone(),
// TODO(hl): get TTL info from region metadata
ttl: None,
// TODO(hl): get persisted region compaction time window
compaction_time_window: None,
request_sender: request_sender.clone(),

View File

@@ -120,7 +120,6 @@ impl Picker for TwcsPicker {
let CompactionRequest {
current_version,
access_layer,
ttl,
compaction_time_window,
request_sender,
waiters,
@@ -131,6 +130,7 @@ impl Picker for TwcsPicker {
let region_id = region_metadata.region_id;
let levels = current_version.ssts.levels();
let ttl = current_version.options.ttl;
let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis());
if !expired_ssts.is_empty() {
info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
@@ -376,7 +376,6 @@ impl CompactionTask for TwcsCompactionTask {
notify,
})
.await;
// TODO(hl): handle reschedule
}
}

View File

@@ -25,8 +25,6 @@ use serde::{Deserialize, Serialize};
const DEFAULT_NUM_WORKERS: usize = 1;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
/// Default region write buffer size.
pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
#[derive(Debug, Serialize, Deserialize, Clone)]

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
@@ -77,3 +79,25 @@ async fn test_engine_create_existing_region() {
"unexpected err: {err}"
);
}
#[tokio::test]
async fn test_engine_create_with_options() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("ttl", "10d")
.build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
assert!(engine.is_region_exists(region_id));
let region = engine.get_region(region_id).unwrap();
assert_eq!(
Duration::from_secs(3600 * 24 * 10),
region.version().options.ttl.unwrap()
);
}

View File

@@ -13,12 +13,15 @@
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionOpenRequest, RegionPutRequest, RegionRequest};
use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
@@ -125,3 +128,42 @@ async fn test_engine_open_readonly() {
engine.set_writable(region_id, true).unwrap();
put_rows(&engine, region_id, rows).await;
}
#[tokio::test]
async fn test_engine_region_open_with_options() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Close the region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
// Open the region again with options.
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert_eq!(
Duration::from_secs(3600 * 24 * 4),
region.version().options.ttl.unwrap()
);
}

View File

@@ -453,6 +453,12 @@ pub enum Error {
region_id: RegionId,
location: Location,
},
#[snafu(display("Invalid options, source: {}", source))]
JsonOptions {
source: serde_json::Error,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -522,6 +528,7 @@ impl ErrorExt for Error {
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionReadonly { .. } => StatusCode::RegionReadonly,
JsonOptions { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -36,7 +36,7 @@ pub mod memtable;
mod metrics;
#[allow(dead_code)]
pub mod read;
mod region;
pub mod region;
mod region_write_ctx;
#[allow(dead_code)]
pub mod request;

View File

@@ -15,6 +15,7 @@
//! Mito region.
pub(crate) mod opener;
pub mod options;
pub(crate) mod version;
use std::collections::HashMap;

View File

@@ -14,6 +14,7 @@
//! Region opener.
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Arc;
@@ -32,6 +33,7 @@ use crate::config::MitoConfig;
use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderRef;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::MitoRegion;
use crate::region_write_ctx::RegionWriteCtx;
@@ -48,6 +50,7 @@ pub(crate) struct RegionOpener {
object_store: ObjectStore,
region_dir: String,
scheduler: SchedulerRef,
options: HashMap<String, String>,
}
impl RegionOpener {
@@ -65,6 +68,7 @@ impl RegionOpener {
object_store,
region_dir: String::new(),
scheduler,
options: HashMap::new(),
}
}
@@ -80,6 +84,12 @@ impl RegionOpener {
self
}
/// Sets options for the region.
pub(crate) fn options(mut self, value: HashMap<String, String>) -> Self {
self.options = value;
self
}
/// Writes region manifest and creates a new region.
///
/// # Panics
@@ -100,7 +110,10 @@ impl RegionOpener {
let mutable = self.memtable_builder.build(&metadata);
let version = VersionBuilder::new(metadata, mutable).build();
let options = RegionOptions::try_from(&self.options)?;
let version = VersionBuilder::new(metadata, mutable)
.options(options)
.build();
let version_control = Arc::new(VersionControl::new(version));
let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone()));
@@ -152,11 +165,13 @@ impl RegionOpener {
let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone()));
let file_purger = Arc::new(LocalFilePurger::new(self.scheduler, access_layer.clone()));
let mutable = self.memtable_builder.build(&metadata);
let options = RegionOptions::try_from(&self.options)?;
let version = VersionBuilder::new(metadata, mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.options(options)
.build();
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));

View File

@@ -0,0 +1,237 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Options for a region.
use std::collections::HashMap;
use std::time::Duration;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{serde_as, with_prefix, DisplayFromStr};
use snafu::ResultExt;
use crate::error::{Error, JsonOptionsSnafu, Result};
/// Options that affect the entire region.
///
/// Users need to specify the options while creating/opening a region.
#[derive(Debug, Default, Clone, PartialEq, Eq, Deserialize)]
#[serde(default)]
pub struct RegionOptions {
/// Region SST files TTL.
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
/// Compaction options.
pub compaction: CompactionOptions,
}
impl TryFrom<&HashMap<String, String>> for RegionOptions {
type Error = Error;
fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
let value = options_map_to_value(options_map);
let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
// #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse
// each field manually instead of using #[serde(flatten)] for `compaction`.
// See https://github.com/serde-rs/serde/issues/1626
let options: RegionOptionsWithoutEnum =
serde_json::from_str(&json).context(JsonOptionsSnafu)?;
let compaction: CompactionOptions = serde_json::from_str(&json).unwrap_or_default();
Ok(RegionOptions {
ttl: options.ttl,
compaction,
})
}
}
/// Options for compactions
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(tag = "compaction.type")]
#[serde(rename_all = "lowercase")]
pub enum CompactionOptions {
/// Time window compaction strategy.
#[serde(with = "prefix_twcs")]
Twcs(TwcsOptions),
}
impl Default for CompactionOptions {
fn default() -> Self {
Self::Twcs(TwcsOptions::default())
}
}
/// Time window compaction options.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
/// Max num of files that can be kept in active writing time window.
#[serde_as(as = "DisplayFromStr")]
pub max_active_window_files: usize,
/// Max num of files that can be kept in inactive time window.
#[serde_as(as = "DisplayFromStr")]
pub max_inactive_window_files: usize,
/// Compaction time window defined when creating tables.
#[serde(with = "humantime_serde")]
pub time_window: Option<Duration>,
}
with_prefix!(prefix_twcs "compaction.twcs.");
impl TwcsOptions {
/// Returns time window in second resolution.
pub fn time_window_seconds(&self) -> Option<i64> {
self.time_window.and_then(|window| {
let window_secs = window.as_secs();
if window_secs == 0 {
None
} else {
window_secs.try_into().ok()
}
})
}
}
impl Default for TwcsOptions {
fn default() -> Self {
Self {
max_active_window_files: 4,
max_inactive_window_files: 1,
time_window: None,
}
}
}
/// We need to define a new struct without enum fields as `#[serde(default)]` does not
/// support external tagging.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct RegionOptionsWithoutEnum {
/// Region SST files TTL.
#[serde(with = "humantime_serde")]
ttl: Option<Duration>,
}
impl Default for RegionOptionsWithoutEnum {
fn default() -> Self {
let options = RegionOptions::default();
RegionOptionsWithoutEnum { ttl: options.ttl }
}
}
/// Converts the `options` map to a json object.
///
/// Converts all key-values to lowercase and replaces "null" strings by `null` json values.
fn options_map_to_value(options: &HashMap<String, String>) -> Value {
let map = options
.iter()
.map(|(key, value)| {
let (key, value) = (key.to_lowercase(), value.to_lowercase());
if value == "null" {
(key, Value::Null)
} else {
(key, Value::from(value))
}
})
.collect();
Value::Object(map)
}
#[cfg(test)]
mod tests {
use super::*;
fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
options
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
}
#[test]
fn test_empty_region_options() {
let map = make_map(&[]);
let options = RegionOptions::try_from(&map).unwrap();
assert_eq!(RegionOptions::default(), options);
}
#[test]
fn test_with_ttl() {
let map = make_map(&[("ttl", "7d")]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
..Default::default()
};
assert_eq!(expect, options);
}
#[test]
fn test_without_compaction_type() {
// If `compaction.type` is not provided, we ignore all compaction
// related options. Actually serde does not support deserialize
// an enum without knowning its type.
let map = make_map(&[
("compaction.twcs.max_active_window_files", "8"),
("compaction.twcs.time_window", "2h"),
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions::default();
assert_eq!(expect, options);
}
#[test]
fn test_with_compaction_type() {
let map = make_map(&[
("compaction.twcs.max_active_window_files", "8"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_files: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
..Default::default()
}),
..Default::default()
};
assert_eq!(expect, options);
}
#[test]
fn test_with_all() {
let map = make_map(&[
("ttl", "7d"),
("compaction.twcs.max_active_window_files", "8"),
("compaction.twcs.max_inactive_window_files", "2"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_files: 8,
max_inactive_window_files: 2,
time_window: Some(Duration::from_secs(3600 * 2)),
}),
};
assert_eq!(expect, options);
}
}

View File

@@ -31,6 +31,7 @@ use store_api::storage::SequenceNumber;
use crate::manifest::action::RegionEdit;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
use crate::region::options::RegionOptions;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::{SstVersion, SstVersionRef};
@@ -204,7 +205,8 @@ pub(crate) struct Version {
///
/// Used to check if it is a flush task during the truncating table.
pub(crate) truncated_entry_id: Option<EntryId>,
// TODO(yingwen): RegionOptions.
/// Options of the region.
pub(crate) options: RegionOptions,
}
pub(crate) type VersionRef = Arc<Version>;
@@ -217,6 +219,7 @@ pub(crate) struct VersionBuilder {
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
truncated_entry_id: Option<EntryId>,
options: RegionOptions,
}
impl VersionBuilder {
@@ -229,6 +232,7 @@ impl VersionBuilder {
flushed_entry_id: 0,
flushed_sequence: 0,
truncated_entry_id: None,
options: RegionOptions::default(),
}
}
@@ -241,6 +245,7 @@ impl VersionBuilder {
flushed_entry_id: version.flushed_entry_id,
flushed_sequence: version.flushed_sequence,
truncated_entry_id: version.truncated_entry_id,
options: version.options.clone(),
}
}
@@ -274,6 +279,12 @@ impl VersionBuilder {
self
}
/// Sets options.
pub(crate) fn options(mut self, options: RegionOptions) -> Self {
self.options = options;
self
}
/// Apply edit to the builder.
pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
if let Some(entry_id) = edit.flushed_entry_id {
@@ -324,6 +335,7 @@ impl VersionBuilder {
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncated_entry_id: self.truncated_entry_id,
options: self.options,
}
}
}

View File

@@ -16,14 +16,12 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::helper::{
is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
to_proto_value,
};
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_base::readable_size::ReadableSize;
use common_query::Output;
use common_query::Output::AffectedRows;
use common_telemetry::tracing::log::info;
@@ -37,10 +35,9 @@ use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{CompactionStrategy, RegionId, SequenceNumber};
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::{
CompactRegionSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu,
InvalidRequestSnafu, Result,
@@ -50,29 +47,6 @@ use crate::sst::file::FileMeta;
use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::wal::EntryId;
/// Options that affect the entire region.
///
/// Users need to specify the options while creating/opening a region.
#[derive(Debug)]
pub struct RegionOptions {
/// Region memtable max size in bytes.
pub write_buffer_size: Option<ReadableSize>,
/// Region SST files TTL.
pub ttl: Option<Duration>,
/// Compaction strategy.
pub compaction_strategy: CompactionStrategy,
}
impl Default for RegionOptions {
fn default() -> Self {
RegionOptions {
write_buffer_size: Some(DEFAULT_WRITE_BUFFER_SIZE),
ttl: None,
compaction_strategy: CompactionStrategy::default(),
}
}
}
/// Request to write a region.
#[derive(Debug)]
pub struct WriteRequest {

View File

@@ -209,6 +209,7 @@ pub struct CreateRequestBuilder {
tag_num: usize,
field_num: usize,
create_if_not_exists: bool,
options: HashMap<String, String>,
}
impl Default for CreateRequestBuilder {
@@ -218,6 +219,7 @@ impl Default for CreateRequestBuilder {
tag_num: 1,
field_num: 1,
create_if_not_exists: false,
options: HashMap::new(),
}
}
}
@@ -247,6 +249,11 @@ impl CreateRequestBuilder {
self
}
pub fn insert_option(mut self, key: &str, value: &str) -> Self {
self.options.insert(key.to_string(), value.to_string());
self
}
pub fn build(&self) -> RegionCreateRequest {
let mut column_id = 0;
let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1);
@@ -292,7 +299,7 @@ impl CreateRequestBuilder {
column_metadatas,
primary_key,
create_if_not_exists: self.create_if_not_exists,
options: HashMap::default(),
options: self.options.clone(),
region_dir: self.region_dir.clone(),
}
}

View File

@@ -62,6 +62,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
)
.metadata(metadata)
.region_dir(&request.region_dir)
.options(request.options)
.create(&self.config)
.await?;

View File

@@ -62,6 +62,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.scheduler.clone(),
)
.region_dir(&request.region_dir)
.options(request.options)
.open(&self.config, &self.wal)
.await?;

View File

@@ -446,7 +446,6 @@ fn create_table_info(
engine: create_table.engine.clone(),
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: table_options,
created_on: DateTime::default(),
partition_key_indices,

View File

@@ -228,7 +228,6 @@ mod tests {
.value_indices(vec![2, 3])
.engine("mito".to_string())
.next_column_id(0)
.engine_options(Default::default())
.options(Default::default())
.created_on(Default::default())
.region_numbers(regions)
@@ -297,7 +296,6 @@ WITH(
.primary_key_indices(vec![])
.engine("file".to_string())
.next_column_id(0)
.engine_options(Default::default())
.options(options)
.created_on(Default::default())
.build()

View File

@@ -107,9 +107,6 @@ pub struct TableMeta {
#[builder(default, setter(into))]
pub region_numbers: Vec<u32>,
pub next_column_id: ColumnId,
/// Options for table engine.
#[builder(default)]
pub engine_options: HashMap<String, String>,
/// Table options.
#[builder(default)]
pub options: TableOptions,
@@ -229,7 +226,6 @@ impl TableMeta {
let mut builder = TableMetaBuilder::default();
let _ = builder
.engine(&self.engine)
.engine_options(self.engine_options.clone())
.options(self.options.clone())
.created_on(self.created_on)
.region_numbers(self.region_numbers.clone())
@@ -531,7 +527,6 @@ pub struct RawTableMeta {
pub engine: String,
pub next_column_id: ColumnId,
pub region_numbers: Vec<u32>,
pub engine_options: HashMap<String, String>,
pub options: TableOptions,
pub created_on: DateTime<Utc>,
#[serde(default)]
@@ -547,7 +542,6 @@ impl From<TableMeta> for RawTableMeta {
engine: meta.engine,
next_column_id: meta.next_column_id,
region_numbers: meta.region_numbers,
engine_options: meta.engine_options,
options: meta.options,
created_on: meta.created_on,
partition_key_indices: meta.partition_key_indices,
@@ -566,7 +560,6 @@ impl TryFrom<RawTableMeta> for TableMeta {
engine: raw.engine,
region_numbers: raw.region_numbers,
next_column_id: raw.next_column_id,
engine_options: raw.engine_options,
options: raw.options,
created_on: raw.created_on,
partition_key_indices: raw.partition_key_indices,

View File

@@ -73,7 +73,6 @@ impl MemTable {
.value_indices(vec![])
.engine("mito".to_string())
.next_column_id(0)
.engine_options(Default::default())
.options(Default::default())
.created_on(Default::default())
.region_numbers(regions)

View File

@@ -29,7 +29,6 @@ pub fn test_table_info(
.value_indices(vec![])
.engine("mito".to_string())
.next_column_id(0)
.engine_options(Default::default())
.options(Default::default())
.created_on(Default::default())
.region_numbers(vec![1])