refactor: support to speficy ttl in open_compaction_region() (#6515)

refactor: add `ttl` in `open_compaction_region()` and `CompactionJob`

Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
zyy17
2025-07-11 17:00:05 +08:00
committed by GitHub
parent 93e3a04aa8
commit 104d607b3f
6 changed files with 25 additions and 12 deletions

1
Cargo.lock generated
View File

@@ -7429,6 +7429,7 @@ dependencies = [
"datafusion-expr",
"datatypes",
"dotenv",
"either",
"futures",
"humantime-serde",
"index",

View File

@@ -42,6 +42,7 @@ datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
dotenv.workspace = true
either.workspace = true
futures.workspace = true
humantime-serde.workspace = true
index.workspace = true

View File

@@ -368,6 +368,7 @@ impl CompactionScheduler {
picker_output: picker_output.clone(),
start_time,
waiters,
ttl,
};
let result = remote_job_scheduler

View File

@@ -20,6 +20,7 @@ use api::v1::region::compact_request;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{info, warn};
use common_time::TimeToLive;
use either::Either;
use itertools::Itertools;
use object_store::manager::ObjectStoreManagerRef;
use serde::{Deserialize, Serialize};
@@ -116,7 +117,7 @@ pub async fn open_compaction_region(
req: &OpenCompactionRegionRequest,
mito_config: &MitoConfig,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
) -> Result<CompactionRegion> {
let object_store = {
let name = &req.region_options.storage;
@@ -197,16 +198,22 @@ pub async fn open_compaction_region(
}
};
let ttl = find_ttl(
req.region_id.table_id(),
current_version.options.ttl,
&schema_metadata_manager,
)
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
TimeToLive::default()
});
let ttl = match ttl_provider {
// Use the specified ttl.
Either::Left(ttl) => ttl,
// Get the ttl from the schema metadata manager.
Either::Right(schema_metadata_manager) => find_ttl(
req.region_id.table_id(),
current_version.options.ttl,
&schema_metadata_manager,
)
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
TimeToLive::default()
}),
};
Ok(CompactionRegion {
region_id: req.region_id,
region_options: req.region_options.clone(),

View File

@@ -19,6 +19,7 @@ use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use either::Either;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
@@ -474,7 +475,7 @@ async fn test_open_compaction_region() {
&req,
&mito_config,
object_store_manager.clone(),
schema_metadata_manager,
Either::Right(schema_metadata_manager),
)
.await
.unwrap();

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Instant;
use common_telemetry::error;
use common_time::TimeToLive;
use serde::{Deserialize, Serialize};
use snafu::{Location, ResultExt, Snafu};
use store_api::storage::RegionId;
@@ -108,6 +109,7 @@ pub struct CompactionJob {
pub compaction_region: CompactionRegion,
pub picker_output: PickerOutput,
pub start_time: Instant,
pub ttl: TimeToLive,
/// Send the result of the compaction job to these waiters.
pub waiters: Vec<OutputTx>,
}