fix(mito2): pass partition expr explicitly to flush task for staging mode (#7461)

* fix(mito2): pass partition expr explicitly to flush task for staging mode

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: rename

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-12-24 12:18:06 +08:00
committed by GitHub
parent dec0d522f8
commit 2d9967b981
6 changed files with 101 additions and 55 deletions

View File

@@ -23,11 +23,13 @@ use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use object_store::Buffer;
use object_store::layers::mock::{
Entry, Error as MockError, ErrorKind, List, Lister, Metadata, MockLayerBuilder,
Result as MockResult, Write, Writer,
};
use partition::expr::{PartitionExpr, col};
use store_api::region_engine::{RegionEngine, SettableRegionRoleState};
use store_api::region_request::{
EnterStagingRequest, RegionAlterRequest, RegionFlushRequest, RegionRequest,
@@ -38,10 +40,16 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::NotifyEnterStagingResultListener;
use crate::error::Error;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region::{RegionLeaderState, RegionRoleState, parse_partition_expr};
use crate::request::WorkerRequest;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
col(col_name)
.gt_eq(Value::Int64(start))
.and(col(col_name).lt(Value::Int64(end)))
}
#[tokio::test]
async fn test_staging_state_integration() {
test_staging_state_integration_with_format(false).await;
@@ -227,7 +235,9 @@ async fn test_staging_state_validation_patterns() {
);
}
const PARTITION_EXPR: &str = "partition_expr";
fn default_partition_expr() -> String {
range_expr("a", 0, 100).as_json_str().unwrap()
}
#[tokio::test]
async fn test_staging_manifest_directory() {
@@ -237,6 +247,7 @@ async fn test_staging_manifest_directory() {
async fn test_staging_manifest_directory_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let partition_expr = default_partition_expr();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
@@ -274,14 +285,14 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
partition_expr: partition_expr.clone(),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
assert_eq!(staging_partition_expr.unwrap(), PARTITION_EXPR);
assert_eq!(staging_partition_expr.unwrap(), partition_expr);
{
let manager = region.manifest_ctx.manifest_manager.read().await;
assert_eq!(
@@ -292,7 +303,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
.partition_expr
.as_deref()
.unwrap(),
PARTITION_EXPR
&partition_expr,
);
assert!(manager.manifest().metadata.partition_expr.is_none());
}
@@ -302,7 +313,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
partition_expr: partition_expr.clone(),
}),
)
.await
@@ -377,6 +388,7 @@ async fn test_staging_exit_success_with_manifests() {
async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let partition_expr = default_partition_expr();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
@@ -407,7 +419,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
partition_expr: partition_expr.clone(),
}),
)
.await
@@ -465,6 +477,25 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
"Staging manifest directory should contain 3 files before exit, got: {:?}",
staging_files_before
);
let region = engine.get_region(region_id).unwrap();
{
let manager = region.manifest_ctx.manifest_manager.read().await;
let staging_manifest = manager.staging_manifest().unwrap();
assert_eq!(staging_manifest.files.len(), 3);
assert_eq!(
staging_manifest.metadata.partition_expr.as_ref().unwrap(),
&partition_expr
);
let expr = parse_partition_expr(Some(partition_expr.as_str()))
.unwrap()
.unwrap();
for file in staging_manifest.files.values() {
let Some(file_expr) = file.partition_expr.as_ref() else {
continue;
};
assert_eq!(*file_expr, expr);
}
}
// Count normal manifest files before exit
let normal_manifest_dir = format!("{}/manifest", region_dir);
@@ -583,6 +614,7 @@ async fn test_write_stall_on_enter_staging() {
async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let partition_expr = default_partition_expr();
let listener = Arc::new(NotifyEnterStagingResultListener::default());
let engine = env
.create_engine_with(
@@ -622,7 +654,7 @@ async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
partition_expr: partition_expr.clone(),
}),
)
.await
@@ -706,6 +738,7 @@ impl Write for MockWriter {
}
async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
let partition_expr = default_partition_expr();
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
@@ -723,7 +756,7 @@ async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: PARTITION_EXPR.to_string(),
partition_expr: partition_expr.clone(),
}),
)
.await
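
For reference, a minimal sketch of how the test helpers above produce the expression string, using only the partition::expr calls that appear in this diff (partition_expr_json is a hypothetical helper name; range_expr and default_partition_expr are the helpers added above):

    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    // Builds `name >= start AND name < end` and serializes it to the JSON
    // string form carried by EnterStagingRequest and stored in the manifest.
    fn partition_expr_json(name: &str, start: i64, end: i64) -> String {
        let expr: PartitionExpr = col(name)
            .gt_eq(Value::Int64(start))
            .and(col(name).lt(Value::Int64(end)));
        expr.as_json_str().unwrap()
    }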

View File

@@ -36,8 +36,8 @@ use crate::access_layer::{
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
RegionDroppedSnafu, RegionTruncatedSnafu, Result,
Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::{
@@ -54,7 +54,7 @@ use crate::read::merge::MergeReaderBuilder;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
@@ -252,6 +252,10 @@ pub(crate) struct RegionFlushTask {
pub(crate) flush_semaphore: Arc<Semaphore>,
/// Whether the region is in staging mode.
pub(crate) is_staging: bool,
/// Partition expression of the region.
///
/// This is used to generate the file meta.
pub(crate) partition_expr: Option<String>,
}
impl RegionFlushTask {
@@ -441,14 +445,8 @@ impl RegionFlushTask {
let mut file_metas = Vec::with_capacity(memtables.len());
let mut flushed_bytes = 0;
let mut series_count = 0;
// Convert partition expression once outside the map
let partition_expr = match &version.metadata.partition_expr {
None => None,
Some(json_expr) if json_expr.is_empty() => None,
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
.with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
};
let mut flush_metrics = Metrics::new(WriteType::Flush);
let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
for mem in memtables {
if mem.is_empty() {
// Skip empty memtables.
@@ -1333,6 +1331,7 @@ mod tests {
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
@@ -1376,6 +1375,7 @@ mod tests {
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
})
.collect();
// Schedule first task.
@@ -1568,6 +1568,7 @@ mod tests {
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
partition_expr: None,
})
.collect();
// Schedule first task.
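
The behavioral core of this file's change, condensed: the flush task no longer re-derives the expression from the committed version metadata (which is stale while the region is staging) but parses the string it was handed, once, before iterating the memtables. A sketch of the changed spot inside RegionFlushTask's flush routine, assuming the crate-internal names shown above:

    // self.partition_expr is the string the worker captured when the task
    // was created; parse_partition_expr maps None/"" to Ok(None) and fails
    // with InvalidPartitionExprSnafu on malformed JSON.
    let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
    for mem in memtables {
        // ...each flushed file's meta records this expression...
    }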

View File

@@ -27,7 +27,8 @@ use std::sync::{Arc, Mutex, RwLock};
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use snafu::{OptionExt, ensure};
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
@@ -43,8 +44,8 @@ pub use utils::*;
use crate::access_layer::AccessLayerRef;
use crate::error::{
FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
UpdateManifestSnafu,
FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
RegionTruncatedSnafu, Result, UpdateManifestSnafu,
};
use crate::manifest::action::{
RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
@@ -725,6 +726,28 @@ impl MitoRegion {
Ok(())
}
/// Returns the partition expression string for this region.
///
/// If the region is currently in staging state, this returns the partition expression held in
/// the `staging_partition_expr` field. Otherwise, it returns the partition expression from the
/// primary region metadata (current committed version).
pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
let is_staging = self.is_staging();
if is_staging {
let staging_partition_expr = self.staging_partition_expr.lock().unwrap();
if staging_partition_expr.is_none() {
warn!(
"Staging partition expr is none for region {} in staging state",
self.region_id
);
}
staging_partition_expr.clone()
} else {
let version = self.version();
version.metadata.partition_expr.clone()
}
}
}
/// Context to update the region manifest.
@@ -1271,6 +1294,19 @@ impl ManifestStats {
}
}
/// Parses the partition expression from an optional JSON string; `None` or an empty string
/// yields `Ok(None)`.
pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
match partition_expr_str {
None => Ok(None),
Some("") => Ok(None),
Some(json_str) => {
let expr = partition::expr::PartitionExpr::from_json_str(json_str)
.with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
Ok(expr)
}
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;
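
A short usage sketch of the new parse_partition_expr helper (the json variable is hypothetical, holding a serialized expression such as the one default_partition_expr produces):

    // None and the empty string both mean "no partition expression".
    assert!(parse_partition_expr(None).unwrap().is_none());
    assert!(parse_partition_expr(Some("")).unwrap().is_none());
    // A well-formed JSON expression deserializes into a PartitionExpr;
    // malformed input surfaces as InvalidPartitionExprSnafu.
    let expr: Option<PartitionExpr> = parse_partition_expr(Some(json.as_str())).unwrap();
    assert!(expr.is_some());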

View File

@@ -114,13 +114,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
info!("Flush region: {} before alteration", region_id);
// Try to submit a flush task.
let task = self.new_flush_task(
&region,
FlushReason::Alter,
None,
self.config.clone(),
region.is_staging(),
);
let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)

View File

@@ -70,7 +70,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
FlushReason::EnterStaging,
None,
self.config.clone(),
region.is_staging(),
);
if let Err(e) =
self.flush_scheduler

View File

@@ -80,13 +80,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
None,
self.config.clone(),
region.is_staging(),
);
let task =
self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
self.flush_scheduler.schedule_flush(
region.region_id,
&region.version_control,
@@ -124,13 +119,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Stop flushing regions if memory usage is already below the flush limit
break;
}
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
None,
self.config.clone(),
region.is_staging(),
);
let task =
self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
debug!("Scheduling flush task for region {}", region.region_id);
// Schedule a flush task for the current region
self.flush_scheduler
@@ -149,7 +139,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
reason: FlushReason,
row_group_size: Option<usize>,
engine_config: Arc<MitoConfig>,
is_staging: bool,
) -> RegionFlushTask {
RegionFlushTask {
region_id: region.region_id,
@@ -164,7 +153,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
manifest_ctx: region.manifest_ctx.clone(),
index_options: region.version().options.index_options.clone(),
flush_semaphore: self.flush_semaphore.clone(),
is_staging,
is_staging: region.is_staging(),
partition_expr: region.maybe_staging_partition_expr_str(),
}
}
}
@@ -190,14 +180,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
} else {
FlushReason::Manual
};
let mut task = self.new_flush_task(
&region,
reason,
request.row_group_size,
self.config.clone(),
region.is_staging(),
);
let mut task =
self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
task.push_sender(sender);
if let Err(e) =
self.flush_scheduler
@@ -227,7 +211,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
FlushReason::Periodically,
None,
self.config.clone(),
region.is_staging(),
);
self.flush_scheduler.schedule_flush(
region.region_id,