feat: ttl=0/instant/forever/humantime&ttl refactor (#5089)

* feat: ttl zero filter

* refactor: use TimeToLive enum

* fix: unit test

* tests: sqlness

* refactor: Option<TTL> None means UNSET

* tests: sqlness

* fix: 10000 years --> forever

* chore: minor refactor from reviews

* chore: rename back TimeToLive

* refactor: split immediate requests from normal requests

* fix: use correct lifetime

* refactor: rename immediate to instant

* tests: flow sink table default ttl

* refactor: per review

* tests: sqlness

* fix: ttl alter to instant

* tests: sqlness

* refactor: per review

* chore: per review

* feat: add db ttl type&forbid instant for db

* tests: more unit test
This commit is contained in:
discord9
2024-12-06 17:20:42 +08:00
committed by GitHub
parent dc83b0aa15
commit 8b944268da
39 changed files with 1729 additions and 169 deletions

4
Cargo.lock generated
View File

@@ -2446,6 +2446,8 @@ dependencies = [
"chrono-tz 0.8.6",
"common-error",
"common-macro",
"humantime",
"humantime-serde",
"once_cell",
"rand",
"serde",
@@ -6592,6 +6594,7 @@ dependencies = [
"aquamarine",
"async-trait",
"base64 0.21.7",
"common-base",
"common-error",
"common-macro",
"common-query",
@@ -7656,6 +7659,7 @@ dependencies = [
name = "operator"
version = "0.11.0"
dependencies = [
"ahash 0.8.11",
"api",
"async-stream",
"async-trait",

View File

@@ -46,11 +46,7 @@ fn build_new_schema_value(
for option in options.0.iter() {
match option {
SetDatabaseOption::Ttl(ttl) => {
if ttl.is_zero() {
value.ttl = None;
} else {
value.ttl = Some(*ttl);
}
value.ttl = Some(*ttl);
}
}
}
@@ -230,12 +226,12 @@ mod tests {
#[test]
fn test_build_new_schema_value() {
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
SetDatabaseOption::Ttl(Duration::from_secs(10)),
SetDatabaseOption::Ttl(Duration::from_secs(10).into()),
]));
let current_schema_value = SchemaNameValue::default();
let new_schema_value =
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10).into()));
let unset_ttl_alter_kind =
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![

View File

@@ -15,9 +15,9 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::DatabaseTimeToLive;
use futures::stream::BoxStream;
use humantime_serde::re::humantime;
use serde::{Deserialize, Serialize};
@@ -57,15 +57,13 @@ impl Default for SchemaNameKey<'_> {
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
pub ttl: Option<DatabaseTimeToLive>,
}
impl Display for SchemaNameValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(ttl) = self.ttl {
let ttl = humantime::format_duration(ttl);
write!(f, "ttl='{ttl}'")?;
if let Some(ttl) = self.ttl.map(|i| i.to_string()) {
write!(f, "ttl='{}'", ttl)?;
}
Ok(())
@@ -96,11 +94,8 @@ impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
impl From<SchemaNameValue> for HashMap<String, String> {
fn from(value: SchemaNameValue) -> Self {
let mut opts = HashMap::new();
if let Some(ttl) = value.ttl {
opts.insert(
OPT_KEY_TTL.to_string(),
format!("{}", humantime::format_duration(ttl)),
);
if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) {
opts.insert(OPT_KEY_TTL.to_string(), ttl);
}
opts
}
@@ -313,6 +308,7 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -323,9 +319,14 @@ mod tests {
assert_eq!("", schema_value.to_string());
let schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(9)),
ttl: Some(Duration::from_secs(9).into()),
};
assert_eq!("ttl='9s'", schema_value.to_string());
let schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(0).into()),
};
assert_eq!("ttl='forever'", schema_value.to_string());
}
#[test]
@@ -338,17 +339,36 @@ mod tests {
assert_eq!(key, parsed);
let value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
ttl: Some(Duration::from_secs(10).into()),
};
let mut opts: HashMap<String, String> = HashMap::new();
opts.insert("ttl".to_string(), "10s".to_string());
let from_value = SchemaNameValue::try_from(&opts).unwrap();
assert_eq!(value, from_value);
let parsed = SchemaNameValue::try_from_raw_value("{\"ttl\":\"10s\"}".as_bytes()).unwrap();
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "10s"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(value), parsed);
let forever = SchemaNameValue {
ttl: Some(Default::default()),
};
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "forever"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(forever), parsed);
let instant_err = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "instant"}).to_string().as_bytes(),
);
assert!(instant_err.is_err());
let none = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap();
assert!(none.is_none());
let err_empty = SchemaNameValue::try_from_raw_value("".as_bytes());
assert!(err_empty.is_err());
}
@@ -374,7 +394,7 @@ mod tests {
let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
ttl: Some(Duration::from_secs(10).into()),
};
manager
.update(schema_key, &current_schema_value, &new_schema_value)
@@ -388,10 +408,10 @@ mod tests {
.unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(40)),
ttl: Some(Duration::from_secs(40).into()),
};
let incorrect_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(20)),
ttl: Some(Duration::from_secs(20).into()),
}
.try_as_raw_value()
.unwrap();
@@ -402,5 +422,15 @@ mod tests {
.update(schema_key, &incorrect_schema_value, &new_schema_value)
.await
.unwrap_err();
let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue { ttl: None };
manager
.update(schema_key, &current_schema_value, &new_schema_value)
.await
.unwrap();
let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
assert_eq!(new_schema_value, *current_schema_value);
}
}

View File

@@ -14,7 +14,6 @@
use std::collections::{HashMap, HashSet};
use std::result;
use std::time::Duration;
use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
@@ -36,7 +35,7 @@ use api::v1::{
};
use base64::engine::general_purpose;
use base64::Engine as _;
use humantime_serde::re::humantime;
use common_time::DatabaseTimeToLive;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnNull};
@@ -1009,12 +1008,8 @@ impl TryFrom<PbOption> for SetDatabaseOption {
fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(&value)
.map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?
};
let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
.map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
Ok(SetDatabaseOption::Ttl(ttl))
}
@@ -1025,7 +1020,7 @@ impl TryFrom<PbOption> for SetDatabaseOption {
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
Ttl(Duration),
Ttl(DatabaseTimeToLive),
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]

View File

@@ -13,6 +13,8 @@ chrono.workspace = true
chrono-tz = "0.8"
common-error.workspace = true
common-macro.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
once_cell.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true

View File

