diff --git a/Cargo.lock b/Cargo.lock index 1511654c7e..960f510cba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7429,6 +7429,7 @@ dependencies = [ "datafusion-expr", "datatypes", "dotenv", + "either", "futures", "humantime-serde", "index", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index bf80824dcf..81aa7f56ed 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -42,6 +42,7 @@ datafusion-common.workspace = true datafusion-expr.workspace = true datatypes.workspace = true dotenv.workspace = true +either.workspace = true futures.workspace = true humantime-serde.workspace = true index.workspace = true diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index ca50f6c6eb..d7437da43c 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -368,6 +368,7 @@ impl CompactionScheduler { picker_output: picker_output.clone(), start_time, waiters, + ttl, }; let result = remote_job_scheduler diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 27aecd42f8..b8ed064cea 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -20,6 +20,7 @@ use api::v1::region::compact_request; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{info, warn}; use common_time::TimeToLive; +use either::Either; use itertools::Itertools; use object_store::manager::ObjectStoreManagerRef; use serde::{Deserialize, Serialize}; @@ -116,7 +117,7 @@ pub async fn open_compaction_region( req: &OpenCompactionRegionRequest, mito_config: &MitoConfig, object_store_manager: ObjectStoreManagerRef, - schema_metadata_manager: SchemaMetadataManagerRef, + ttl_provider: Either, ) -> Result { let object_store = { let name = &req.region_options.storage; @@ -197,16 +198,22 @@ pub async fn open_compaction_region( } }; - let ttl = find_ttl( - req.region_id.table_id(), - current_version.options.ttl, - &schema_metadata_manager, - ) - .await - .unwrap_or_else(|e| { - warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id); - TimeToLive::default() - }); + let ttl = match ttl_provider { + // Use the specified ttl. + Either::Left(ttl) => ttl, + // Get the ttl from the schema metadata manager. + Either::Right(schema_metadata_manager) => find_ttl( + req.region_id.table_id(), + current_version.options.ttl, + &schema_metadata_manager, + ) + .await + .unwrap_or_else(|e| { + warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id); + TimeToLive::default() + }), + }; + Ok(CompactionRegion { region_id: req.region_id, region_options: req.region_options.clone(), diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index f28b6c7177..7236fa41df 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -19,6 +19,7 @@ use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; +use either::Either; use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, @@ -474,7 +475,7 @@ async fn test_open_compaction_region() { &req, &mito_config, object_store_manager.clone(), - schema_metadata_manager, + Either::Right(schema_metadata_manager), ) .await .unwrap(); diff --git a/src/mito2/src/schedule/remote_job_scheduler.rs b/src/mito2/src/schedule/remote_job_scheduler.rs index bfe31ef041..2de08b3ec0 100644 --- a/src/mito2/src/schedule/remote_job_scheduler.rs +++ b/src/mito2/src/schedule/remote_job_scheduler.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Instant; use common_telemetry::error; +use common_time::TimeToLive; use serde::{Deserialize, Serialize}; use snafu::{Location, ResultExt, Snafu}; use store_api::storage::RegionId; @@ -108,6 +109,7 @@ pub struct CompactionJob { pub compaction_region: CompactionRegion, pub picker_output: PickerOutput, pub start_time: Instant, + pub ttl: TimeToLive, /// Send the result of the compaction job to these waiters. pub waiters: Vec, }