feat: overwrites inferred compaction window by region options (#5396)

* feat: use time window in compaction options for compaction window

* test: add tests for overwriting options

* chore: typo

* chore: fix a grammar issue in log
This commit is contained in:
Yingwen
2025-01-18 22:53:56 +08:00
committed by GitHub
parent d73815ba84
commit e0384a7d46
2 changed files with 244 additions and 3 deletions

View File

@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
use api::v1::{ColumnSchema, Rows};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::AlterKind::SetRegionOptions;
use store_api::region_request::{
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
RegionAlterRequest, RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, SetRegionOption,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Notify;
@@ -466,3 +470,219 @@ async fn test_compaction_update_time_window() {
let vec = collect_stream_ts(stream).await;
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
/// Checks that setting `compaction.twcs.time_window` through an alter request
/// overwrites the compaction time window stored in the region version.
///
/// Steps:
/// 1. Create a TWCS region without an explicit time window, flush two SSTs and
///    compact; the version then reports a 3600s window and the options report none.
/// 2. Alter the region options to a `2h` window and compact again; both the
///    version and the options then report 7200s.
/// 3. Reopen the region with empty options; the version still reports 7200s
///    while the options report no time window.
#[tokio::test]
async fn test_change_region_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
// Register table info for the region's table before creating the region.
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
// TWCS with every run/file limit set to 1. No `compaction.twcs.time_window`
// is set here, so the options carry no explicit window (asserted below).
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
// Keep the region dir so the region can be reopened at the end of the test.
let region_dir = request.region_dir.clone();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Put data that belongs to window 7200.
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await; // window 7200
// Check compaction window: the version holds a 3600s window while the
// region options still have no explicit time window.
let region = engine.get_region(region_id).unwrap();
{
let version = region.version();
assert_eq!(
Some(Duration::from_secs(3600)),
version.compaction_time_window,
);
assert!(version.options.compaction.time_window().is_none());
}
// Change compaction window to 2 hours via an alter request.
// NOTE(review): the variant is spelled `Twsc` while the option key uses `twcs`;
// confirm the spelling against the `SetRegionOption` definition.
let request = RegionRequest::Alter(RegionAlterRequest {
schema_version: region.metadata().schema_version,
kind: SetRegionOptions {
options: vec![SetRegionOption::Twsc(
"compaction.twcs.time_window".to_string(),
"2h".to_string(),
)],
},
});
engine.handle_request(region_id, request).await.unwrap();
// Compact again. It should compact windows 3600 and 7200
// into window 7200.
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Check compaction window: both the version and the options now report 7200s.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
assert_eq!(
Some(Duration::from_secs(7200)),
version.options.compaction.time_window()
);
}
// Reopen region with default (empty) options.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Check compaction window: the version still reports 7200s after the reopen.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
// We open the region without options, so the time window should be None.
assert!(version.options.compaction.time_window().is_none());
}
}
/// Checks that a `compaction.twcs.time_window` option passed in the open request
/// overwrites the compaction time window stored in the region version.
///
/// Steps:
/// 1. Create a TWCS region without an explicit time window, flush two SSTs and
///    compact; the version then reports a 3600s window and the options report none.
/// 2. Reopen the region with a `2h` time window in the open options; both the
///    version and the options then report 7200s.
#[tokio::test]
async fn test_open_overwrite_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
// Register table info for the region's table before creating the region.
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
// TWCS with every run/file limit set to 1. No `compaction.twcs.time_window`
// is set here, so the options carry no explicit window (asserted below).
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
// Keep the region dir so the region can be reopened below.
let region_dir = request.region_dir.clone();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Check compaction window: the version holds a 3600s window while the
// region options still have no explicit time window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(3600)),
version.compaction_time_window,
);
assert!(version.options.compaction.time_window().is_none());
}
// Reopen region, this time passing an explicit 2-hour TWCS time window
// in the open options.
let options = HashMap::from([
("compaction.type".to_string(), "twcs".to_string()),
("compaction.twcs.time_window".to_string(), "2h".to_string()),
]);
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Check compaction window: the window from the open options replaces the
// previous 3600s value in the version and is visible in the options too.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
assert_eq!(
Some(Duration::from_secs(7200)),
version.options.compaction.time_window()
);
}
}

View File

@@ -26,6 +26,7 @@
use std::sync::{Arc, RwLock};
use std::time::Duration;
use common_telemetry::info;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
@@ -253,7 +254,10 @@ pub(crate) struct Version {
///
/// Used to check if it is a flush task during the truncating table.
pub(crate) truncated_entry_id: Option<EntryId>,
/// Inferred compaction time window.
/// Inferred compaction time window from flush.
///
/// If compaction options contain a time window, it will overwrite this value
/// when creating a new version from the [VersionBuilder].
pub(crate) compaction_time_window: Option<Duration>,
/// Options of the region.
pub(crate) options: RegionOptions,
@@ -389,7 +393,24 @@ impl VersionBuilder {
}
/// Builds a new [Version] from the builder.
/// It overwrites the window size by compaction option.
pub(crate) fn build(self) -> Version {
let compaction_time_window = self
.options
.compaction
.time_window()
.or(self.compaction_time_window);
if self.compaction_time_window.is_some()
&& compaction_time_window != self.compaction_time_window
{
info!(
"VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
self.compaction_time_window,
compaction_time_window,
self.metadata.region_id
);
}
Version {
metadata: self.metadata,
memtables: self.memtables,
@@ -397,7 +418,7 @@ impl VersionBuilder {
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncated_entry_id: self.truncated_entry_id,
compaction_time_window: self.compaction_time_window,
compaction_time_window,
options: self.options,
}
}