@@ -93,12 +93,28 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse duration"))]
ParseDuration {
#[snafu(source)]
error: humantime::DurationError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Database's TTL can't be `instant`"))]
InvalidDatabaseTtl {
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ParseDateStr { .. }
| Error::ParseDuration { .. }
| Error::InvalidDatabaseTtl { .. }
| Error::ParseTimestamp { .. }
| Error::InvalidTimezoneOffset { .. }
| Error::Format { .. }

View File

@@ -22,6 +22,7 @@ pub mod time;
pub mod timestamp;
pub mod timestamp_millis;
pub mod timezone;
pub mod ttl;
pub mod util;
pub use date::Date;
@@ -32,3 +33,4 @@ pub use range::RangeMillis;
pub use timestamp::Timestamp;
pub use timestamp_millis::TimestampMillis;
pub use timezone::Timezone;
pub use ttl::{DatabaseTimeToLive, TimeToLive, FOREVER, INSTANT};

266
src/common/time/src/ttl.rs Normal file
View File

@@ -0,0 +1,266 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{Error, InvalidDatabaseTtlSnafu, ParseDurationSnafu};
use crate::Timestamp;
/// String spelling of the "discard immediately upon insert" TTL variant.
pub const INSTANT: &str = "instant";

/// String spelling of the "keep the data forever" TTL variant.
pub const FOREVER: &str = "forever";
/// Time-to-live for a database: either `Forever` or a `Duration`.
///
/// Unlike [`TimeToLive`], which additionally allows `Instant`, the
/// `Instant` variant is forbidden for databases (see
/// [`DatabaseTimeToLive::from_humantime_or_str`] and the `TryFrom`
/// impl, both of which reject it).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DatabaseTimeToLive {
    /// Keep the data forever (the default).
    #[default]
    Forever,
    /// Keep the data for this duration. The `From<Duration>` impl
    /// normalizes a zero duration to `Forever`, so this is expected
    /// to be non-zero.
    #[serde(untagged, with = "humantime_serde")]
    Duration(Duration),
}
impl Display for DatabaseTimeToLive {
    /// Renders as `forever` or a humantime duration (e.g. `10s`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Duration(d) => write!(f, "{}", humantime::Duration::from(*d)),
            Self::Forever => f.write_str(FOREVER),
        }
    }
}
impl DatabaseTimeToLive {
/// Parse a string that is either `forever`, or a duration to `TimeToLive`
///
/// note that an empty string or a zero duration(a duration that spans no time) is treat as `forever` too
pub fn from_humantime_or_str(s: &str) -> Result<Self, Error> {
let ttl = match s.to_lowercase().as_ref() {
INSTANT => InvalidDatabaseTtlSnafu.fail()?,
FOREVER | "" => Self::Forever,
_ => {
let d = humantime::parse_duration(s).context(ParseDurationSnafu)?;
Self::from(d)
}
};
Ok(ttl)
}
}
impl TryFrom<TimeToLive> for DatabaseTimeToLive {
    type Error = Error;

    /// Converts a table TTL into a database TTL; `Instant` is not a
    /// valid database TTL and yields `InvalidDatabaseTtl`.
    fn try_from(value: TimeToLive) -> Result<Self, Self::Error> {
        match value {
            TimeToLive::Forever => Ok(Self::Forever),
            // `From<Duration>` normalizes zero to `Forever`.
            TimeToLive::Duration(d) => Ok(d.into()),
            TimeToLive::Instant => InvalidDatabaseTtlSnafu.fail(),
        }
    }
}
impl From<DatabaseTimeToLive> for TimeToLive {
    /// Widens a database TTL into a table TTL; this direction is
    /// infallible since every database TTL is a valid table TTL.
    fn from(value: DatabaseTimeToLive) -> Self {
        match value {
            DatabaseTimeToLive::Duration(d) => d.into(),
            DatabaseTimeToLive::Forever => Self::Forever,
        }
    }
}
impl From<Duration> for DatabaseTimeToLive {
    /// A zero duration is normalized to `Forever`; anything else keeps
    /// its concrete duration.
    fn from(duration: Duration) -> Self {
        if !duration.is_zero() {
            Self::Duration(duration)
        } else {
            Self::Forever
        }
    }
}
impl From<humantime::Duration> for DatabaseTimeToLive {
fn from(duration: humantime::Duration) -> Self {
Self::from(*duration)
}
}
/// Time-to-live for a table/region: `Instant`, `Forever`, or a
/// `Duration`.
///
/// `Instant` means data is considered expired immediately upon insert
/// (see [`TimeToLive::is_expired`]).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TimeToLive {
    /// Instantly discard upon insert.
    Instant,
    /// Keep the data forever (the default).
    #[default]
    Forever,
    /// Keep the data for this duration. The `From<Duration>` impl
    /// normalizes a zero duration to `Forever`, so this is expected
    /// to be non-zero.
    #[serde(untagged, with = "humantime_serde")]
    Duration(Duration),
}
impl Display for TimeToLive {
    /// Renders as `instant`, `forever`, or a humantime duration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Instant => f.write_str(INSTANT),
            Self::Forever => f.write_str(FOREVER),
            Self::Duration(d) => write!(f, "{}", humantime::Duration::from(*d)),
        }
    }
}
impl TimeToLive {
/// Parse a string that is either `instant`, `forever`, or a duration to `TimeToLive`
///
/// note that an empty string or a zero duration(a duration that spans no time) is treat as `forever` too
pub fn from_humantime_or_str(s: &str) -> Result<Self, Error> {
match s.to_lowercase().as_ref() {
INSTANT => Ok(TimeToLive::Instant),
FOREVER | "" => Ok(TimeToLive::Forever),
_ => {
let d = humantime::parse_duration(s).context(ParseDurationSnafu)?;
Ok(TimeToLive::from(d))
}
}
}
/// Check if the TimeToLive is expired
/// with the given `created_at` and `now` timestamp
pub fn is_expired(
&self,
created_at: &Timestamp,
now: &Timestamp,
) -> crate::error::Result<bool> {
Ok(match self {
TimeToLive::Instant => true,
TimeToLive::Forever => false,
TimeToLive::Duration(d) => now.sub_duration(*d)? > *created_at,
})
}
/// is instant variant
pub fn is_instant(&self) -> bool {
matches!(self, TimeToLive::Instant)
}
/// Is the default value, which is `Forever`
pub fn is_forever(&self) -> bool {
matches!(self, TimeToLive::Forever)
}
}
impl From<Duration> for TimeToLive {
fn from(duration: Duration) -> Self {
if duration.is_zero() {
// compatibility with old code, and inline with cassandra's behavior when ttl set to 0
TimeToLive::Forever
} else {
TimeToLive::Duration(duration)
}
}
}
impl From<humantime::Duration> for TimeToLive {
fn from(duration: humantime::Duration) -> Self {
Self::from(*duration)
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// Conversions between `TimeToLive` and `DatabaseTimeToLive`, plus
    /// string parsing of both, including the zero-duration and
    /// `instant` edge cases.
    #[test]
    fn test_db_ttl_table_ttl() {
        // A plain duration round-trips losslessly between the two types.
        let ttl = TimeToLive::from(Duration::from_secs(10));
        let db_ttl: DatabaseTimeToLive = ttl.try_into().unwrap();
        assert_eq!(db_ttl, DatabaseTimeToLive::from(Duration::from_secs(10)));
        assert_eq!(TimeToLive::from(db_ttl), ttl);

        // A zero table TTL normalizes to `Forever` and still round-trips.
        let ttl = TimeToLive::from(Duration::from_secs(0));
        let db_ttl: DatabaseTimeToLive = ttl.try_into().unwrap();
        assert_eq!(db_ttl, DatabaseTimeToLive::Forever);
        assert_eq!(TimeToLive::from(db_ttl), ttl);

        // `Instant` is rejected as a database TTL.
        let ttl = TimeToLive::Instant;
        let err_instant = DatabaseTimeToLive::try_from(ttl);
        assert!(err_instant.is_err());

        // `From<Duration>`: zero normalizes to `Forever`.
        let ttl = Duration::from_secs(0);
        let db_ttl: DatabaseTimeToLive = ttl.into();
        assert_eq!(db_ttl, DatabaseTimeToLive::Forever);

        // `From<Duration>`: a non-zero duration is kept as-is.
        let ttl = Duration::from_secs(10);
        let db_ttl: DatabaseTimeToLive = ttl.into();
        assert_eq!(
            db_ttl,
            DatabaseTimeToLive::Duration(Duration::from_secs(10))
        );

        // String parsing: a plain duration.
        let ttl = DatabaseTimeToLive::from_humantime_or_str("10s").unwrap();
        let ttl: TimeToLive = ttl.into();
        assert_eq!(ttl, TimeToLive::from(Duration::from_secs(10)));

        // String parsing: the `forever` keyword.
        let ttl = DatabaseTimeToLive::from_humantime_or_str("forever").unwrap();
        let ttl: TimeToLive = ttl.into();
        assert_eq!(ttl, TimeToLive::Forever);

        // String parsing: `instant` is rejected for databases.
        assert!(DatabaseTimeToLive::from_humantime_or_str("instant").is_err());

        // String parsing: "0s" is a zero duration, hence `Forever`.
        let ttl = DatabaseTimeToLive::from_humantime_or_str("0s").unwrap();
        let ttl: TimeToLive = ttl.into();
        assert_eq!(ttl, TimeToLive::Forever);
    }

    /// Serde round-trips of `TimeToLive` and, where valid,
    /// `DatabaseTimeToLive`, checking the human-readable JSON form.
    #[test]
    fn test_serde() {
        let cases = vec![
            ("\"instant\"", TimeToLive::Instant),
            ("\"forever\"", TimeToLive::Forever),
            ("\"10d\"", Duration::from_secs(86400 * 10).into()),
            (
                "\"10000 years\"",
                humantime::parse_duration("10000 years").unwrap().into(),
            ),
        ];

        for (s, expected) in cases {
            // serialize -> deserialize must reproduce the value.
            let serialized = serde_json::to_string(&expected).unwrap();
            let deserialized: TimeToLive = serde_json::from_str(&serialized).unwrap();
            assert_eq!(deserialized, expected);

            // The hand-written JSON form must also deserialize to it.
            let deserialized: TimeToLive = serde_json::from_str(s).unwrap_or_else(|err| {
                panic!("Actual serialized: {}, s=`{s}`, err: {:?}", serialized, err)
            });
            assert_eq!(deserialized, expected);

            // Database TTL: `instant` must fail to deserialize; every
            // other case must serialize back to the same JSON.
            if s == "\"instant\"" {
                assert!(serde_json::from_str::<DatabaseTimeToLive>(s).is_err());
                continue;
            }
            let db_ttl: DatabaseTimeToLive = serde_json::from_str(s).unwrap();
            let re_serialized = serde_json::to_string(&db_ttl).unwrap();
            assert_eq!(re_serialized, serialized);
        }
    }
}

View File

@@ -12,6 +12,7 @@ api.workspace = true
aquamarine.workspace = true
async-trait.workspace = true
base64.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true

View File

@@ -207,7 +207,7 @@ mod test {
let alter_region_option_request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::TTL(Duration::from_secs(500))],
options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
},
};
let result = engine_inner

