diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs
index 91816a4f9f..587e25298f 100644
--- a/src/mito2/src/engine/staging_test.rs
+++ b/src/mito2/src/engine/staging_test.rs
@@ -23,11 +23,13 @@ use api::v1::Rows;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_recordbatch::RecordBatches;
+use datatypes::value::Value;
 use object_store::Buffer;
 use object_store::layers::mock::{
     Entry, Error as MockError, ErrorKind, List, Lister, Metadata, MockLayerBuilder,
     Result as MockResult, Write, Writer,
 };
+use partition::expr::{PartitionExpr, col};
 use store_api::region_engine::{RegionEngine, SettableRegionRoleState};
 use store_api::region_request::{
     EnterStagingRequest, RegionAlterRequest, RegionFlushRequest, RegionRequest,
@@ -38,10 +40,16 @@ use store_api::storage::{RegionId, ScanRequest};
 use crate::config::MitoConfig;
 use crate::engine::listener::NotifyEnterStagingResultListener;
 use crate::error::Error;
-use crate::region::{RegionLeaderState, RegionRoleState};
+use crate::region::{RegionLeaderState, RegionRoleState, parse_partition_expr};
 use crate::request::WorkerRequest;
 use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
 
+fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
+    col(col_name)
+        .gt_eq(Value::Int64(start))
+        .and(col(col_name).lt(Value::Int64(end)))
+}
+
 #[tokio::test]
 async fn test_staging_state_integration() {
     test_staging_state_integration_with_format(false).await;
@@ -227,7 +235,9 @@ async fn test_staging_state_validation_patterns() {
     );
 }
 
-const PARTITION_EXPR: &str = "partition_expr";
+fn default_partition_expr() -> String {
+    range_expr("a", 0, 100).as_json_str().unwrap()
+}
 
 #[tokio::test]
 async fn test_staging_manifest_directory() {
@@ -237,6 +247,7 @@
 
 async fn test_staging_manifest_directory_with_format(flat_format: bool) {
     common_telemetry::init_default_ut_logging();
+    let partition_expr = default_partition_expr();
     let mut env = TestEnv::new().await;
     let engine = env
         .create_engine(MitoConfig {
@@ -274,14 +285,14 @@
         .handle_request(
             region_id,
             RegionRequest::EnterStaging(EnterStagingRequest {
-                partition_expr: PARTITION_EXPR.to_string(),
+                partition_expr: partition_expr.clone(),
             }),
         )
         .await
         .unwrap();
     let region = engine.get_region(region_id).unwrap();
     let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
-    assert_eq!(staging_partition_expr.unwrap(), PARTITION_EXPR);
+    assert_eq!(staging_partition_expr.unwrap(), partition_expr);
     {
         let manager = region.manifest_ctx.manifest_manager.read().await;
         assert_eq!(
@@ -292,7 +303,7 @@
                 .partition_expr
                 .as_deref()
                 .unwrap(),
-            PARTITION_EXPR
+            &partition_expr,
         );
         assert!(manager.manifest().metadata.partition_expr.is_none());
     }
@@ -302,7 +313,7 @@
         .handle_request(
             region_id,
             RegionRequest::EnterStaging(EnterStagingRequest {
-                partition_expr: PARTITION_EXPR.to_string(),
+                partition_expr: partition_expr.clone(),
             }),
         )
         .await
@@ -377,6 +388,7 @@ async fn test_staging_exit_success_with_manifests() {
 
 async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) {
     common_telemetry::init_default_ut_logging();
+    let partition_expr = default_partition_expr();
     let mut env = TestEnv::new().await;
     let engine = env
         .create_engine(MitoConfig {
@@ -407,7 +419,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
         .handle_request(
             region_id,
             RegionRequest::EnterStaging(EnterStagingRequest {
-                partition_expr: PARTITION_EXPR.to_string(),
+                partition_expr: partition_expr.clone(),
            }),
         )
         .await
@@ -465,6 +477,25 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
         "Staging manifest directory should contain 3 files before exit, got: {:?}",
         staging_files_before
     );
+    let region = engine.get_region(region_id).unwrap();
+    {
+        let manager = region.manifest_ctx.manifest_manager.read().await;
+        let staging_manifest = manager.staging_manifest().unwrap();
+        assert_eq!(staging_manifest.files.len(), 3);
+        assert_eq!(
+            staging_manifest.metadata.partition_expr.as_ref().unwrap(),
+            &partition_expr
+        );
+        let expr = parse_partition_expr(Some(partition_expr.as_str()))
+            .unwrap()
+            .unwrap();
+        for file in staging_manifest.files.values() {
+            let Some(file_expr) = file.partition_expr.as_ref() else {
+                continue;
+            };
+            assert_eq!(*file_expr, expr);
+        }
+    }
 
     // Count normal manifest files before exit
     let normal_manifest_dir = format!("{}/manifest", region_dir);
@@ -583,6 +614,7 @@ async fn test_write_stall_on_enter_staging() {
 
 async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
     let mut env = TestEnv::new().await;
+    let partition_expr = default_partition_expr();
     let listener = Arc::new(NotifyEnterStagingResultListener::default());
     let engine = env
         .create_engine_with(
@@ -622,7 +654,7 @@
         .handle_request(
             region_id,
             RegionRequest::EnterStaging(EnterStagingRequest {
-                partition_expr: PARTITION_EXPR.to_string(),
+                partition_expr: partition_expr.clone(),
             }),
         )
         .await
@@ -706,6 +738,7 @@ impl Write for MockWriter {
 }
 
 async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
+    let partition_expr = default_partition_expr();
     let engine = env
         .create_engine(MitoConfig {
             default_experimental_flat_format: flat_format,
@@ -723,7 +756,7 @@
         .handle_request(
             region_id,
             RegionRequest::EnterStaging(EnterStagingRequest {
-                partition_expr: PARTITION_EXPR.to_string(),
+                partition_expr: partition_expr.clone(),
             }),
         )
         .await
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 17bdc9aaeb..7ed0fdbb86 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -36,8 +36,8 @@ use crate::access_layer::{
 use crate::cache::CacheManagerRef;
 use crate::config::MitoConfig;
 use crate::error::{
-    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
-    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
+    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
+    RegionTruncatedSnafu, Result,
 };
 use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
 use crate::memtable::{
@@ -54,7 +54,7 @@ use crate::read::merge::MergeReaderBuilder;
 use crate::read::{FlatSource, Source};
 use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
 use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
-use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
+use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
 use crate::request::{
     BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
     SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
@@ -252,6 +252,10 @@ pub(crate) struct RegionFlushTask {
     pub(crate) flush_semaphore: Arc<Semaphore>,
     /// Whether the region is in staging mode.
     pub(crate) is_staging: bool,
+    /// Partition expression of the region.
+    ///
+    /// This is used to generate the file meta.
+    pub(crate) partition_expr: Option<String>,
 }
 
 impl RegionFlushTask {
@@ -441,14 +445,8 @@
         let mut file_metas = Vec::with_capacity(memtables.len());
         let mut flushed_bytes = 0;
         let mut series_count = 0;
-        // Convert partition expression once outside the map
-        let partition_expr = match &version.metadata.partition_expr {
-            None => None,
-            Some(json_expr) if json_expr.is_empty() => None,
-            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
-                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
-        };
         let mut flush_metrics = Metrics::new(WriteType::Flush);
+        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
         for mem in memtables {
             if mem.is_empty() {
                 // Skip empty memtables.
@@ -1333,6 +1331,7 @@ mod tests {
             index_options: IndexOptions::default(),
             flush_semaphore: Arc::new(Semaphore::new(2)),
             is_staging: false,
+            partition_expr: None,
         };
         task.push_sender(OptionOutputTx::from(output_tx));
         scheduler
@@ -1376,6 +1375,7 @@ mod tests {
             index_options: IndexOptions::default(),
             flush_semaphore: Arc::new(Semaphore::new(2)),
             is_staging: false,
+            partition_expr: None,
         })
         .collect();
         // Schedule first task.
@@ -1568,6 +1568,7 @@ mod tests {
             index_options: IndexOptions::default(),
             flush_semaphore: Arc::new(Semaphore::new(2)),
             is_staging: false,
+            partition_expr: None,
         })
         .collect();
         // Schedule first task.
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index e83a08ba74..5f9300dc33 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -27,7 +27,8 @@ use std::sync::{Arc, Mutex, RwLock};
 
 use common_telemetry::{error, info, warn};
 use crossbeam_utils::atomic::AtomicCell;
-use snafu::{OptionExt, ensure};
+use partition::expr::PartitionExpr;
+use snafu::{OptionExt, ResultExt, ensure};
 use store_api::ManifestVersion;
 use store_api::codec::PrimaryKeyEncoding;
 use store_api::logstore::provider::Provider;
@@ -43,8 +44,8 @@ pub use utils::*;
 
 use crate::access_layer::AccessLayerRef;
 use crate::error::{
-    FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
-    UpdateManifestSnafu,
+    FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
+    RegionTruncatedSnafu, Result, UpdateManifestSnafu,
 };
 use crate::manifest::action::{
     RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
@@ -725,6 +726,28 @@
 
         Ok(())
     }
+
+    /// Returns the partition expression string for this region.
+    ///
+    /// If the region is currently in staging state, this returns the partition expression held in
+    /// the staging partition field. Otherwise, it returns the partition expression from the primary
+    /// region metadata (current committed version).
+    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
+        let is_staging = self.is_staging();
+        if is_staging {
+            let staging_partition_expr = self.staging_partition_expr.lock().unwrap();
+            if staging_partition_expr.is_none() {
+                warn!(
+                    "Staging partition expr is none for region {} in staging state",
+                    self.region_id
+                );
+            }
+            staging_partition_expr.clone()
+        } else {
+            let version = self.version();
+            version.metadata.partition_expr.clone()
+        }
+    }
 }
 
 /// Context to update the region manifest.
@@ -1271,6 +1294,19 @@ impl ManifestStats {
     }
 }
 
+/// Parses the partition expression from a JSON string.
+pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
+    match partition_expr_str {
+        None => Ok(None),
+        Some("") => Ok(None),
+        Some(json_str) => {
+            let expr = partition::expr::PartitionExpr::from_json_str(json_str)
+                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
+            Ok(expr)
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::atomic::AtomicU64;
diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs
index 37ca5de2ba..40354ff9d1 100644
--- a/src/mito2/src/worker/handle_alter.rs
+++ b/src/mito2/src/worker/handle_alter.rs
@@ -114,13 +114,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         info!("Flush region: {} before alteration", region_id);
 
         // Try to submit a flush task.
-        let task = self.new_flush_task(
-            &region,
-            FlushReason::Alter,
-            None,
-            self.config.clone(),
-            region.is_staging(),
-        );
+        let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
         if let Err(e) =
             self.flush_scheduler
                 .schedule_flush(region.region_id, &region.version_control, task)
diff --git a/src/mito2/src/worker/handle_enter_staging.rs b/src/mito2/src/worker/handle_enter_staging.rs
index 6dee72525e..e45e8b85ab 100644
--- a/src/mito2/src/worker/handle_enter_staging.rs
+++ b/src/mito2/src/worker/handle_enter_staging.rs
@@ -70,7 +70,6 @@
             FlushReason::EnterStaging,
             None,
             self.config.clone(),
-            region.is_staging(),
         );
 
         if let Err(e) = self.flush_scheduler
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index a838fd91d8..c39f429323 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -80,13 +80,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
 
             if region.last_flush_millis() < min_last_flush_time {
                 // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
-                let task = self.new_flush_task(
-                    region,
-                    FlushReason::EngineFull,
-                    None,
-                    self.config.clone(),
-                    region.is_staging(),
-                );
+                let task =
+                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                 self.flush_scheduler.schedule_flush(
                     region.region_id,
                     &region.version_control,
@@ -124,13 +119,8 @@
                 // Stop flushing regions if memory usage is already below the flush limit
                 break;
             }
-            let task = self.new_flush_task(
-                region,
-                FlushReason::EngineFull,
-                None,
-                self.config.clone(),
-                region.is_staging(),
-            );
+            let task =
+                self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
             debug!("Scheduling flush task for region {}", region.region_id);
             // Schedule a flush task for the current region
             self.flush_scheduler
@@ -149,7 +139,6 @@
         reason: FlushReason,
         row_group_size: Option<usize>,
         engine_config: Arc<MitoConfig>,
-        is_staging: bool,
     ) -> RegionFlushTask {
         RegionFlushTask {
             region_id: region.region_id,
@@ -164,7 +153,8 @@
             manifest_ctx: region.manifest_ctx.clone(),
             index_options: region.version().options.index_options.clone(),
             flush_semaphore: self.flush_semaphore.clone(),
-            is_staging,
+            is_staging: region.is_staging(),
+            partition_expr: region.maybe_staging_partition_expr_str(),
         }
     }
 }
@@ -190,14 +180,8 @@
         } else {
             FlushReason::Manual
         };
-
-        let mut task = self.new_flush_task(
-            &region,
-            reason,
-            request.row_group_size,
-            self.config.clone(),
-            region.is_staging(),
-        );
+        let mut task =
+            self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
         task.push_sender(sender);
         if let Err(e) = self
             .flush_scheduler
@@ -227,7 +211,6 @@
             FlushReason::Periodically,
             None,
             self.config.clone(),
-            region.is_staging(),
         );
         self.flush_scheduler.schedule_flush(
             region.region_id,
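Note on the contract these changes share (not part of the patch): a `PartitionExpr` travels as a JSON string. It is set via `EnterStagingRequest::partition_expr`, captured into the new `RegionFlushTask::partition_expr` when the flush task is created, and recovered with `parse_partition_expr`, which maps both `None` and `""` to "no expression". Below is a minimal sketch of that round trip, mirroring the tests above; `round_trip` is a hypothetical helper, not code from this diff.

use datatypes::value::Value;
use partition::expr::col;

use crate::error::Result;
use crate::region::parse_partition_expr;

/// Hypothetical helper: demonstrates the expected serialize/parse round trip.
fn round_trip() -> Result<()> {
    // `a >= 0 AND a < 100`, the same shape the `range_expr` test helper builds.
    let expr = col("a")
        .gt_eq(Value::Int64(0))
        .and(col("a").lt(Value::Int64(100)));
    let json = expr.as_json_str().unwrap();

    // Absent or empty strings mean "no partition expression"...
    assert!(parse_partition_expr(None)?.is_none());
    assert!(parse_partition_expr(Some(""))?.is_none());
    // ...while a serialized expression parses back to the original.
    assert_eq!(parse_partition_expr(Some(json.as_str()))?, Some(expr));
    Ok(())
}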