Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-25 23:49:58 +00:00)

Compare commits: 11 commits, from v0.4.0-nig... to self-hoste...
Commits:

1bd53567b4
803940cfa4
420ae054b3
0f1e061f24
7961de25ad
f7d98e533b
b540d640cf
51a4d660b7
1b2381502e
0e937be3f5
564c183607
.github/workflows/release.yml (vendored, 8 changed lines)
@@ -39,22 +39,22 @@ jobs:
      # The file format is greptime-<os>-<arch>
      include:
        - arch: aarch64-apple-darwin
-         os: macos-latest
+         os: self-hosted
          file: greptime-darwin-arm64
          continue-on-error: false
          opts: "-F servers/dashboard"
        - arch: x86_64-apple-darwin
-         os: macos-latest
+         os: self-hosted
          file: greptime-darwin-amd64
          continue-on-error: false
          opts: "-F servers/dashboard"
        - arch: aarch64-apple-darwin
-         os: macos-latest
+         os: self-hosted
          file: greptime-darwin-arm64-pyo3
          continue-on-error: false
          opts: "-F pyo3_backend,servers/dashboard"
        - arch: x86_64-apple-darwin
-         os: macos-latest
+         os: self-hosted
          file: greptime-darwin-amd64-pyo3
          continue-on-error: false
          opts: "-F pyo3_backend,servers/dashboard"
@@ -17,7 +17,7 @@ use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;

-pub use client::CachedMetaKvBackend;
+pub use client::{CachedMetaKvBackend, MetaKvBackend};
use futures::Stream;
use futures_util::StreamExt;
pub use manager::{RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider};
@@ -16,13 +16,16 @@ use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

-use common_query::error::{self, Result, UnsupportedInputDataTypeSnafu};
+use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::prelude::{Signature, Volatility};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
-use datatypes::types::StringType;
-use datatypes::vectors::{Int64Vector, StringVector, Vector, VectorRef};
+use datatypes::types::TimestampType;
+use datatypes::vectors::{
+    Int64Vector, StringVector, TimestampMicrosecondVector, TimestampMillisecondVector,
+    TimestampNanosecondVector, TimestampSecondVector, Vector, VectorRef,
+};
use snafu::ensure;

use crate::scalars::function::{Function, FunctionContext};
@@ -42,18 +45,33 @@ fn convert_to_seconds(arg: &str) -> Option<i64> {
    }
}

fn process_vector(vector: &dyn Vector) -> Vec<Option<i64>> {
    (0..vector.len())
        .map(|i| paste::expr!((vector.get(i)).as_timestamp().map(|ts| ts.value())))
        .collect::<Vec<Option<i64>>>()
}

impl Function for ToUnixtimeFunction {
    fn name(&self) -> &str {
        NAME
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
-       Ok(ConcreteDataType::timestamp_second_datatype())
+       Ok(ConcreteDataType::int64_datatype())
    }

    fn signature(&self) -> Signature {
-       Signature::exact(
-           vec![ConcreteDataType::String(StringType)],
+       Signature::uniform(
+           1,
+           vec![
+               ConcreteDataType::string_datatype(),
+               ConcreteDataType::int32_datatype(),
+               ConcreteDataType::int64_datatype(),
+               ConcreteDataType::timestamp_second_datatype(),
+               ConcreteDataType::timestamp_millisecond_datatype(),
+               ConcreteDataType::timestamp_microsecond_datatype(),
+               ConcreteDataType::timestamp_nanosecond_datatype(),
+           ],
            Volatility::Immutable,
        )
    }
@@ -61,7 +79,7 @@ impl Function for ToUnixtimeFunction {
    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
        ensure!(
            columns.len() == 1,
-           error::InvalidFuncArgsSnafu {
+           InvalidFuncArgsSnafu {
                err_msg: format!(
                    "The length of the args is not correct, expect exactly one, have: {}",
                    columns.len()
@@ -79,6 +97,42 @@ impl Function for ToUnixtimeFunction {
                        .collect::<Vec<_>>(),
                )))
            }
            ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => {
                let array = columns[0].to_arrow_array();
                Ok(Arc::new(Int64Vector::try_from_arrow_array(&array).unwrap()))
            }
            ConcreteDataType::Timestamp(ts) => {
                let array = columns[0].to_arrow_array();
                let value = match ts {
                    TimestampType::Second(_) => {
                        let vector = paste::expr!(TimestampSecondVector::try_from_arrow_array(
                            array
                        )
                        .unwrap());
                        process_vector(&vector)
                    }
                    TimestampType::Millisecond(_) => {
                        let vector = paste::expr!(
                            TimestampMillisecondVector::try_from_arrow_array(array).unwrap()
                        );
                        process_vector(&vector)
                    }
                    TimestampType::Microsecond(_) => {
                        let vector = paste::expr!(
                            TimestampMicrosecondVector::try_from_arrow_array(array).unwrap()
                        );
                        process_vector(&vector)
                    }
                    TimestampType::Nanosecond(_) => {
                        let vector = paste::expr!(TimestampNanosecondVector::try_from_arrow_array(
                            array
                        )
                        .unwrap());
                        process_vector(&vector)
                    }
                };
                Ok(Arc::new(Int64Vector::from(value)))
            }
            _ => UnsupportedInputDataTypeSnafu {
                function: NAME,
                datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
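For orientation only, here is a minimal sketch (not part of the change set) of how the new eval arm behaves for integer input; it reuses the types already imported in this file (ToUnixtimeFunction, FunctionContext, Int64Vector, VectorRef) and mirrors the unit tests added further below:

    // Hypothetical call site inside this crate's tests: integers pass through as Int64.
    use std::sync::Arc;

    let f = ToUnixtimeFunction::default();
    let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(vec![Some(3_i64), None]))];
    let out = f.eval(FunctionContext::default(), &args).unwrap();
    assert_eq!(2, out.len()); // out.get(0) is Value::Int64(3), out.get(1) is Value::Null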
@@ -97,28 +151,37 @@ impl fmt::Display for ToUnixtimeFunction {
#[cfg(test)]
mod tests {
    use common_query::prelude::TypeSignature;
-   use datatypes::prelude::ConcreteDataType;
-   use datatypes::types::StringType;
+   use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder};
+   use datatypes::scalars::ScalarVector;
+   use datatypes::timestamp::TimestampSecond;
+   use datatypes::value::Value;
-   use datatypes::vectors::StringVector;
+   use datatypes::vectors::{StringVector, TimestampSecondVector};

    use super::{ToUnixtimeFunction, *};
    use crate::scalars::Function;

    #[test]
-   fn test_to_unixtime() {
+   fn test_string_to_unixtime() {
        let f = ToUnixtimeFunction::default();
        assert_eq!("to_unixtime", f.name());
        assert_eq!(
-           ConcreteDataType::timestamp_second_datatype(),
+           ConcreteDataType::int64_datatype(),
            f.return_type(&[]).unwrap()
        );

        assert!(matches!(f.signature(),
-           Signature {
-               type_signature: TypeSignature::Exact(valid_types),
-               volatility: Volatility::Immutable
-           } if valid_types == vec![ConcreteDataType::String(StringType)]
+           Signature {
+               type_signature: TypeSignature::Uniform(1, valid_types),
+               volatility: Volatility::Immutable
+           } if valid_types == vec![
+               ConcreteDataType::string_datatype(),
+               ConcreteDataType::int32_datatype(),
+               ConcreteDataType::int64_datatype(),
+               ConcreteDataType::timestamp_second_datatype(),
+               ConcreteDataType::timestamp_millisecond_datatype(),
+               ConcreteDataType::timestamp_microsecond_datatype(),
+               ConcreteDataType::timestamp_nanosecond_datatype(),
+           ]
        ));

        let times = vec![
@@ -145,4 +208,106 @@ mod tests {
            }
        }
    }

    #[test]
    fn test_int_to_unixtime() {
        let f = ToUnixtimeFunction::default();
        assert_eq!("to_unixtime", f.name());
        assert_eq!(
            ConcreteDataType::int64_datatype(),
            f.return_type(&[]).unwrap()
        );

        assert!(matches!(f.signature(),
            Signature {
                type_signature: TypeSignature::Uniform(1, valid_types),
                volatility: Volatility::Immutable
            } if valid_types == vec![
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::timestamp_second_datatype(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                ConcreteDataType::timestamp_microsecond_datatype(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
            ]
        ));

        let times = vec![Some(3_i64), None, Some(5_i64), None];
        let results = vec![Some(3), None, Some(5), None];
        let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(times.clone()))];
        let vector = f.eval(FunctionContext::default(), &args).unwrap();
        assert_eq!(4, vector.len());
        for (i, _t) in times.iter().enumerate() {
            let v = vector.get(i);
            if i == 1 || i == 3 {
                assert_eq!(Value::Null, v);
                continue;
            }
            match v {
                Value::Int64(ts) => {
                    assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
                }
                _ => unreachable!(),
            }
        }
    }

    #[test]
    fn test_timestamp_to_unixtime() {
        let f = ToUnixtimeFunction::default();
        assert_eq!("to_unixtime", f.name());
        assert_eq!(
            ConcreteDataType::int64_datatype(),
            f.return_type(&[]).unwrap()
        );

        assert!(matches!(f.signature(),
            Signature {
                type_signature: TypeSignature::Uniform(1, valid_types),
                volatility: Volatility::Immutable
            } if valid_types == vec![
                ConcreteDataType::string_datatype(),
                ConcreteDataType::int32_datatype(),
                ConcreteDataType::int64_datatype(),
                ConcreteDataType::timestamp_second_datatype(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                ConcreteDataType::timestamp_microsecond_datatype(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
            ]
        ));

        let times: Vec<Option<TimestampSecond>> = vec![
            Some(TimestampSecond::new(123)),
            None,
            Some(TimestampSecond::new(42)),
            None,
        ];
        let results = vec![Some(123), None, Some(42), None];
        let ts_vector: TimestampSecondVector = build_vector_from_slice(&times);
        let args: Vec<VectorRef> = vec![Arc::new(ts_vector)];
        let vector = f.eval(FunctionContext::default(), &args).unwrap();
        assert_eq!(4, vector.len());
        for (i, _t) in times.iter().enumerate() {
            let v = vector.get(i);
            if i == 1 || i == 3 {
                assert_eq!(Value::Null, v);
                continue;
            }
            match v {
                Value::Int64(ts) => {
                    assert_eq!(ts, (*results.get(i).unwrap()).unwrap());
                }
                _ => unreachable!(),
            }
        }
    }

    fn build_vector_from_slice<T: ScalarVector>(items: &[Option<T::RefItem<'_>>]) -> T {
        let mut builder = T::Builder::with_capacity(items.len());
        for item in items {
            builder.push(*item);
        }
        builder.finish()
    }
}
@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
-use common_telemetry::{error, info, trace, warn};
+use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
use tokio::sync::mpsc;

@@ -199,6 +199,7 @@ impl HeartbeatTask {
            }
        };
        if let Some(req) = req {
+           debug!("Sending heartbeat request: {:?}", req);
            if let Err(e) = tx.send(req).await {
                error!("Failed to send heartbeat to metasrv, error: {:?}", e);
                match Self::create_streams(

@@ -20,8 +20,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
-use common_telemetry::tracing::trace;
-use common_telemetry::{error, info};
+use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use snafu::ResultExt;
use tokio::sync::mpsc;

@@ -83,7 +82,7 @@ impl HeartbeatTask {
        loop {
            match resp_stream.message().await {
                Ok(Some(resp)) => {
-                   trace!("Received a heartbeat response: {:?}", resp);
+                   debug!("Receiving heartbeat response: {:?}", resp);
                    let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
                    if let Err(e) = capture_self.handle_response(ctx) {
                        error!(e; "Error while handling heartbeat response");

@@ -92,7 +91,6 @@ impl HeartbeatTask {
                Ok(None) => break,
                Err(e) => {
                    error!(e; "Occur error while reading heartbeat response");

            capture_self
                .start_with_retry(Duration::from_secs(retry_interval))
                .await;

@@ -148,7 +146,7 @@ impl HeartbeatTask {
                error!(e; "Failed to send heartbeat to metasrv");
                break;
            } else {
-               trace!("Send a heartbeat request to metasrv, content: {:?}", req);
+               debug!("Send a heartbeat request to metasrv, content: {:?}", req);
            }
        }
    }

@@ -20,7 +20,7 @@ use api::v1::meta::{
    heartbeat_server, AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse,
    Peer, RequestHeader, ResponseHeader, Role,
};
-use common_telemetry::{error, info, warn};
+use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use once_cell::sync::OnceCell;
use tokio::sync::mpsc;

@@ -59,6 +59,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
                    break;
                }
            };
+           debug!("Receiving heartbeat request: {:?}", req);

            if pusher_key.is_none() {
                let node_id = get_node_id(header);

@@ -76,6 +77,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {

                    is_not_leader = res.as_ref().map_or(false, |r| r.is_not_leader());

+                   debug!("Sending heartbeat response: {:?}", res);
                    tx.send(res).await.expect("working rx");
                }
                Err(err) => {
@@ -452,7 +452,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
                .write_buffer_size
                .map(|s| s.0 as usize),
            ttl: table_info.meta.options.ttl,
-           compaction_time_window: table_info.meta.options.compaction_time_window,
        };

        debug!(

@@ -532,7 +531,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
                .write_buffer_size
                .map(|s| s.0 as usize),
            ttl: table_info.meta.options.ttl,
-           compaction_time_window: table_info.meta.options.compaction_time_window,
        };

        // TODO(weny): Returns an error earlier if the target region does not exist in the meta.
@@ -228,18 +228,15 @@ impl<S: StorageEngine> TableCreator<S> {
        let table_options = &self.data.request.table_options;
        let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize);
        let ttl = table_options.ttl;
-       let compaction_time_window = table_options.compaction_time_window;
        let open_opts = OpenOptions {
            parent_dir: table_dir.to_string(),
            write_buffer_size,
            ttl,
-           compaction_time_window,
        };
        let create_opts = CreateOptions {
            parent_dir: table_dir.to_string(),
            write_buffer_size,
            ttl,
-           compaction_time_window,
        };

        let primary_key_indices = &self.data.request.primary_key_indices;

@@ -285,7 +282,6 @@ impl<S: StorageEngine> TableCreator<S> {
            .name(region_name.clone())
            .row_key(row_key.clone())
            .default_cf(default_cf.clone())
-           .compaction_time_window(compaction_time_window)
            .build()
            .context(BuildRegionDescriptorSnafu {
                table_name: &self.data.request.table_name,
@@ -27,6 +27,10 @@ use datafusion::catalog::catalog::MemoryCatalogList;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::physical_optimizer::dist_enforcement::EnforceDistribution;
+use datafusion::physical_optimizer::repartition::Repartition;
+use datafusion::physical_optimizer::sort_enforcement::EnforceSorting;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, ExtensionPlanner};
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion_expr::LogicalPlan as DfLogicalPlan;

@@ -79,6 +83,22 @@ impl QueryEngineState {
        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(OrderHintRule));

        let mut physical_optimizers = {
            let state = SessionState::with_config_rt(session_config.clone(), runtime_env.clone());
            state.physical_optimizers().to_vec()
        };
        // run the repartition and sort enforcement rules first.
        // And `EnforceSorting` is required to run after `EnforceDistribution`.
        Self::remove_physical_optimize_rule(&mut physical_optimizers, EnforceSorting {}.name());
        Self::remove_physical_optimize_rule(
            &mut physical_optimizers,
            EnforceDistribution {}.name(),
        );
        Self::remove_physical_optimize_rule(&mut physical_optimizers, Repartition {}.name());
        physical_optimizers.insert(0, Arc::new(EnforceSorting {}));
        physical_optimizers.insert(0, Arc::new(EnforceDistribution {}));
        physical_optimizers.insert(0, Arc::new(Repartition {}));

        let session_state = SessionState::with_config_rt_and_catalog_list(
            session_config,
            runtime_env,

@@ -90,7 +110,8 @@ impl QueryEngineState {
            partition_manager,
            datanode_clients,
        )))
-       .with_optimizer_rules(optimizer.rules);
+       .with_optimizer_rules(optimizer.rules)
+       .with_physical_optimizer_rules(physical_optimizers);

        let df_context = SessionContext::with_state(session_state);

@@ -102,6 +123,22 @@ impl QueryEngineState {
        }
    }

    fn remove_physical_optimize_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        let mut index_to_move = None;
        for (i, rule) in rules.iter().enumerate() {
            if rule.name() == name {
                index_to_move = Some(i);
                break;
            }
        }
        if let Some(index) = index_to_move {
            rules.remove(index);
        }
    }

    /// Register a udf function
    // TODO(dennis): manage UDFs by ourself.
    pub fn register_udf(&self, udf: ScalarUdf) {
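The three insert(0, ...) calls above are what leave Repartition, EnforceDistribution and EnforceSorting at the head of the physical optimizer list, in that order. A tiny self-contained illustration of that insertion order (plain strings stand in for the real rule objects; "SomeOtherRule" is hypothetical):

    let mut rules: Vec<&str> = vec!["SomeOtherRule"];
    rules.insert(0, "EnforceSorting");
    rules.insert(0, "EnforceDistribution");
    rules.insert(0, "Repartition");
    assert_eq!(rules, ["Repartition", "EnforceDistribution", "EnforceSorting", "SomeOtherRule"]);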
@@ -71,10 +71,6 @@ fn create_sql_options(table_meta: &TableMeta) -> Vec<SqlOption> {
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(w) = table_opts.compaction_time_window {
|
||||
options.push(sql_option("compaction_time_window", number_value(w)));
|
||||
}
|
||||
|
||||
for (k, v) in table_opts
|
||||
.extra_options
|
||||
.iter()
|
||||
|
||||
@@ -62,7 +62,6 @@ impl RegionDescBuilder {
|
||||
row_key: self.key_builder.build().unwrap(),
|
||||
default_cf: self.default_cf_builder.build().unwrap(),
|
||||
extra_cfs: Vec::new(),
|
||||
compaction_time_window: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -120,6 +120,7 @@ impl<S: LogStore> Picker for SimplePicker<S> {
|
||||
}
|
||||
|
||||
let ctx = &PickerContext::with(req.compaction_time_window);
|
||||
|
||||
for level_num in 0..levels.level_num() {
|
||||
let level = levels.level(level_num as u8);
|
||||
let (compaction_time_window, outputs) = self.strategy.pick(ctx, level);
|
||||
@@ -130,8 +131,8 @@ impl<S: LogStore> Picker for SimplePicker<S> {
|
||||
}
|
||||
|
||||
debug!(
|
||||
"Found SST files to compact {:?} on level: {}",
|
||||
outputs, level_num
|
||||
"Found SST files to compact {:?} on level: {}, compaction window: {:?}",
|
||||
outputs, level_num, compaction_time_window,
|
||||
);
|
||||
return Ok(Some(CompactionTaskImpl {
|
||||
schema: req.schema(),
|
||||
|
||||
@@ -47,19 +47,24 @@ impl Strategy for SimpleTimeWindowStrategy {
        if files.is_empty() {
            return (None, vec![]);
        }
-       let time_bucket = ctx
-           .compaction_time_window()
-           .unwrap_or_else(|| infer_time_bucket(&files));
-       let buckets = calculate_time_buckets(time_bucket, &files);
-       debug!("File bucket:{}, file groups: {:?}", time_bucket, buckets);
+       let time_window = ctx.compaction_time_window().unwrap_or_else(|| {
+           let inferred = infer_time_bucket(&files);
+           debug!(
+               "Compaction window is not present, inferring from files: {:?}",
+               inferred
+           );
+           inferred
+       });
+       let buckets = calculate_time_buckets(time_window, &files);
+       debug!("File bucket:{}, file groups: {:?}", time_window, buckets);
        (
-           Some(time_bucket),
+           Some(time_window),
            buckets
                .into_iter()
                .map(|(bound, files)| CompactionOutput {
                    output_level: 1,
                    bucket_bound: bound,
-                   bucket: time_bucket,
+                   bucket: time_window,
                    inputs: files,
                })
                .collect(),
@@ -102,7 +102,6 @@ impl<S: LogStore> CompactionTaskImpl<S> {
|
||||
}
|
||||
|
||||
/// Writes updated SST info into manifest.
|
||||
// TODO(etolbakov): we are not persisting inferred compaction_time_window (#1083)[https://github.com/GreptimeTeam/greptimedb/pull/1083]
|
||||
async fn write_manifest_and_apply(
|
||||
&self,
|
||||
output: HashSet<FileMeta>,
|
||||
@@ -116,6 +115,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
|
||||
flushed_sequence: None,
|
||||
files_to_add: Vec::from_iter(output.into_iter()),
|
||||
files_to_remove: Vec::from_iter(input.into_iter()),
|
||||
compaction_time_window: self.compaction_time_window,
|
||||
};
|
||||
debug!(
|
||||
"Compacted region: {}, region edit: {:?}",
|
||||
@@ -151,7 +151,10 @@ impl<S: LogStore> CompactionTask for CompactionTaskImpl<S> {
|
||||
|
||||
let input_ids = compacted.iter().map(|f| f.file_id).collect::<Vec<_>>();
|
||||
let output_ids = output.iter().map(|f| f.file_id).collect::<Vec<_>>();
|
||||
info!("Compacting SST files, input: {input_ids:?}, output: {output_ids:?}");
|
||||
info!(
|
||||
"Compacting SST files, input: {:?}, output: {:?}, window: {:?}",
|
||||
input_ids, output_ids, self.compaction_time_window
|
||||
);
|
||||
self.write_manifest_and_apply(output, compacted)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
||||
@@ -28,7 +28,7 @@ use store_api::storage::{
|
||||
};
|
||||
|
||||
use crate::compaction::CompactionSchedulerRef;
|
||||
use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
|
||||
use crate::config::EngineConfig;
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::file_purger::{FilePurgeHandler, FilePurgerRef};
|
||||
use crate::flush::{
|
||||
@@ -89,7 +89,7 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
|
||||
|
||||
async fn drop_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> {
|
||||
region.drop_region().await?;
|
||||
self.inner.remove_reigon(region.name());
|
||||
self.inner.remove_region(region.name());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -395,7 +395,6 @@ impl<S: LogStore> EngineInner<S> {
|
||||
name,
|
||||
&self.config,
|
||||
opts.ttl,
|
||||
opts.compaction_time_window,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -441,7 +440,6 @@ impl<S: LogStore> EngineInner<S> {
|
||||
®ion_name,
|
||||
&self.config,
|
||||
opts.ttl,
|
||||
opts.compaction_time_window,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -462,7 +460,7 @@ impl<S: LogStore> EngineInner<S> {
|
||||
self.regions.get_region(name)
|
||||
}
|
||||
|
||||
fn remove_reigon(&self, name: &str) {
|
||||
fn remove_region(&self, name: &str) {
|
||||
self.regions.remove(name)
|
||||
}
|
||||
|
||||
@@ -473,7 +471,6 @@ impl<S: LogStore> EngineInner<S> {
|
||||
region_name: &str,
|
||||
config: &EngineConfig,
|
||||
region_ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
) -> Result<StoreConfig<S>> {
|
||||
let parent_dir = util::normalize_dir(parent_dir);
|
||||
|
||||
@@ -504,9 +501,8 @@ impl<S: LogStore> EngineInner<S> {
|
||||
engine_config: self.config.clone(),
|
||||
file_purger: self.file_purger.clone(),
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
write_buffer_size: write_buffer_size
|
||||
.unwrap_or(DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize),
|
||||
.unwrap_or(self.config.region_write_buffer_size.as_bytes() as usize),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -553,7 +549,7 @@ mod tests {
|
||||
log_file_dir: &TempDir,
|
||||
region_name: &str,
|
||||
region_id: u64,
|
||||
ctx: &EngineContext,
|
||||
config: EngineConfig,
|
||||
) -> (TestEngine, TestRegion) {
|
||||
let log_file_dir_path = log_file_dir.path().to_str().unwrap();
|
||||
let log_store = log_store_util::create_tmp_local_file_log_store(log_file_dir_path).await;
|
||||
@@ -564,8 +560,6 @@ mod tests {
|
||||
builder.root(&store_dir);
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
|
||||
let config = EngineConfig::default();
|
||||
|
||||
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
|
||||
|
||||
let engine = EngineImpl::new(
|
||||
@@ -584,7 +578,7 @@ mod tests {
|
||||
.build();
|
||||
|
||||
let region = engine
|
||||
.create_region(ctx, desc, &CreateOptions::default())
|
||||
.create_region(&EngineContext::default(), desc, &CreateOptions::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -606,18 +600,38 @@ mod tests {
|
||||
|
||||
let region_name = "region-0";
|
||||
let region_id = 123456;
|
||||
let ctx = EngineContext::default();
|
||||
let config = EngineConfig::default();
|
||||
|
||||
let (engine, region) =
|
||||
create_engine_and_region(&dir, &log_file_dir, region_name, region_id, &ctx).await;
|
||||
create_engine_and_region(&dir, &log_file_dir, region_name, region_id, config).await;
|
||||
assert_eq!(region_name, region.name());
|
||||
|
||||
let ctx = EngineContext::default();
|
||||
let region2 = engine.get_region(&ctx, region_name).unwrap().unwrap();
|
||||
assert_eq!(region_name, region2.name());
|
||||
|
||||
assert!(engine.get_region(&ctx, "no such region").unwrap().is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_region_with_buffer_size() {
|
||||
let dir = create_temp_dir("test_buffer_size");
|
||||
let log_file_dir = create_temp_dir("test_buffer_wal");
|
||||
|
||||
let region_name = "region-0";
|
||||
let region_id = 123456;
|
||||
let mut config = EngineConfig::default();
|
||||
let expect_buffer_size = config.region_write_buffer_size / 2;
|
||||
config.region_write_buffer_size = expect_buffer_size;
|
||||
|
||||
let (_engine, region) =
|
||||
create_engine_and_region(&dir, &log_file_dir, region_name, region_id, config).await;
|
||||
assert_eq!(
|
||||
expect_buffer_size.as_bytes() as usize,
|
||||
region.write_buffer_size().await
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_region() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -626,10 +640,10 @@ mod tests {
|
||||
|
||||
let region_name = "test_region";
|
||||
let region_id = 123456;
|
||||
let ctx = EngineContext::default();
|
||||
let config = EngineConfig::default();
|
||||
|
||||
let (engine, region) =
|
||||
create_engine_and_region(&dir, &log_file_dir, region_name, region_id, &ctx).await;
|
||||
create_engine_and_region(&dir, &log_file_dir, region_name, region_id, config).await;
|
||||
|
||||
assert_eq!(region_name, region.name());
|
||||
|
||||
@@ -648,6 +662,7 @@ mod tests {
|
||||
|
||||
// Flush memtable to sst.
|
||||
region.flush(&FlushContext::default()).await.unwrap();
|
||||
let ctx = EngineContext::default();
|
||||
engine
|
||||
.close_region(&ctx, region.name(), &CloseOptions::default())
|
||||
.await
|
||||
|
||||
@@ -312,6 +312,7 @@ impl<S: LogStore> FlushJob<S> {
|
||||
flushed_sequence: Some(self.flush_sequence),
|
||||
files_to_add: file_metas.to_vec(),
|
||||
files_to_remove: Vec::default(),
|
||||
compaction_time_window: None,
|
||||
};
|
||||
|
||||
self.writer
|
||||
|
||||
@@ -38,8 +38,6 @@ pub struct RawRegionMetadata {
|
||||
pub columns: RawColumnsMetadata,
|
||||
pub column_families: RawColumnFamiliesMetadata,
|
||||
pub version: VersionNumber,
|
||||
/// Time window for compaction
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata).
|
||||
@@ -78,6 +76,7 @@ pub struct RegionEdit {
|
||||
pub flushed_sequence: Option<SequenceNumber>,
|
||||
pub files_to_add: Vec<FileMeta>,
|
||||
pub files_to_remove: Vec<FileMeta>,
|
||||
pub compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
/// The region version checkpoint
|
||||
@@ -382,6 +381,7 @@ mod tests {
|
||||
flushed_sequence: Some(99),
|
||||
files_to_add: files.clone(),
|
||||
files_to_remove: vec![],
|
||||
compaction_time_window: None,
|
||||
},
|
||||
);
|
||||
builder.apply_edit(
|
||||
@@ -391,6 +391,7 @@ mod tests {
|
||||
flushed_sequence: Some(100),
|
||||
files_to_add: vec![],
|
||||
files_to_remove: vec![files[0].clone()],
|
||||
compaction_time_window: None,
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -71,5 +71,6 @@ pub fn build_region_edit(
|
||||
file_size: DEFAULT_TEST_FILE_SIZE,
|
||||
})
|
||||
.collect(),
|
||||
compaction_time_window: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,8 +194,6 @@ pub struct RegionMetadata {
|
||||
pub columns: ColumnsMetadataRef,
|
||||
column_families: ColumnFamiliesMetadata,
|
||||
version: VersionNumber,
|
||||
/// Time window for compaction
|
||||
compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl RegionMetadata {
|
||||
@@ -214,11 +212,6 @@ impl RegionMetadata {
|
||||
&self.schema
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn compaction_time_window(&self) -> Option<i64> {
|
||||
self.compaction_time_window
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn user_schema(&self) -> &SchemaRef {
|
||||
self.schema.user_schema()
|
||||
@@ -320,8 +313,7 @@ impl RegionMetadata {
|
||||
let mut builder = RegionDescriptorBuilder::default()
|
||||
.id(self.id)
|
||||
.name(&self.name)
|
||||
.row_key(row_key)
|
||||
.compaction_time_window(self.compaction_time_window);
|
||||
.row_key(row_key);
|
||||
|
||||
for (cf_id, cf) in &self.column_families.id_to_cfs {
|
||||
let mut cf_builder = ColumnFamilyDescriptorBuilder::default()
|
||||
@@ -354,7 +346,6 @@ impl From<&RegionMetadata> for RawRegionMetadata {
|
||||
columns: RawColumnsMetadata::from(&*data.columns),
|
||||
column_families: RawColumnFamiliesMetadata::from(&data.column_families),
|
||||
version: data.version,
|
||||
compaction_time_window: data.compaction_time_window,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -373,7 +364,6 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {
|
||||
columns,
|
||||
column_families: raw.column_families.into(),
|
||||
version: raw.version,
|
||||
compaction_time_window: raw.compaction_time_window,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -631,7 +621,6 @@ impl TryFrom<RegionDescriptor> for RegionMetadataBuilder {
|
||||
.name(desc.name)
|
||||
.id(desc.id)
|
||||
.row_key(desc.row_key)?
|
||||
.compaction_time_window(desc.compaction_time_window)
|
||||
.add_column_family(desc.default_cf)?;
|
||||
for cf in desc.extra_cfs {
|
||||
builder = builder.add_column_family(cf)?;
|
||||
@@ -778,7 +767,6 @@ struct RegionMetadataBuilder {
|
||||
columns_meta_builder: ColumnsMetadataBuilder,
|
||||
cfs_meta_builder: ColumnFamiliesMetadataBuilder,
|
||||
version: VersionNumber,
|
||||
compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl Default for RegionMetadataBuilder {
|
||||
@@ -795,7 +783,6 @@ impl RegionMetadataBuilder {
|
||||
columns_meta_builder: ColumnsMetadataBuilder::default(),
|
||||
cfs_meta_builder: ColumnFamiliesMetadataBuilder::default(),
|
||||
version: Schema::INITIAL_VERSION,
|
||||
compaction_time_window: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -820,11 +807,6 @@ impl RegionMetadataBuilder {
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn compaction_time_window(mut self, compaction_time_window: Option<i64>) -> Self {
|
||||
self.compaction_time_window = compaction_time_window;
|
||||
self
|
||||
}
|
||||
|
||||
fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result<Self> {
|
||||
let column_index_start = self.columns_meta_builder.columns.len();
|
||||
let column_index_end = column_index_start + cf.columns.len();
|
||||
@@ -855,7 +837,6 @@ impl RegionMetadataBuilder {
|
||||
columns,
|
||||
column_families: self.cfs_meta_builder.build(),
|
||||
version: self.version,
|
||||
compaction_time_window: self.compaction_time_window,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1047,7 +1028,6 @@ mod tests {
|
||||
.unwrap();
|
||||
RegionMetadataBuilder::new()
|
||||
.name(TEST_REGION)
|
||||
.compaction_time_window(None)
|
||||
.row_key(row_key)
|
||||
.unwrap()
|
||||
.add_column_family(cf)
|
||||
|
||||
@@ -160,11 +160,10 @@ pub struct StoreConfig<S: LogStore> {
|
||||
pub engine_config: Arc<EngineConfig>,
|
||||
pub file_purger: FilePurgerRef,
|
||||
pub ttl: Option<Duration>,
|
||||
pub compaction_time_window: Option<i64>,
|
||||
pub write_buffer_size: usize,
|
||||
}
|
||||
|
||||
pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
|
||||
pub type RecoveredMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
|
||||
pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRegionMetadata)>;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -244,7 +243,6 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
store_config.memtable_builder,
|
||||
store_config.engine_config.clone(),
|
||||
store_config.ttl,
|
||||
store_config.compaction_time_window,
|
||||
store_config.write_buffer_size,
|
||||
)),
|
||||
wal,
|
||||
@@ -264,7 +262,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
pub async fn open(
|
||||
name: String,
|
||||
store_config: StoreConfig<S>,
|
||||
opts: &OpenOptions,
|
||||
_opts: &OpenOptions,
|
||||
) -> Result<Option<RegionImpl<S>>> {
|
||||
// Load version meta data from manifest.
|
||||
let (version, mut recovered_metadata) = match Self::recover_from_manifest(
|
||||
@@ -328,14 +326,11 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
version_control,
|
||||
last_flush_millis: AtomicI64::new(0),
|
||||
});
|
||||
let compaction_time_window = store_config
|
||||
.compaction_time_window
|
||||
.or(opts.compaction_time_window);
|
||||
|
||||
let writer = Arc::new(RegionWriter::new(
|
||||
store_config.memtable_builder,
|
||||
store_config.engine_config.clone(),
|
||||
store_config.ttl,
|
||||
compaction_time_window,
|
||||
store_config.write_buffer_size,
|
||||
));
|
||||
let writer_ctx = WriterContext {
|
||||
@@ -521,6 +516,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
flushed_sequence: e.flushed_sequence,
|
||||
manifest_version,
|
||||
max_memtable_id: None,
|
||||
compaction_time_window: e.compaction_time_window,
|
||||
};
|
||||
version.map(|mut v| {
|
||||
v.apply_edit(edit);
|
||||
@@ -575,6 +571,10 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
|
||||
inner.writer.replay(recovered_metadata, writer_ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn write_buffer_size(&self) -> usize {
|
||||
self.inner.writer.write_buffer_size().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared data of region.
|
||||
|
||||
@@ -306,7 +306,8 @@ async fn test_new_region() {
|
||||
let dir = create_temp_dir("test_new_region");
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
|
||||
let store_config = config_util::new_store_config(region_name, store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(region_name, store_dir, EngineConfig::default()).await;
|
||||
let placeholder_memtable = store_config
|
||||
.memtable_builder
|
||||
.build(metadata.schema().clone());
|
||||
@@ -555,7 +556,6 @@ async fn create_store_config(region_name: &str, root: &str) -> StoreConfig<NoopL
|
||||
engine_config: Default::default(),
|
||||
file_purger,
|
||||
ttl: None,
|
||||
compaction_time_window: None,
|
||||
write_buffer_size: ReadableSize::mb(32).0 as usize,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ use store_api::storage::{
|
||||
SchemaRef, Snapshot, WriteRequest, WriteResponse,
|
||||
};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
use crate::region::{OpenOptions, RawRegionMetadata, RegionImpl, RegionMetadata};
|
||||
use crate::test_util;
|
||||
@@ -38,7 +39,8 @@ async fn create_region_for_alter(store_dir: &str) -> RegionImpl<RaftEngineLogSto
|
||||
// Always disable version column in this test.
|
||||
let metadata = tests::new_metadata(REGION_NAME);
|
||||
|
||||
let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
|
||||
|
||||
RegionImpl::create(metadata, store_config).await.unwrap()
|
||||
}
|
||||
@@ -112,7 +114,9 @@ impl AlterTester {
|
||||
}
|
||||
self.base = None;
|
||||
// Reopen the region.
|
||||
let store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(REGION_NAME, &self.store_dir, EngineConfig::default())
|
||||
.await;
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
|
||||
.await
|
||||
|
||||
@@ -19,6 +19,7 @@ use common_test_util::temp_dir::create_temp_dir;
|
||||
use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use store_api::storage::{OpenOptions, SequenceNumber, WriteResponse};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::error::Result;
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
use crate::region::RegionImpl;
|
||||
@@ -32,7 +33,8 @@ async fn create_region_for_basic(
|
||||
store_dir: &str,
|
||||
) -> RegionImpl<RaftEngineLogStore> {
|
||||
let metadata = tests::new_metadata(region_name);
|
||||
let store_config = config_util::new_store_config(region_name, store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(region_name, store_dir, EngineConfig::default()).await;
|
||||
RegionImpl::create(metadata, store_config).await.unwrap()
|
||||
}
|
||||
|
||||
@@ -75,7 +77,12 @@ impl Tester {
|
||||
|
||||
self.base = None;
|
||||
// Reopen the region.
|
||||
let store_config = config_util::new_store_config(&self.region_name, &self.store_dir).await;
|
||||
let store_config = config_util::new_store_config(
|
||||
&self.region_name,
|
||||
&self.store_dir,
|
||||
EngineConfig::default(),
|
||||
)
|
||||
.await;
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(self.region_name.clone(), store_config, &opts).await?;
|
||||
match region {
|
||||
|
||||
@@ -22,6 +22,7 @@ use store_api::storage::{
|
||||
AlterOperation, AlterRequest, CloseContext, Region, RegionMeta, WriteResponse,
|
||||
};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine;
|
||||
use crate::error::Error;
|
||||
use crate::flush::FlushStrategyRef;
|
||||
@@ -44,7 +45,8 @@ async fn create_region_for_close(
|
||||
) -> RegionImpl<RaftEngineLogStore> {
|
||||
let metadata = tests::new_metadata(REGION_NAME);
|
||||
|
||||
let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
|
||||
let mut store_config =
|
||||
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
|
||||
store_config.flush_strategy = flush_strategy;
|
||||
|
||||
RegionImpl::create(metadata, store_config).await.unwrap()
|
||||
|
||||
@@ -17,13 +17,14 @@
|
||||
use std::env;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_telemetry::logging;
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use object_store::services::{Fs, S3};
|
||||
use object_store::ObjectStore;
|
||||
use store_api::storage::{FlushContext, FlushReason, Region, WriteResponse};
|
||||
use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::compaction::{CompactionHandler, SimplePicker};
|
||||
@@ -83,6 +84,7 @@ async fn create_region_for_compaction<
|
||||
REGION_NAME,
|
||||
store_dir,
|
||||
object_store.clone(),
|
||||
EngineConfig::default(),
|
||||
)
|
||||
.await;
|
||||
store_config.engine_config = Arc::new(engine_config);
|
||||
@@ -150,6 +152,9 @@ struct CompactionTester {
|
||||
base: Option<FileTesterBase>,
|
||||
purge_handler: MockFilePurgeHandler,
|
||||
object_store: ObjectStore,
|
||||
store_dir: String,
|
||||
engine_config: EngineConfig,
|
||||
flush_strategy: FlushStrategyRef,
|
||||
}
|
||||
|
||||
impl CompactionTester {
|
||||
@@ -164,7 +169,7 @@ impl CompactionTester {
|
||||
store_dir,
|
||||
engine_config.clone(),
|
||||
purge_handler.clone(),
|
||||
flush_strategy,
|
||||
flush_strategy.clone(),
|
||||
s3_bucket,
|
||||
)
|
||||
.await;
|
||||
@@ -173,6 +178,9 @@ impl CompactionTester {
|
||||
base: Some(FileTesterBase::with_region(region)),
|
||||
purge_handler,
|
||||
object_store,
|
||||
store_dir: store_dir.to_string(),
|
||||
engine_config,
|
||||
flush_strategy,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,6 +228,48 @@ impl CompactionTester {
|
||||
|
||||
self.object_store.remove_all("/").await.unwrap();
|
||||
}
|
||||
|
||||
async fn reopen(&mut self) -> Result<bool> {
|
||||
// Close the old region.
|
||||
if let Some(base) = self.base.take() {
|
||||
base.close().await;
|
||||
}
|
||||
|
||||
// Reopen the region.
|
||||
let object_store = new_object_store(&self.store_dir, None);
|
||||
let (mut store_config, _) = config_util::new_store_config_with_object_store(
|
||||
REGION_NAME,
|
||||
&self.store_dir,
|
||||
object_store.clone(),
|
||||
EngineConfig {
|
||||
max_files_in_l0: usize::MAX,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
store_config.engine_config = Arc::new(self.engine_config.clone());
|
||||
store_config.flush_strategy = self.flush_strategy.clone();
|
||||
|
||||
let picker = SimplePicker::default();
|
||||
let handler = CompactionHandler::new(picker);
|
||||
let config = SchedulerConfig::default();
|
||||
// Overwrite test compaction scheduler and file purger.
|
||||
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));
|
||||
store_config.file_purger = Arc::new(LocalScheduler::new(
|
||||
SchedulerConfig {
|
||||
max_inflight_tasks: store_config.engine_config.max_purge_tasks,
|
||||
},
|
||||
MockFilePurgeHandler::default(),
|
||||
));
|
||||
|
||||
// FIXME(hl): find out which component prevents logstore from being dropped.
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
let Some(region) = RegionImpl::open(REGION_NAME.to_string(), store_config, &OpenOptions::default()).await? else {
|
||||
return Ok(false);
|
||||
};
|
||||
self.base = Some(FileTesterBase::with_region(region));
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
async fn compact_during_read(s3_bucket: Option<String>) {
|
||||
@@ -289,3 +339,110 @@ async fn test_compact_during_read_on_s3() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_persist_region_compaction_time_window() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let dir = create_temp_dir("put-delete-scan");
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let mut tester = CompactionTester::new(
|
||||
store_dir,
|
||||
EngineConfig {
|
||||
max_files_in_l0: 100,
|
||||
..Default::default()
|
||||
},
|
||||
// Disable auto-flush.
|
||||
Arc::new(FlushSwitch::default()),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// initially the time window is not present since no compaction ever happened.
|
||||
assert_eq!(
|
||||
None,
|
||||
tester
|
||||
.base
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.region
|
||||
.inner
|
||||
.shared
|
||||
.version_control
|
||||
.current()
|
||||
.ssts()
|
||||
.compaction_time_window()
|
||||
);
|
||||
|
||||
// write some data with one hour span
|
||||
for idx in 0..10 {
|
||||
tester
|
||||
.put(&[(idx * 1000, Some(idx)), ((idx + 360) * 1000, Some(idx))])
|
||||
.await;
|
||||
tester.flush(Some(true)).await;
|
||||
}
|
||||
|
||||
tester.compact().await;
|
||||
// the inferred and persisted compaction time window should be 3600 seconds.
|
||||
assert_eq!(
|
||||
3600,
|
||||
tester
|
||||
.base
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.region
|
||||
.inner
|
||||
.shared
|
||||
.version_control
|
||||
.current()
|
||||
.ssts()
|
||||
.compaction_time_window()
|
||||
.unwrap()
|
||||
);
|
||||
|
||||
// try write data with a larger time window
|
||||
for idx in 0..10 {
|
||||
tester
|
||||
.put(&[
|
||||
(idx * 1000, Some(idx)),
|
||||
((idx + 2 * 60 * 60) * 1000, Some(idx)),
|
||||
])
|
||||
.await;
|
||||
tester.flush(Some(true)).await;
|
||||
}
|
||||
tester.compact().await;
|
||||
|
||||
// but we won't changed persisted compaction window for now, so it remains unchanged.
|
||||
assert_eq!(
|
||||
3600,
|
||||
tester
|
||||
.base
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.region
|
||||
.inner
|
||||
.shared
|
||||
.version_control
|
||||
.current()
|
||||
.ssts()
|
||||
.compaction_time_window()
|
||||
.unwrap()
|
||||
);
|
||||
|
||||
let reopened = tester.reopen().await.unwrap();
|
||||
assert!(reopened);
|
||||
assert_eq!(
|
||||
3600,
|
||||
tester
|
||||
.base
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.region
|
||||
.inner
|
||||
.shared
|
||||
.version_control
|
||||
.current()
|
||||
.ssts()
|
||||
.compaction_time_window()
|
||||
.unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ use store_api::storage::{
|
||||
FlushContext, FlushReason, OpenOptions, Region, ScanRequest, WriteResponse,
|
||||
};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine::{self, RegionMap};
|
||||
use crate::flush::{FlushStrategyRef, FlushType};
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
@@ -46,8 +47,15 @@ async fn create_region_for_flush(
|
||||
) {
|
||||
let metadata = tests::new_metadata(REGION_NAME);
|
||||
|
||||
let (mut store_config, regions) =
|
||||
config_util::new_store_config_and_region_map(REGION_NAME, store_dir).await;
|
||||
let (mut store_config, regions) = config_util::new_store_config_and_region_map(
|
||||
REGION_NAME,
|
||||
store_dir,
|
||||
EngineConfig {
|
||||
max_files_in_l0: usize::MAX,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
store_config.flush_strategy = flush_strategy;
|
||||
|
||||
(
|
||||
@@ -84,8 +92,18 @@ impl FlushTester {
|
||||
}
|
||||
self.base = None;
|
||||
// Reopen the region.
|
||||
let mut store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await;
|
||||
let mut store_config = config_util::new_store_config(
|
||||
REGION_NAME,
|
||||
&self.store_dir,
|
||||
EngineConfig {
|
||||
max_files_in_l0: usize::MAX,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
store_config.flush_strategy = self.flush_strategy.clone();
|
||||
// FIXME(hl): find out which component prevents logstore from being dropped.
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
|
||||
.await
|
||||
|
||||
@@ -26,6 +26,7 @@ use store_api::storage::{
|
||||
Chunk, ChunkReader, ReadContext, Region, ScanRequest, Snapshot, WriteContext, WriteRequest,
|
||||
};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::region::{RegionImpl, RegionMetadata};
|
||||
use crate::test_util::{self, config_util, descriptor_util, write_batch_util};
|
||||
use crate::write_batch::WriteBatch;
|
||||
@@ -171,7 +172,8 @@ const REGION_NAME: &str = "region-projection-0";
|
||||
async fn new_tester(store_dir: &str) -> ProjectionTester<RaftEngineLogStore> {
|
||||
let metadata = new_metadata(REGION_NAME);
|
||||
|
||||
let store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
|
||||
let store_config =
|
||||
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
|
||||
let region = RegionImpl::create(metadata, store_config).await.unwrap();
|
||||
|
||||
ProjectionTester::with_region(region)
|
||||
|
||||
@@ -42,7 +42,7 @@ use crate::metadata::RegionMetadataRef;
|
||||
use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL, PREPROCESS_ELAPSED};
|
||||
use crate::proto::wal::WalHeader;
|
||||
use crate::region::{
|
||||
CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
|
||||
CompactContext, RecoveredMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
|
||||
};
|
||||
use crate::schema::compat::CompatWrite;
|
||||
use crate::sst::AccessLayerRef;
|
||||
@@ -72,7 +72,6 @@ impl RegionWriter {
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
config: Arc<EngineConfig>,
|
||||
ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
write_buffer_size: usize,
|
||||
) -> RegionWriter {
|
||||
RegionWriter {
|
||||
@@ -80,7 +79,6 @@ impl RegionWriter {
|
||||
memtable_builder,
|
||||
config,
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
write_buffer_size,
|
||||
)),
|
||||
version_mutex: Mutex::new(()),
|
||||
@@ -141,7 +139,7 @@ impl RegionWriter {
|
||||
let files_to_add = edit.files_to_add.clone();
|
||||
let files_to_remove = edit.files_to_remove.clone();
|
||||
let flushed_sequence = edit.flushed_sequence;
|
||||
|
||||
let compaction_time_window = edit.compaction_time_window;
|
||||
// Persist the meta action.
|
||||
let mut action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
|
||||
action_list.set_prev_version(prev_version);
|
||||
@@ -158,6 +156,7 @@ impl RegionWriter {
|
||||
flushed_sequence,
|
||||
manifest_version,
|
||||
max_memtable_id,
|
||||
compaction_time_window,
|
||||
};
|
||||
|
||||
// We could tolerate failure during persisting manifest version to the WAL, since it won't
|
||||
@@ -390,6 +389,14 @@ impl RegionWriter {
|
||||
}
|
||||
}
|
||||
|
||||
// Methods for tests.
|
||||
#[cfg(test)]
|
||||
impl RegionWriter {
|
||||
pub(crate) async fn write_buffer_size(&self) -> usize {
|
||||
self.inner.lock().await.write_buffer_size
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WriterContext<'a, S: LogStore> {
|
||||
pub shared: &'a SharedDataRef,
|
||||
pub flush_strategy: &'a FlushStrategyRef,
|
||||
@@ -448,7 +455,6 @@ struct WriterInner {
|
||||
closed: bool,
|
||||
engine_config: Arc<EngineConfig>,
|
||||
ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
/// Size in bytes to freeze the mutable memtable.
|
||||
write_buffer_size: usize,
|
||||
}
|
||||
@@ -458,7 +464,6 @@ impl WriterInner {
|
||||
memtable_builder: MemtableBuilderRef,
|
||||
engine_config: Arc<EngineConfig>,
|
||||
ttl: Option<Duration>,
|
||||
compaction_time_window: Option<i64>,
|
||||
write_buffer_size: usize,
|
||||
) -> WriterInner {
|
||||
WriterInner {
|
||||
@@ -467,7 +472,6 @@ impl WriterInner {
|
||||
engine_config,
|
||||
closed: false,
|
||||
ttl,
|
||||
compaction_time_window,
|
||||
write_buffer_size,
|
||||
}
|
||||
}
|
||||
@@ -633,7 +637,7 @@ impl WriterInner {
|
||||
&self,
|
||||
writer_ctx: &WriterContext<'_, S>,
|
||||
sequence: SequenceNumber,
|
||||
mut metadata: Option<RecoverdMetadata>,
|
||||
mut metadata: Option<RecoveredMetadata>,
|
||||
version_control: &VersionControl,
|
||||
) -> Result<()> {
|
||||
// It's safe to unwrap here, it's checked outside.
|
||||
@@ -768,7 +772,7 @@ impl WriterInner {
|
||||
manifest: ctx.manifest.clone(),
|
||||
engine_config: self.engine_config.clone(),
|
||||
ttl: self.ttl,
|
||||
compaction_time_window: self.compaction_time_window,
|
||||
compaction_time_window: current_version.ssts().compaction_time_window(),
|
||||
};
|
||||
|
||||
let flush_handle = ctx
|
||||
@@ -790,6 +794,12 @@ impl WriterInner {
|
||||
sst_write_buffer_size: ReadableSize,
|
||||
) -> Result<()> {
|
||||
let region_id = writer_ctx.shared.id();
|
||||
let compaction_time_window = writer_ctx
|
||||
.shared
|
||||
.version_control
|
||||
.current()
|
||||
.ssts()
|
||||
.compaction_time_window();
|
||||
let mut compaction_request = CompactionRequestImpl {
|
||||
region_id,
|
||||
sst_layer: writer_ctx.sst_layer.clone(),
|
||||
@@ -798,7 +808,7 @@ impl WriterInner {
|
||||
manifest: writer_ctx.manifest.clone(),
|
||||
wal: writer_ctx.wal.clone(),
|
||||
ttl: self.ttl,
|
||||
compaction_time_window: self.compaction_time_window,
|
||||
compaction_time_window,
|
||||
sender: None,
|
||||
sst_write_buffer_size,
|
||||
};
|
||||
|
||||
@@ -65,12 +65,15 @@ pub struct LevelMetas {
|
||||
levels: LevelMetaVec,
|
||||
sst_layer: AccessLayerRef,
|
||||
file_purger: FilePurgerRef,
|
||||
/// Compaction time window in seconds
|
||||
compaction_time_window: Option<i64>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for LevelMetas {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("LevelMetas")
|
||||
.field("levels", &self.levels)
|
||||
.field("compaction_time_window", &self.compaction_time_window)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -82,6 +85,7 @@ impl LevelMetas {
|
||||
levels: new_level_meta_vec(),
|
||||
sst_layer,
|
||||
file_purger,
|
||||
compaction_time_window: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,6 +95,10 @@ impl LevelMetas {
|
||||
self.levels.len()
|
||||
}
|
||||
|
||||
pub fn compaction_time_window(&self) -> Option<i64> {
|
||||
self.compaction_time_window
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn level(&self, level: Level) -> &LevelMeta {
|
||||
&self.levels[level as usize]
|
||||
@@ -104,6 +112,7 @@ impl LevelMetas {
|
||||
&self,
|
||||
files_to_add: impl Iterator<Item = FileMeta>,
|
||||
files_to_remove: impl Iterator<Item = FileMeta>,
|
||||
compaction_time_window: Option<i64>,
|
||||
) -> LevelMetas {
|
||||
let mut merged = self.clone();
|
||||
for file in files_to_add {
|
||||
@@ -118,6 +127,11 @@
                removed_file.mark_deleted();
            }
        }
+       // we only update region's compaction time window iff region's window is not set and VersionEdit's
+       // compaction time window is present.
+       if let Some(window) = compaction_time_window {
+           merged.compaction_time_window.get_or_insert(window);
+       }
        merged
    }
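The comment above describes set-once semantics: a window already recorded on the region is never overwritten by a later edit. A minimal sketch of the Option::get_or_insert behaviour this relies on:

    let mut window: Option<i64> = Some(3600);
    window.get_or_insert(7200);
    assert_eq!(window, Some(3600)); // an existing window is kept

    let mut unset: Option<i64> = None;
    unset.get_or_insert(7200);
    assert_eq!(unset, Some(7200)); // only set when previously None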
@@ -726,6 +740,7 @@ mod tests {
|
||||
]
|
||||
.into_iter(),
|
||||
vec![].into_iter(),
|
||||
None,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
@@ -740,6 +755,7 @@ mod tests {
|
||||
]
|
||||
.into_iter(),
|
||||
vec![].into_iter(),
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
HashSet::from([file_ids[0], file_ids[1]]),
|
||||
@@ -758,6 +774,7 @@ mod tests {
|
||||
create_file_meta(file_ids[2], 0),
|
||||
]
|
||||
.into_iter(),
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
HashSet::from([file_ids[1]]),
|
||||
@@ -776,6 +793,7 @@ mod tests {
|
||||
create_file_meta(file_ids[3], 1),
|
||||
]
|
||||
.into_iter(),
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
HashSet::from([file_ids[1]]),
|
||||
|
||||
@@ -22,7 +22,7 @@ use object_store::ObjectStore;
use store_api::manifest::Manifest;

use crate::compaction::noop::NoopCompactionScheduler;
use crate::config::DEFAULT_REGION_WRITE_BUFFER_SIZE;
use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
use crate::engine::{self, RegionMap};
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::flush::{FlushScheduler, PickerConfig, SizeBasedStrategy};
@@ -40,12 +40,13 @@ fn log_store_dir(store_dir: &str) -> String {
pub async fn new_store_config(
    region_name: &str,
    store_dir: &str,
    engine_config: EngineConfig,
) -> StoreConfig<RaftEngineLogStore> {
    let mut builder = Fs::default();
    builder.root(store_dir);
    let object_store = ObjectStore::new(builder).unwrap().finish();

    new_store_config_with_object_store(region_name, store_dir, object_store)
    new_store_config_with_object_store(region_name, store_dir, object_store, engine_config)
        .await
        .0
}
@@ -54,6 +55,7 @@ pub async fn new_store_config(
pub async fn new_store_config_and_region_map(
    region_name: &str,
    store_dir: &str,
    engine_config: EngineConfig,
) -> (
    StoreConfig<RaftEngineLogStore>,
    Arc<RegionMap<RaftEngineLogStore>>,
@@ -62,7 +64,7 @@ pub async fn new_store_config_and_region_map(
    builder.root(store_dir);
    let object_store = ObjectStore::new(builder).unwrap().finish();

    new_store_config_with_object_store(region_name, store_dir, object_store).await
    new_store_config_with_object_store(region_name, store_dir, object_store, engine_config).await
}

/// Create a new StoreConfig with given object store.
@@ -70,6 +72,7 @@ pub async fn new_store_config_with_object_store(
    region_name: &str,
    store_dir: &str,
    object_store: ObjectStore,
    engine_config: EngineConfig,
) -> (
    StoreConfig<RaftEngineLogStore>,
    Arc<RegionMap<RaftEngineLogStore>>,
@@ -92,6 +95,7 @@ pub async fn new_store_config_with_object_store(
        ..Default::default()
    };
    let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());

    let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
    // We use an empty region map, so the picker's background worker is effectively disabled.
    let regions = Arc::new(RegionMap::new());
@@ -117,10 +121,9 @@ pub async fn new_store_config_with_object_store(
            flush_scheduler,
            flush_strategy: Arc::new(SizeBasedStrategy::default()),
            compaction_scheduler,
            engine_config: Default::default(),
            engine_config: Arc::new(engine_config),
            file_purger,
            ttl: None,
            compaction_time_window: None,
            write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize,
        },
        regions,

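With the EngineConfig parameter threaded through these helpers, callers now construct the config explicitly. A hedged sketch of what a test invocation might look like (the test name, store directory, and use of EngineConfig::default() are illustrative assumptions, not code from this diff):

#[tokio::test]
async fn new_store_config_accepts_engine_config() {
    // EngineConfig::default() is assumed here only for illustration; a real
    // test could tweak individual fields before passing the config in.
    let engine_config = EngineConfig::default();
    let store_config = new_store_config("test-region", "/tmp/store-dir", engine_config).await;
    drop(store_config);
}
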
@@ -89,7 +89,6 @@ impl RegionDescBuilder {
            row_key: self.key_builder.build().unwrap(),
            default_cf: self.default_cf_builder.build().unwrap(),
            extra_cfs: Vec::new(),
            compaction_time_window: None,
        }
    }

@@ -134,6 +134,7 @@ pub struct VersionEdit {
    pub flushed_sequence: Option<SequenceNumber>,
    pub manifest_version: ManifestVersion,
    pub max_memtable_id: Option<MemtableId>,
    pub compaction_time_window: Option<i64>,
}

pub type VersionControlRef = Arc<VersionControl>;
@@ -235,7 +236,7 @@ impl Version {
    ) {
        self.flushed_sequence = flushed_sequence.unwrap_or(self.flushed_sequence);
        self.manifest_version = manifest_version;
        let ssts = self.ssts.merge(files, std::iter::empty());
        let ssts = self.ssts.merge(files, std::iter::empty(), None);
        info!(
            "After applying checkpoint, region: {}, id: {}, flushed_sequence: {}, manifest_version: {}",
            self.metadata.name(),
@@ -264,15 +265,17 @@ impl Version {
        }

        let handles_to_add = edit.files_to_add.into_iter();
        let merged_ssts = self
            .ssts
            .merge(handles_to_add, edit.files_to_remove.into_iter());
        let merged_ssts = self.ssts.merge(
            handles_to_add,
            edit.files_to_remove.into_iter(),
            edit.compaction_time_window,
        );

        debug!(
            "After applying edit, region: {}, id: {}, SST files: {:?}",
            self.metadata.name(),
            self.metadata.id(),
            merged_ssts
            merged_ssts,
        );
        self.ssts = Arc::new(merged_ssts);
    }

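Since VersionEdit now carries the window, a compaction that infers one can pass it along when the edit is applied to the version. A rough sketch of populating the new field (output_files, input_files, and manifest_version are hypothetical placeholders for values the caller would already have):

let edit = VersionEdit {
    files_to_add: output_files,          // hypothetical: SSTs produced by the compaction
    files_to_remove: input_files,        // hypothetical: SSTs consumed by the compaction
    flushed_sequence: None,
    manifest_version,
    max_memtable_id: None,
    compaction_time_window: Some(3600),  // illustrative window, in seconds
};
// The edit would then be handed to the version as in the hunk above,
// and the window is folded into LevelMetas via merge().
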
@@ -144,8 +144,6 @@ pub struct RegionDescriptor {
    /// Extra column families defined by user.
    #[builder(default, setter(each(name = "push_extra_column_family")))]
    pub extra_cfs: Vec<ColumnFamilyDescriptor>,
    /// Time window for compaction
    pub compaction_time_window: Option<i64>,
}

impl RowKeyDescriptorBuilder {

@@ -92,7 +92,6 @@ pub struct CreateOptions {
    pub write_buffer_size: Option<usize>,
    /// Region SST files TTL
    pub ttl: Option<Duration>,
    pub compaction_time_window: Option<i64>,
}

/// Options to open a region.
@@ -104,7 +103,6 @@ pub struct OpenOptions {
    pub write_buffer_size: Option<usize>,
    /// Region SST files TTL
    pub ttl: Option<Duration>,
    pub compaction_time_window: Option<i64>,
}

/// Options to close a region.

@@ -170,7 +170,6 @@ mod tests {
            .name("test")
            .row_key(row_key)
            .default_cf(default_cf)
            .compaction_time_window(Some(1677652502))
            .build()
            .unwrap()
    }

@@ -77,14 +77,11 @@ pub struct TableOptions {
    pub ttl: Option<Duration>,
    /// Extra options that may not be applicable to all table engines.
    pub extra_options: HashMap<String, String>,
    /// Time window for compaction
    pub compaction_time_window: Option<i64>,
}

pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
pub const TTL_KEY: &str = "ttl";
pub const REGIONS_KEY: &str = "regions";
pub const COMPACTION_TIME_WINDOW_KEY: &str = "compaction_time_window";

impl TryFrom<&HashMap<String, String>> for TableOptions {
    type Error = error::Error;
@@ -115,24 +112,8 @@ impl TryFrom<&HashMap<String, String>> for TableOptions {
                .into();
            options.ttl = Some(ttl_value);
        }
        if let Some(compaction_time_window) = value.get(COMPACTION_TIME_WINDOW_KEY) {
            options.compaction_time_window = match compaction_time_window.parse::<i64>() {
                Ok(t) => Some(t),
                Err(_) => {
                    return ParseTableOptionSnafu {
                        key: COMPACTION_TIME_WINDOW_KEY,
                        value: compaction_time_window,
                    }
                    .fail()
                }
            };
        }
        options.extra_options = HashMap::from_iter(value.iter().filter_map(|(k, v)| {
            if k != WRITE_BUFFER_SIZE_KEY
                && k != REGIONS_KEY
                && k != TTL_KEY
                && k != COMPACTION_TIME_WINDOW_KEY
            {
            if k != WRITE_BUFFER_SIZE_KEY && k != REGIONS_KEY && k != TTL_KEY {
                Some((k.clone(), v.clone()))
            } else {
                None
@@ -155,12 +136,6 @@ impl From<&TableOptions> for HashMap<String, String> {
            let ttl_str = humantime::format_duration(ttl).to_string();
            res.insert(TTL_KEY.to_string(), ttl_str);
        }
        if let Some(compaction_time_window) = opts.compaction_time_window {
            res.insert(
                COMPACTION_TIME_WINDOW_KEY.to_string(),
                compaction_time_window.to_string(),
            );
        }
        res.extend(
            opts.extra_options
                .iter()
@@ -328,7 +303,6 @@ mod tests {
            write_buffer_size: None,
            ttl: Some(Duration::from_secs(1000)),
            extra_options: HashMap::new(),
            compaction_time_window: Some(1677652502),
        };
        let serialized = serde_json::to_string(&options).unwrap();
        let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
@@ -341,7 +315,6 @@ mod tests {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000)),
            extra_options: HashMap::new(),
            compaction_time_window: Some(1677652502),
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from(&serialized_map).unwrap();
@@ -351,7 +324,6 @@ mod tests {
            write_buffer_size: None,
            ttl: None,
            extra_options: HashMap::new(),
            compaction_time_window: None,
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from(&serialized_map).unwrap();
@@ -361,7 +333,6 @@ mod tests {
            write_buffer_size: Some(ReadableSize::mb(128)),
            ttl: Some(Duration::from_secs(1000)),
            extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
            compaction_time_window: Some(1677652502),
        };
        let serialized_map = HashMap::from(&options);
        let serialized = TableOptions::try_from(&serialized_map).unwrap();

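The table-level option travels through the options map as a decimal string under COMPACTION_TIME_WINDOW_KEY and is parsed back with i64::parse, with a parse failure surfacing as a ParseTableOption error. A minimal standalone sketch of that round trip, using the same "compaction_time_window" key:

use std::collections::HashMap;

fn main() {
    // Serialize: the window is written under its option key as a decimal string.
    let mut map = HashMap::new();
    map.insert(
        "compaction_time_window".to_string(),
        1677652502_i64.to_string(),
    );

    // Deserialize: a non-numeric value would fail to parse, which the real
    // TryFrom impl reports as an error instead of silently ignoring it.
    let window: Option<i64> = map
        .get("compaction_time_window")
        .map(|v| v.parse::<i64>())
        .transpose()
        .expect("value is numeric");
    assert_eq!(window, Some(1677652502));
}
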
@@ -21,7 +21,7 @@ mod sql;
#[macro_use]
mod region_failover;

grpc_tests!(File, S3, S3WithCache, Oss);
http_tests!(File, S3, S3WithCache, Oss);
region_failover_tests!(File, S3, S3WithCache, Oss);
grpc_tests!(File, S3, S3WithCache, Oss, Azblob);
http_tests!(File, S3, S3WithCache, Oss, Azblob);
region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
sql_tests!(File);

@@ -42,3 +42,64 @@ select TO_UNIXTIME('2023-03-01T06:35:02Z');
| 1677652502                                |
+-------------------------------------------+

select TO_UNIXTIME(2);

+-----------------------+
| to_unixtime(Int64(2)) |
+-----------------------+
| 2                     |
+-----------------------+

create table test_unixtime(a int, b timestamp time index);

Affected Rows: 0

DESC TABLE test_unixtime;

+-------+----------------------+------+---------+---------------+
| Field | Type                 | Null | Default | Semantic Type |
+-------+----------------------+------+---------+---------------+
| a     | Int32                | YES  |         | FIELD         |
| b     | TimestampMillisecond | NO   |         | TIME INDEX    |
+-------+----------------------+------+---------+---------------+

insert into test_unixtime values(27, 27);

Affected Rows: 1

select * from test_unixtime;

+----+-------------------------+
| a  | b                       |
+----+-------------------------+
| 27 | 1970-01-01T00:00:00.027 |
+----+-------------------------+

select a from test_unixtime;

+----+
| a  |
+----+
| 27 |
+----+

select b from test_unixtime;

+-------------------------+
| b                       |
+-------------------------+
| 1970-01-01T00:00:00.027 |
+-------------------------+

select TO_UNIXTIME(b) from test_unixtime;

+------------------------------+
| to_unixtime(test_unixtime.b) |
+------------------------------+
| 27                           |
+------------------------------+

DROP TABLE test_unixtime;

Affected Rows: 1

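For the string form of TO_UNIXTIME, the 1677652502 result is the RFC 3339 timestamp converted to Unix seconds. A small sketch reproducing that value outside the database (assumes the chrono crate as a dependency; the dependency is not implied by this diff):

use chrono::DateTime;

fn main() {
    // Parse the RFC 3339 string and take whole Unix seconds, matching the
    // expected output of `select TO_UNIXTIME('2023-03-01T06:35:02Z');`.
    let parsed = DateTime::parse_from_rfc3339("2023-03-01T06:35:02Z").unwrap();
    assert_eq!(parsed.timestamp(), 1677652502);
}
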
@@ -11,3 +11,21 @@ select "A";
select * where "a" = "A";

select TO_UNIXTIME('2023-03-01T06:35:02Z');

select TO_UNIXTIME(2);

create table test_unixtime(a int, b timestamp time index);

DESC TABLE test_unixtime;

insert into test_unixtime values(27, 27);

select * from test_unixtime;

select a from test_unixtime;

select b from test_unixtime;

select TO_UNIXTIME(b) from test_unixtime;

DROP TABLE test_unixtime;