View File

@@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::{info, warn};
use common_time::Timestamp;
use common_time::{Timestamp, FOREVER};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
@@ -540,7 +540,7 @@ pub(crate) fn region_options_for_metadata_region(
mut original: HashMap<String, String>,
) -> HashMap<String, String> {
original.remove(APPEND_MODE_KEY);
original.insert(TTL_KEY.to_string(), "10000 years".to_string());
original.insert(TTL_KEY.to_string(), FOREVER.to_string());
original
}
@@ -731,7 +731,7 @@ mod test {
);
assert_eq!(
metadata_region_request.options.get("ttl").unwrap(),
"10000 years"
"forever"
);
}
}

View File

@@ -24,7 +24,7 @@ mod window;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use api::v1::region::compact_request;
use common_base::Plugins;
@@ -32,7 +32,7 @@ use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
@@ -273,7 +273,7 @@ impl CompactionScheduler {
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_id);
None
TimeToLive::default()
});
debug!(
@@ -292,7 +292,7 @@ impl CompactionScheduler {
access_layer: access_layer.clone(),
manifest_ctx: manifest_ctx.clone(),
file_purger: None,
ttl,
ttl: Some(ttl),
};
let picker_output = {
@@ -437,18 +437,21 @@ impl PendingCompaction {
/// Finds TTL of table by first examine table options then database options.
async fn find_ttl(
table_id: TableId,
table_ttl: Option<Duration>,
table_ttl: Option<TimeToLive>,
schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<Option<Duration>> {
) -> Result<TimeToLive> {
// If table TTL is set, we use it.
if let Some(table_ttl) = table_ttl {
return Ok(Some(table_ttl));
return Ok(table_ttl);
}
let ttl = schema_metadata_manager
.get_schema_options_by_table_id(table_id)
.await
.context(GetSchemaMetadataSnafu)?
.and_then(|options| options.ttl);
.and_then(|options| options.ttl)
.unwrap_or_default()
.into();
Ok(ttl)
}
@@ -656,24 +659,16 @@ fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
/// Finds all expired SSTs across levels.
fn get_expired_ssts(
levels: &[LevelMeta],
ttl: Option<Duration>,
ttl: Option<TimeToLive>,
now: Timestamp,
) -> Vec<FileHandle> {
let Some(ttl) = ttl else {
return vec![];
};
let expire_time = match now.sub_duration(ttl) {
Ok(expire_time) => expire_time,
Err(e) => {
error!(e; "Failed to calculate region TTL expire time");
return vec![];
}
};
levels
.iter()
.flat_map(|l| l.get_expired_files(&expire_time).into_iter())
.flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
.collect()
}

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use api::v1::region::compact_request;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{info, warn};
use common_time::TimeToLive;
use object_store::manager::ObjectStoreManagerRef;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
@@ -63,7 +64,7 @@ pub struct CompactionRegion {
pub(crate) manifest_ctx: Arc<ManifestContext>,
pub(crate) current_version: VersionRef,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
pub(crate) ttl: Option<Duration>,
pub(crate) ttl: Option<TimeToLive>,
}
/// OpenCompactionRegionRequest represents the request to open a compaction region.
@@ -180,7 +181,7 @@ pub async fn open_compaction_region(
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
None
TimeToLive::default()
});
Ok(CompactionRegion {
region_id: req.region_id,
@@ -193,7 +194,7 @@ pub async fn open_compaction_region(
manifest_ctx,
current_version,
file_purger: Some(file_purger),
ttl,
ttl: Some(ttl),
})
}

View File

@@ -253,7 +253,7 @@ mod tests {
truncated_entry_id: None,
compaction_time_window: None,
options: RegionOptions {
ttl,
ttl: ttl.map(|t| t.into()),
compaction: Default::default(),
storage: None,
append_mode: false,

View File

@@ -604,7 +604,7 @@ async fn test_alter_region_ttl_options() {
let alter_ttl_request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::TTL(Duration::from_secs(500))],
options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
},
};
let alter_job = tokio::spawn(async move {
@@ -617,14 +617,8 @@ async fn test_alter_region_ttl_options() {
alter_job.await.unwrap();
let check_ttl = |engine: &MitoEngine, expected: &Duration| {
let current_ttl = engine
.get_region(region_id)
.unwrap()
.version()
.options
.ttl
.unwrap();
assert_eq!(*expected, current_ttl);
let current_ttl = engine.get_region(region_id).unwrap().version().options.ttl;
assert_eq!(current_ttl, Some((*expected).into()));
};
// Verify the ttl.
check_ttl(&engine, &Duration::from_secs(500));

View File

@@ -165,8 +165,8 @@ async fn test_engine_create_with_options() {
assert!(engine.is_region_exists(region_id));
let region = engine.get_region(region_id).unwrap();
assert_eq!(
Duration::from_secs(3600 * 24 * 10),
region.version().options.ttl.unwrap()
region.version().options.ttl,
Some(Duration::from_secs(3600 * 24 * 10).into())
);
}

View File

@@ -180,8 +180,8 @@ async fn test_engine_region_open_with_options() {
let region = engine.get_region(region_id).unwrap();
assert_eq!(
Duration::from_secs(3600 * 24 * 4),
region.version().options.ttl.unwrap()
region.version().options.ttl,
Some(Duration::from_secs(3600 * 24 * 4).into())
);
}

View File

@@ -20,6 +20,7 @@ use std::collections::HashMap;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_time::TimeToLive;
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use serde::de::Error as _;
use serde::{Deserialize, Deserializer, Serialize};
@@ -55,8 +56,7 @@ pub enum MergeMode {
#[serde(default)]
pub struct RegionOptions {
/// Region SST files TTL.
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
pub ttl: Option<TimeToLive>,
/// Compaction options.
pub compaction: CompactionOptions,
/// Custom storage. Uses default storage if it is `None`.
@@ -252,8 +252,7 @@ impl Default for TwcsOptions {
#[serde(default)]
struct RegionOptionsWithoutEnum {
/// Region SST files TTL.
#[serde(with = "humantime_serde")]
ttl: Option<Duration>,
ttl: Option<TimeToLive>,
storage: Option<String>,
#[serde_as(as = "DisplayFromStr")]
append_mode: bool,
@@ -458,7 +457,7 @@ mod tests {
let map = make_map(&[("ttl", "7d")]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
..Default::default()
};
assert_eq!(expect, options);
@@ -621,7 +620,7 @@ mod tests {
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: 11,
@@ -654,7 +653,7 @@ mod tests {
#[test]
fn test_region_options_serde() {
let options = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: usize::MAX,
@@ -722,7 +721,7 @@ mod tests {
}"#;
let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
let options = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: 11,

View File

@@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use common_time::Timestamp;
use common_time::{TimeToLive, Timestamp};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL};
use crate::sst::file_purger::FilePurgerRef;
@@ -160,12 +160,19 @@ impl LevelMeta {
}
/// Returns expired SSTs from current level.
pub fn get_expired_files(&self, expire_time: &Timestamp) -> Vec<FileHandle> {
pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec<FileHandle> {
self.files
.values()
.filter(|v| {
let (_, end) = v.time_range();
&end < expire_time
match ttl.is_expired(&end, now) {
Ok(expired) => expired,
Err(e) => {
common_telemetry::error!(e; "Failed to calculate region TTL expire time");
false
}
}
})
.cloned()
.collect()

View File

@@ -184,16 +184,12 @@ impl<S> RegionWorkerLoop<S> {
let mut current_options = version.options.clone();
for option in options {
match option {
SetRegionOption::TTL(new_ttl) => {
SetRegionOption::Ttl(new_ttl) => {
info!(
"Update region ttl: {}, previous: {:?} new: {:?}",
region.region_id, current_options.ttl, new_ttl
);
if new_ttl.is_zero() {
current_options.ttl = None;
} else {
current_options.ttl = Some(new_ttl);
}
current_options.ttl = new_ttl;
}
SetRegionOption::Twsc(key, value) => {
let Twcs(options) = &mut current_options.compaction;

View File

@@ -11,6 +11,7 @@ testing = []
workspace = true
[dependencies]
ahash.workspace = true
api.workspace = true
async-stream.workspace = true
async-trait = "0.1"

View File

@@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use api::v1::alter_table_expr::Kind;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
use api::v1::region::{
InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
RegionRequestHeader,
};
use api::v1::{
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
RowInsertRequest, RowInsertRequests, SemanticType,
@@ -91,6 +94,20 @@ impl AutoCreateTableType {
}
}
/// Split insert requests into normal and instant requests.
///
/// Where instant requests are requests with ttl=instant,
/// and normal requests are requests with ttl set to other values.
///
/// This is used to split requests for different processing.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
/// Requests with normal ttl.
pub normal_requests: RegionInsertRequests,
/// Requests with ttl=instant.
pub instant_requests: RegionInsertRequests,
}
impl Inserter {
pub fn new(
catalog_manager: CatalogManagerRef,
@@ -183,12 +200,16 @@ impl Inserter {
});
validate_column_count_match(&requests)?;
let table_name_to_ids = self
let (table_name_to_ids, instant_table_ids) = self
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
.await?;
let inserts = RowToRegion::new(
table_name_to_ids,
instant_table_ids,
self.partition_manager.as_ref(),
)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
}
@@ -215,7 +236,7 @@ impl Inserter {
.await?;
// check and create logical tables
let table_name_to_ids = self
let (table_name_to_ids, instant_table_ids) = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
@@ -223,9 +244,13 @@ impl Inserter {
statement_executor,
)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, &self.partition_manager)
.convert(requests)
.await?;
let inserts = RowToRegion::new(
table_name_to_ids,
instant_table_ids,
&self.partition_manager,
)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
}
@@ -268,7 +293,7 @@ impl Inserter {
impl Inserter {
async fn do_request(
&self,
requests: RegionInsertRequests,
requests: InstantAndNormalInsertRequests,
ctx: &QueryContextRef,
) -> Result<Output> {
let write_cost = write_meter!(
@@ -283,8 +308,21 @@ impl Inserter {
..Default::default()
});
let InstantAndNormalInsertRequests {
normal_requests,
instant_requests,
} = requests;
// Mirror requests for source table to flownode
match self.mirror_flow_node_requests(&requests).await {
match self
.mirror_flow_node_requests(
normal_requests
.requests
.iter()
.chain(instant_requests.requests.iter()),
)
.await
{
Ok(flow_requests) => {
let node_manager = self.node_manager.clone();
let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| {
@@ -320,7 +358,7 @@ impl Inserter {
}
let write_tasks = self
.group_requests_by_peer(requests)
.group_requests_by_peer(normal_requests)
.await?
.into_iter()
.map(|(peer, inserts)| {
@@ -350,14 +388,14 @@ impl Inserter {
}
/// Mirror requests for source table to flownode
async fn mirror_flow_node_requests(
&self,
requests: &RegionInsertRequests,
async fn mirror_flow_node_requests<'it, 'zelf: 'it>(
&'zelf self,
requests: impl Iterator<Item = &'it RegionInsertRequest>,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
// store partial source table requests used by flow node(only store what's used)
let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
HashMap::new();
for req in &requests.requests {
for req in requests {
let table_id = RegionId::from_u64(req.region_id).table_id();
match src_table_reqs.get_mut(&table_id) {
Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
@@ -422,7 +460,6 @@ impl Inserter {
// group by region ids first to reduce repeatedly call `find_region_leader`
// TODO(discord9): determine if a addition clone is worth it
let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
for req in requests.requests {
let region_id = RegionId::from_u64(req.region_id);
requests_per_region
@@ -462,7 +499,7 @@ impl Inserter {
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
) -> Result<HashMap<String, TableId>> {
) -> Result<(HashMap<String, TableId>, HashSet<TableId>)> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
.start_timer();
@@ -483,6 +520,7 @@ impl Inserter {
})?
.unwrap_or(true);
if !auto_create_table_hint {
let mut instant_table_ids = HashSet::new();
for req in &requests.inserts {
let table = self
.get_table(catalog, &schema, &req.table_name)
@@ -494,17 +532,25 @@ impl Inserter {
),
})?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
return Ok(table_name_to_ids);
return Ok((table_name_to_ids, instant_table_ids));
}
let mut create_tables = vec![];
let mut alter_tables = vec![];
let mut instant_table_ids = HashSet::new();
for req in &requests.inserts {
match self.get_table(catalog, &schema, &req.table_name).await? {
Some(table) => {
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
if let Some(alter_expr) =
self.get_alter_table_expr_on_demand(req, &table, ctx)?
@@ -543,6 +589,8 @@ impl Inserter {
AutoCreateTableType::Physical
| AutoCreateTableType::Log
| AutoCreateTableType::LastNonNull => {
// Note that auto-created tables should never be TTL=instant tables:
// that would be very surprising behavior, so instant TTL must be set explicitly by the user.
for create_table in create_tables {
let table = self
.create_physical_table(create_table, ctx, statement_executor)
@@ -558,7 +606,7 @@ impl Inserter {
}
}
Ok(table_name_to_ids)
Ok((table_name_to_ids, instant_table_ids))
}
async fn create_physical_table_on_demand(

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use ahash::{HashMap, HashSet};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::RowInsertRequests;
use partition::manager::PartitionRuleManager;
@@ -21,37 +20,53 @@ use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{Result, TableNotFoundSnafu};
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
/// Converts row insert requests into region insert requests, separating rows
/// destined for TTL=`instant` tables from rows for normal tables.
pub struct RowToRegion<'a> {
    // Maps table name -> table id for every table referenced by the inserts.
    table_name_to_ids: HashMap<String, TableId>,
    // Ids of tables whose TTL is `instant`; their requests are collected into
    // `instant_requests` instead of `normal_requests` during `convert`.
    instant_table_ids: HashSet<TableId>,
    // Used by `convert` to partition each table's rows into per-region requests.
    partition_manager: &'a PartitionRuleManager,
}
impl<'a> RowToRegion<'a> {
pub fn new(
table_name_to_ids: HashMap<String, TableId>,
instant_table_ids: HashSet<TableId>,
partition_manager: &'a PartitionRuleManager,
) -> Self {
Self {
table_name_to_ids,
instant_table_ids,
partition_manager,
}
}
pub async fn convert(&self, requests: RowInsertRequests) -> Result<RegionInsertRequests> {
pub async fn convert(
&self,
requests: RowInsertRequests,
) -> Result<InstantAndNormalInsertRequests> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
let mut instant_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let table_id = self.get_table_id(&request.table_name)?;
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
region_request.extend(requests);
if self.instant_table_ids.contains(&table_id) {
instant_request.extend(requests);
} else {
region_request.extend(requests);
}
}
Ok(RegionInsertRequests {
requests: region_request,
Ok(InstantAndNormalInsertRequests {
normal_requests: RegionInsertRequests {
requests: region_request,
},
instant_requests: RegionInsertRequests {
requests: instant_request,
},
})
}

View File

@@ -32,6 +32,7 @@ use crate::error::{
ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
SchemaReadOnlySnafu, TableNotFoundSnafu,
};
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::insert::semantic_type;
@@ -60,7 +61,7 @@ impl<'a> StatementToRegion<'a> {
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
) -> Result<RegionInsertRequests> {
) -> Result<InstantAndNormalInsertRequests> {
let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
@@ -134,7 +135,18 @@ impl<'a> StatementToRegion<'a> {
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_info.table_id(), Rows { schema, rows })
.await?;
Ok(RegionInsertRequests { requests })
let requests = RegionInsertRequests { requests };
if table_info.is_ttl_instant_table() {
Ok(InstantAndNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
})
} else {
Ok(InstantAndNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
})
}
}
async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {

View File

@@ -19,6 +19,7 @@ use table::metadata::TableInfo;
use table::requests::InsertRequest as TableInsertRequest;
use crate::error::Result;
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::common::{column_schema, row_count};
@@ -35,7 +36,10 @@ impl<'a> TableToRegion<'a> {
}
}
pub async fn convert(&self, request: TableInsertRequest) -> Result<RegionInsertRequests> {
pub async fn convert(
&self,
request: TableInsertRequest,
) -> Result<InstantAndNormalInsertRequests> {
let row_count = row_count(&request.columns_values)?;
let schema = column_schema(self.table_info, &request.columns_values)?;
let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count);
@@ -44,7 +48,19 @@ impl<'a> TableToRegion<'a> {
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(self.table_info.table_id(), rows)
.await?;
Ok(RegionInsertRequests { requests })
let requests = RegionInsertRequests { requests };
if self.table_info.is_ttl_instant_table() {
Ok(InstantAndNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
})
} else {
Ok(InstantAndNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
})
}
}
}
@@ -112,6 +128,7 @@ mod tests {
let region_requests = converter.convert(table_request).await.unwrap();
let mut region_id_to_region_requests = region_requests
.normal_requests
.requests
.into_iter()
.map(|r| (r.region_id, r))

View File

@@ -21,7 +21,6 @@ use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, SchemaRef, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COMMENT_KEY,
};
use humantime::format_duration;
use snafu::ResultExt;
use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName};
use sql::dialect::GreptimeDbDialect;
@@ -46,13 +45,13 @@ fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptio
write_buffer_size.to_string(),
);
}
if let Some(ttl) = table_opts.ttl {
options.insert(TTL_KEY.to_string(), format_duration(ttl).to_string());
} else if let Some(database_ttl) = schema_options.and_then(|o| o.ttl) {
options.insert(
TTL_KEY.to_string(),
format_duration(database_ttl).to_string(),
);
if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
options.insert(TTL_KEY.to_string(), ttl);
} else if let Some(database_ttl) = schema_options
.and_then(|o| o.ttl)
.map(|ttl| ttl.to_string())
{
options.insert(TTL_KEY.to_string(), database_ttl);
};
for (k, v) in table_opts
.extra_options

View File

@@ -14,7 +14,6 @@
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::time::Duration;
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
@@ -26,6 +25,7 @@ use api::v1::region::{
};
use api::v1::{self, Analyzer, Option as PbOption, Rows, SemanticType};
pub use common_base::AffectedRows;
use common_time::TimeToLive;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::FulltextOptions;
use serde::{Deserialize, Serialize};
@@ -746,7 +746,7 @@ impl From<v1::ModifyColumnType> for ModifyColumnType {
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
TTL(Duration),
Ttl(Option<TimeToLive>),
// Modifying TwscOptions with values as (option name, new value).
Twsc(String, String),
}
@@ -758,13 +758,10 @@ impl TryFrom<&PbOption> for SetRegionOption {
let PbOption { key, value } = value;
match key.as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
let ttl = TimeToLive::from_humantime_or_str(value)
.map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
Ok(Self::Ttl(Some(ttl)))
}
TWCS_MAX_ACTIVE_WINDOW_RUNS
| TWCS_MAX_ACTIVE_WINDOW_FILES
@@ -798,7 +795,7 @@ impl From<&UnsetRegionOption> for SetRegionOption {
UnsetRegionOption::TwcsTimeWindow => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::Ttl => SetRegionOption::TTL(Duration::default()),
UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
}
}
}

View File

@@ -224,12 +224,8 @@ impl TableMeta {
for request in requests {
match request {
SetRegionOption::TTL(new_ttl) => {
if new_ttl.is_zero() {
new_options.ttl = None;
} else {
new_options.ttl = Some(*new_ttl);
}
SetRegionOption::Ttl(new_ttl) => {
new_options.ttl = *new_ttl;
}
SetRegionOption::Twsc(key, value) => {
if !value.is_empty() {
@@ -826,6 +822,15 @@ impl TableInfo {
.extra_options
.contains_key(PHYSICAL_TABLE_METADATA_KEY)
}
/// Whether this table is configured with an `instant` TTL.
///
/// Returns `false` when no TTL is set.
pub fn is_ttl_instant_table(&self) -> bool {
    matches!(self.meta.options.ttl, Some(ttl) if ttl.is_instant())
}
}
impl TableInfoBuilder {

View File

@@ -17,12 +17,12 @@
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_datasource::object_store::s3::is_supported_in_s3;
use common_query::AddColumnLocation;
use common_time::range::TimestampRange;
use common_time::TimeToLive;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, FulltextOptions};
@@ -74,8 +74,7 @@ pub struct TableOptions {
/// Memtable size of memtable.
pub write_buffer_size: Option<ReadableSize>,
/// Time-to-live of table. Expired data will be automatically purged.
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
pub ttl: Option<TimeToLive>,
/// Extra options that may not applicable to all table engines.
pub extra_options: HashMap<String, String>,
}
@@ -109,16 +108,13 @@ impl TableOptions {
}
if let Some(ttl) = kvs.get(TTL_KEY) {
let ttl_value = ttl
.parse::<humantime::Duration>()
.map_err(|_| {
ParseTableOptionSnafu {
key: TTL_KEY,
value: ttl,
}
.build()
})?
.into();
let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| {
ParseTableOptionSnafu {
key: TTL_KEY,
value: ttl,
}
.build()
})?;
options.ttl = Some(ttl_value);
}
@@ -138,8 +134,8 @@ impl fmt::Display for TableOptions {
key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
}
if let Some(ttl) = self.ttl {
key_vals.push(format!("{}={}", TTL_KEY, humantime::Duration::from(ttl)));
if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
key_vals.push(format!("{}={}", TTL_KEY, ttl));
}
for (k, v) in &self.extra_options {
@@ -159,8 +155,7 @@ impl From<&TableOptions> for HashMap<String, String> {
write_buffer_size.to_string(),
);
}
if let Some(ttl) = opts.ttl {
let ttl_str = humantime::format_duration(ttl).to_string();
if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
let _ = res.insert(TTL_KEY.to_string(), ttl_str);
}
res.extend(
@@ -326,6 +321,8 @@ pub struct CopyDatabaseRequest {
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[test]
@@ -343,7 +340,7 @@ mod tests {
fn test_serialize_table_options() {
let options = TableOptions {
write_buffer_size: None,
ttl: Some(Duration::from_secs(1000)),
ttl: Some(Duration::from_secs(1000).into()),
extra_options: HashMap::new(),
};
let serialized = serde_json::to_string(&options).unwrap();
@@ -355,7 +352,7 @@ mod tests {
fn test_convert_hashmap_between_table_options() {
let options = TableOptions {
write_buffer_size: Some(ReadableSize::mb(128)),
ttl: Some(Duration::from_secs(1000)),
ttl: Some(Duration::from_secs(1000).into()),
extra_options: HashMap::new(),
};
let serialized_map = HashMap::from(&options);
@@ -364,7 +361,7 @@ mod tests {
let options = TableOptions {
write_buffer_size: None,
ttl: None,
ttl: Default::default(),
extra_options: HashMap::new(),
};
let serialized_map = HashMap::from(&options);
@@ -373,7 +370,7 @@ mod tests {
let options = TableOptions {
write_buffer_size: Some(ReadableSize::mb(128)),
ttl: Some(Duration::from_secs(1000)),
ttl: Some(Duration::from_secs(1000).into()),
extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
};
let serialized_map = HashMap::from(&options);
@@ -385,7 +382,7 @@ mod tests {
fn test_table_options_to_string() {
let options = TableOptions {
write_buffer_size: Some(ReadableSize::mb(128)),
ttl: Some(Duration::from_secs(1000)),
ttl: Some(Duration::from_secs(1000).into()),
extra_options: HashMap::new(),
};
@@ -396,7 +393,7 @@ mod tests {
let options = TableOptions {
write_buffer_size: Some(ReadableSize::mb(128)),
ttl: Some(Duration::from_secs(1000)),
ttl: Some(Duration::from_secs(1000).into()),
extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
};

View File

@@ -62,6 +62,9 @@ SHOW CREATE DATABASE alter_database;
| Database | Create Database |
+----------------+----------------------------------------------+
| alter_database | CREATE DATABASE IF NOT EXISTS alter_database |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+----------------+----------------------------------------------+
ALTER DATABASE alter_database SET 'ttl'='😁';

View File

@@ -103,7 +103,9 @@ SHOW CREATE TABLE ato;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+-------+------------------------------------+
ALTER TABLE ato SET 'ttl'='1s';

View File

@@ -0,0 +1,101 @@
-- test ttl = instant
CREATE TABLE distinct_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
)WITH ('ttl' = 'instant');
Affected Rows: 0
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;
Affected Rows: 0
-- SQLNESS ARG restart=true
INSERT INTO
distinct_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 0
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SHOW CREATE TABLE distinct_basic;
+----------------+-----------------------------------------------------------+
| Table | Create Table |
+----------------+-----------------------------------------------------------+
| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( |
| | "number" INT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("number") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'instant' |
| | ) |
+----------------+-----------------------------------------------------------+
SHOW CREATE TABLE out_distinct_basic;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
| | "dis" INT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("dis") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+
SELECT
dis
FROM
out_distinct_basic;
+-----+
| dis |
+-----+
| 20 |
| 22 |
+-----+
SELECT number FROM distinct_basic;
++
++
DROP FLOW test_distinct_basic;
Affected Rows: 0
DROP TABLE distinct_basic;
Affected Rows: 0
DROP TABLE out_distinct_basic;
Affected Rows: 0

View File

@@ -0,0 +1,39 @@
-- test ttl = instant
CREATE TABLE distinct_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
)WITH ('ttl' = 'instant');
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;
-- SQLNESS ARG restart=true
INSERT INTO
distinct_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
SHOW CREATE TABLE distinct_basic;
SHOW CREATE TABLE out_distinct_basic;
SELECT
dis
FROM
out_distinct_basic;
SELECT number FROM distinct_basic;
DROP FLOW test_distinct_basic;
DROP TABLE distinct_basic;
DROP TABLE out_distinct_basic;

View File

@@ -227,6 +227,23 @@ ADMIN FLUSH_FLOW('test_distinct_basic');
| FLOW_FLUSHED |
+-----------------------------------------+
SHOW CREATE TABLE out_distinct_basic;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
| | "dis" INT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("dis") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+
SELECT
dis
FROM
@@ -478,6 +495,23 @@ ADMIN FLUSH_FLOW('calc_ngx_country');
| FLOW_FLUSHED |
+--------------------------------------+
SHOW CREATE TABLE ngx_country;
+-------------+---------------------------------------------+
| Table | Create Table |
+-------------+---------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "ngx_access_log.country" STRING NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("ngx_access_log.country") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+---------------------------------------------+
SELECT
"ngx_access_log.country"
FROM
@@ -594,6 +628,24 @@ ADMIN FLUSH_FLOW('calc_ngx_country');
| FLOW_FLUSHED |
+--------------------------------------+
SHOW CREATE TABLE ngx_country;
+-------------+---------------------------------------------------------+
| Table | Create Table |
+-------------+---------------------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "ngx_access_log.country" STRING NULL, |
| | "time_window" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("ngx_access_log.country", "time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+---------------------------------------------------------+
SELECT
"ngx_access_log.country",
time_window

View File

@@ -128,6 +128,8 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
SHOW CREATE TABLE out_distinct_basic;
SELECT
dis
FROM
@@ -270,6 +272,8 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SHOW CREATE TABLE ngx_country;
SELECT
"ngx_access_log.country"
FROM
@@ -333,6 +337,8 @@ VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SHOW CREATE TABLE ngx_country;
SELECT
"ngx_access_log.country",
time_window

View File

@@ -0,0 +1,374 @@
CREATE DATABASE test_ttl_db WITH (ttl = '1 second');
Affected Rows: 1
USE test_ttl_db;
Affected Rows: 0
CREATE TABLE test_ttl(ts TIMESTAMP TIME INDEX, val INT);
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '1s' |
| | ) |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
| | WITH( |
| | ttl = '1s' |
| | ) |
+-------------+-------------------------------------------+
ALTER DATABASE test_ttl_db SET ttl = '1 day';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '1day' |
| | ) |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
| | WITH( |
| | ttl = '1day' |
| | ) |
+-------------+-------------------------------------------+
ALTER TABLE test_ttl SET 'ttl' = '6 hours';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '6h' |
| | ) |
+----------+-----------------------------------------+
ALTER TABLE test_ttl SET 'ttl' = 'instant';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'instant' |
| | ) |
+----------+-----------------------------------------+
ALTER TABLE test_ttl SET 'ttl' = '0s';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+----------+-----------------------------------------+
ALTER TABLE test_ttl SET 'ttl' = 'forever';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
| | WITH( |
| | ttl = '1day' |
| | ) |
+-------------+-------------------------------------------+
ALTER TABLE test_ttl UNSET 'ttl';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '1day' |
| | ) |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
| | WITH( |
| | ttl = '1day' |
| | ) |
+-------------+-------------------------------------------+
ALTER DATABASE test_ttl_db SET 'ttl' = 'forever';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+-------------+-------------------------------------------+
ALTER DATABASE test_ttl_db SET 'ttl' = '0s';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+-------------+-------------------------------------------+
ALTER DATABASE test_ttl_db SET 'ttl' = 'instant';
Error: 1004(InvalidArguments), Invalid set database option, key: ttl, value: instant
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
| | WITH( |
| | ttl = 'forever' |
| | ) |
+-------------+-------------------------------------------+
ALTER DATABASE test_ttl_db UNSET 'ttl';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
+-------------+-------------------------------------------+
ALTER TABLE test_ttl UNSET 'ttl';
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
SHOW CREATE DATABASE test_ttl_db;
+-------------+-------------------------------------------+
| Database | Create Database |
+-------------+-------------------------------------------+
| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db |
+-------------+-------------------------------------------+
DROP TABLE test_ttl;
Affected Rows: 0
USE public;
Affected Rows: 0
DROP DATABASE test_ttl_db;
Affected Rows: 0
-- test that both creating a database with ttl = 'instant' and altering a database's ttl to 'instant' are forbidden
CREATE DATABASE test_ttl_db WITH (ttl = 'instant');
Error: 1002(Unexpected), Failed to parse value instant into key ttl
CREATE DATABASE test_ttl_db_2 WITH (ttl = '1s');
Affected Rows: 1
ALTER DATABASE test_ttl_db_2 SET 'ttl' = 'instant';
Error: 1004(InvalidArguments), Invalid set database option, key: ttl, value: instant

View File

@@ -0,0 +1,82 @@
CREATE DATABASE test_ttl_db WITH (ttl = '1 second');
USE test_ttl_db;
CREATE TABLE test_ttl(ts TIMESTAMP TIME INDEX, val INT);
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
ALTER DATABASE test_ttl_db SET ttl = '1 day';
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
ALTER TABLE test_ttl SET 'ttl' = '6 hours';
SHOW CREATE TABLE test_ttl;
ALTER TABLE test_ttl SET 'ttl' = 'instant';
SHOW CREATE TABLE test_ttl;
ALTER TABLE test_ttl SET 'ttl' = '0s';
SHOW CREATE TABLE test_ttl;
ALTER TABLE test_ttl SET 'ttl' = 'forever';
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
ALTER TABLE test_ttl UNSET 'ttl';
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
ALTER DATABASE test_ttl_db SET 'ttl' = 'forever';
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
ALTER DATABASE test_ttl_db SET 'ttl' = '0s';
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
ALTER DATABASE test_ttl_db SET 'ttl' = 'instant';
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
ALTER DATABASE test_ttl_db UNSET 'ttl';
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
ALTER TABLE test_ttl UNSET 'ttl';
SHOW CREATE TABLE test_ttl;
SHOW CREATE DATABASE test_ttl_db;
DROP TABLE test_ttl;
USE public;
DROP DATABASE test_ttl_db;
-- test that both creating a database with ttl = 'instant' and altering a database's ttl to 'instant' are forbidden
CREATE DATABASE test_ttl_db WITH (ttl = 'instant');
CREATE DATABASE test_ttl_db_2 WITH (ttl = '1s');
ALTER DATABASE test_ttl_db_2 SET 'ttl' = 'instant';

View File

@@ -0,0 +1,340 @@
CREATE TABLE test_ttl(
ts TIMESTAMP TIME INDEX,
val INT,
PRIMARY KEY (`val`)
) WITH (ttl = 'instant');
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("val") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = 'instant' |
| | ) |
+----------+-----------------------------------------+
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
Affected Rows: 0
SELECT
val
from
test_ttl
ORDER BY
val;
++
++
-- SQLNESS SLEEP 2s
ADMIN flush_table('test_ttl');
+-------------------------------+
| ADMIN flush_table('test_ttl') |
+-------------------------------+
| 0 |
+-------------------------------+
ADMIN compact_table('test_ttl');
+---------------------------------+
| ADMIN compact_table('test_ttl') |
+---------------------------------+
| 0 |
+---------------------------------+
SELECT
val
from
test_ttl
ORDER BY
val;
++
++
ALTER TABLE
test_ttl UNSET 'ttl';
Affected Rows: 0
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
Affected Rows: 3
SELECT
val
from
test_ttl
ORDER BY
val;
+-----+
| val |
+-----+
| 1 |
| 2 |
| 3 |
+-----+
DROP TABLE test_ttl;
Affected Rows: 0
CREATE TABLE test_ttl(
ts TIMESTAMP TIME INDEX,
val INT,
PRIMARY KEY (`val`)
) WITH (ttl = '1s');
Affected Rows: 0
SHOW CREATE TABLE test_ttl;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" INT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("val") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '1s' |
| | ) |
+----------+-----------------------------------------+
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
Affected Rows: 3
SELECT
val
from
test_ttl
ORDER BY
val;
+-----+
| val |
+-----+
| 1 |
| 2 |
| 3 |
+-----+
ADMIN flush_table('test_ttl');
+-------------------------------+
| ADMIN flush_table('test_ttl') |
+-------------------------------+
| 0 |
+-------------------------------+
ADMIN compact_table('test_ttl');
+---------------------------------+
| ADMIN compact_table('test_ttl') |
+---------------------------------+
| 0 |
+---------------------------------+
SELECT
val
from
test_ttl
ORDER BY
val;
+-----+
| val |
+-----+
| 1 |
| 2 |
| 3 |
+-----+
-- SQLNESS SLEEP 2s
ADMIN flush_table('test_ttl');
+-------------------------------+
| ADMIN flush_table('test_ttl') |
+-------------------------------+
| 0 |
+-------------------------------+
ADMIN compact_table('test_ttl');
+---------------------------------+
| ADMIN compact_table('test_ttl') |
+---------------------------------+
| 0 |
+---------------------------------+
SELECT
val
from
test_ttl
ORDER BY
val;
++
++
ALTER TABLE
test_ttl
SET
ttl = '1d';
Affected Rows: 0
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
Affected Rows: 3
SELECT
val
from
test_ttl
ORDER BY
val;
+-----+
| val |
+-----+
| 1 |
| 2 |
| 3 |
+-----+
ALTER TABLE
test_ttl
SET
ttl = 'instant';
Affected Rows: 0
ADMIN flush_table('test_ttl');
+-------------------------------+
| ADMIN flush_table('test_ttl') |
+-------------------------------+
| 0 |
+-------------------------------+
ADMIN compact_table('test_ttl');
+---------------------------------+
| ADMIN compact_table('test_ttl') |
+---------------------------------+
| 0 |
+---------------------------------+
SELECT
val
from
test_ttl
ORDER BY
val;
++
++
-- to make sure alter back and forth from duration to/from instant wouldn't break anything
ALTER TABLE
test_ttl
SET
ttl = '1s';
Affected Rows: 0
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
Affected Rows: 3
SELECT
val
from
test_ttl
ORDER BY
val;
+-----+
| val |
+-----+
| 1 |
| 2 |
| 3 |
+-----+
-- SQLNESS SLEEP 2s
ADMIN flush_table('test_ttl');
+-------------------------------+
| ADMIN flush_table('test_ttl') |
+-------------------------------+
| 0 |
+-------------------------------+
ADMIN compact_table('test_ttl');
+---------------------------------+
| ADMIN compact_table('test_ttl') |
+---------------------------------+
| 0 |
+---------------------------------+
SELECT
val
from
test_ttl
ORDER BY
val;
++
++
DROP TABLE test_ttl;
Affected Rows: 0

View File

@@ -0,0 +1,166 @@
CREATE TABLE test_ttl(
ts TIMESTAMP TIME INDEX,
val INT,
PRIMARY KEY (`val`)
) WITH (ttl = 'instant');
SHOW CREATE TABLE test_ttl;
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
SELECT
val
from
test_ttl
ORDER BY
val;
-- SQLNESS SLEEP 2s
ADMIN flush_table('test_ttl');
ADMIN compact_table('test_ttl');
SELECT
val
from
test_ttl
ORDER BY
val;
ALTER TABLE
test_ttl UNSET 'ttl';
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
SELECT
val
from
test_ttl
ORDER BY
val;
DROP TABLE test_ttl;
CREATE TABLE test_ttl(
ts TIMESTAMP TIME INDEX,
val INT,
PRIMARY KEY (`val`)
) WITH (ttl = '1s');
SHOW CREATE TABLE test_ttl;
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
SELECT
val
from
test_ttl
ORDER BY
val;
ADMIN flush_table('test_ttl');
ADMIN compact_table('test_ttl');
SELECT
val
from
test_ttl
ORDER BY
val;
-- SQLNESS SLEEP 2s
ADMIN flush_table('test_ttl');
ADMIN compact_table('test_ttl');
SELECT
val
from
test_ttl
ORDER BY
val;
ALTER TABLE
test_ttl
SET
ttl = '1d';
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
SELECT
val
from
test_ttl
ORDER BY
val;
ALTER TABLE
test_ttl
SET
ttl = 'instant';
ADMIN flush_table('test_ttl');
ADMIN compact_table('test_ttl');
SELECT
val
from
test_ttl
ORDER BY
val;
-- to make sure alter back and forth from duration to/from instant wouldn't break anything
ALTER TABLE
test_ttl
SET
ttl = '1s';
INSERT INTO
test_ttl
VALUES
(now(), 1),
(now(), 2),
(now(), 3);
SELECT
val
from
test_ttl
ORDER BY
val;
-- SQLNESS SLEEP 2s
ADMIN flush_table('test_ttl');
ADMIN compact_table('test_ttl');
SELECT
val
from
test_ttl
ORDER BY
val;
DROP TABLE test_ttl;