refactor: propagate flush reasons through FlushRegions path (#8051)

* feat: propagate flush reasons through FlushRegions path

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* refactor: address flush reason review feedback

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* refactor: keep flush instruction helper name

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

---------

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
This commit is contained in:
QuakeWang
2026-05-01 10:28:55 +08:00
committed by GitHub
parent b8951a3514
commit 45e990b7f3
24 changed files with 378 additions and 127 deletions

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::region_request::RegionFlushReason;
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
@@ -338,14 +339,17 @@ pub struct FlushRegions {
/// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
#[serde(default)]
pub error_strategy: FlushErrorStrategy,
/// The source that triggered this flush.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<RegionFlushReason>,
}
impl Display for FlushRegions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
self.region_ids, self.strategy, self.error_strategy
"FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?}, reason={:?})",
self.region_ids, self.strategy, self.error_strategy, self.reason
)
}
}
@@ -357,6 +361,7 @@ impl FlushRegions {
region_ids: vec![region_id],
strategy: FlushStrategy::Sync,
error_strategy: FlushErrorStrategy::FailFast,
reason: None,
}
}
@@ -366,6 +371,7 @@ impl FlushRegions {
region_ids,
strategy: FlushStrategy::Async,
error_strategy: FlushErrorStrategy::TryAll,
reason: None,
}
}
@@ -375,9 +381,15 @@ impl FlushRegions {
region_ids,
strategy: FlushStrategy::Sync,
error_strategy,
reason: None,
}
}
/// Builder-style setter: records `reason` as the trigger of this flush
/// instruction and returns the updated value so calls can be chained
/// (e.g. `FlushRegions::sync_single(id).with_reason(...)`).
pub fn with_reason(mut self, reason: RegionFlushReason) -> Self {
    self.reason = Some(reason);
    self
}
/// Check if this is a single region flush.
pub fn is_single_region(&self) -> bool {
self.region_ids.len() == 1
@@ -1363,6 +1375,7 @@ mod tests {
assert!(!single_sync.is_hint());
assert!(single_sync.is_sync());
assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
assert_eq!(single_sync.reason, None);
assert!(single_sync.is_single_region());
assert_eq!(single_sync.single_region_id(), Some(region_id));
@@ -1374,6 +1387,7 @@ mod tests {
assert!(batch_async.is_hint());
assert!(!batch_async.is_sync());
assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
assert_eq!(batch_async.reason, None);
assert!(!batch_async.is_single_region());
assert_eq!(batch_async.single_region_id(), None);
@@ -1384,6 +1398,10 @@ mod tests {
assert!(!batch_sync.is_hint());
assert!(batch_sync.is_sync());
assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
assert_eq!(batch_sync.reason, None);
let with_reason = batch_sync.with_reason(RegionFlushReason::RemoteWalPrune);
assert_eq!(with_reason.reason, Some(RegionFlushReason::RemoteWalPrune));
}
#[test]
@@ -1401,6 +1419,7 @@ mod tests {
region_ids: vec![region_id],
strategy: FlushStrategy::Async,
error_strategy: FlushErrorStrategy::TryAll,
reason: None,
};
assert_eq!(flush_regions.region_ids, vec![region_id]);
assert_eq!(flush_regions.strategy, FlushStrategy::Async);
@@ -1450,6 +1469,7 @@ mod tests {
let instruction = Instruction::FlushRegions(flush_regions.clone());
let serialized = serde_json::to_string(&instruction).unwrap();
assert!(!serialized.contains("reason"));
let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
match deserialized {
@@ -1457,6 +1477,32 @@ mod tests {
assert_eq!(fr.region_ids, vec![region_id]);
assert_eq!(fr.strategy, FlushStrategy::Sync);
assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
assert_eq!(fr.reason, None);
}
_ => panic!("Expected FlushRegions instruction"),
}
let legacy = r#"{"FlushRegions":{"region_ids":[4398046511105],"strategy":"Sync","error_strategy":"FailFast"}}"#;
let deserialized: Instruction = serde_json::from_str(legacy).unwrap();
match deserialized {
Instruction::FlushRegions(fr) => {
assert_eq!(fr.region_ids, vec![region_id]);
assert_eq!(fr.strategy, FlushStrategy::Sync);
assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
assert_eq!(fr.reason, None);
}
_ => panic!("Expected FlushRegions instruction"),
}
let flush_regions = FlushRegions::async_batch(vec![region_id])
.with_reason(RegionFlushReason::RemoteWalPrune);
let instruction = Instruction::FlushRegions(flush_regions);
let serialized = serde_json::to_string(&instruction).unwrap();
assert!(serialized.contains(r#""reason":"RemoteWalPrune""#));
let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
match deserialized {
Instruction::FlushRegions(fr) => {
assert_eq!(fr.reason, Some(RegionFlushReason::RemoteWalPrune));
}
_ => panic!("Expected FlushRegions instruction"),
}
@@ -1479,6 +1525,7 @@ mod tests {
assert!(!fr.is_hint());
assert!(fr.is_sync());
assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
assert_eq!(fr.reason, None);
}
_ => panic!("Expected FlushRegions instruction"),
}

View File

@@ -113,9 +113,7 @@ impl DowngradeRegionsHandler {
region_server_moved
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await?;

View File

@@ -18,7 +18,7 @@ use common_meta::instruction::{
FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
};
use common_telemetry::{debug, warn};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::region_request::{RegionFlushReason, RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
@@ -39,14 +39,17 @@ impl InstructionHandler for FlushRegionsHandler {
let strategy = flush_regions.strategy;
let region_ids = flush_regions.region_ids;
let error_strategy = flush_regions.error_strategy;
let reason = flush_regions.reason;
let reply = if matches!(strategy, FlushStrategy::Async) {
// Asynchronous hint mode: fire-and-forget, no reply expected
ctx.handle_flush_hint(region_ids).await;
ctx.handle_flush_hint(region_ids, reason).await;
None
} else {
// Synchronous mode: return reply with results
let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
let reply = ctx
.handle_flush_sync(region_ids, error_strategy, reason)
.await;
Some(InstructionReply::FlushRegions(reply))
};
@@ -62,9 +65,14 @@ impl InstructionHandler for FlushRegionsHandler {
impl HandlerContext {
/// Performs the actual region flush operation.
async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
async fn perform_region_flush(
&self,
region_id: RegionId,
reason: Option<RegionFlushReason>,
) -> Result<()> {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
reason,
..Default::default()
});
self.region_server
.handle_request(region_id, request)
@@ -73,10 +81,14 @@ impl HandlerContext {
}
/// Handles asynchronous flush hints (fire-and-forget).
async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
async fn handle_flush_hint(
&self,
region_ids: Vec<RegionId>,
reason: Option<RegionFlushReason>,
) {
let start_time = Instant::now();
for region_id in &region_ids {
let result = self.perform_region_flush(*region_id).await;
let result = self.perform_region_flush(*region_id, reason).await;
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
@@ -102,11 +114,12 @@ impl HandlerContext {
&self,
region_ids: Vec<RegionId>,
error_strategy: FlushErrorStrategy,
reason: Option<RegionFlushReason>,
) -> FlushRegionReply {
let mut results = Vec::with_capacity(region_ids.len());
for region_id in region_ids {
let result = self.flush_single_region_sync(region_id).await;
let result = self.flush_single_region_sync(region_id, reason).await;
match &result {
Ok(_) => results.push((region_id, Ok(()))),
@@ -127,7 +140,11 @@ impl HandlerContext {
}
/// Flushes a single region synchronously with proper error handling.
async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
async fn flush_single_region_sync(
&self,
region_id: RegionId,
reason: Option<RegionFlushReason>,
) -> Result<()> {
// Check if region is leader and writable
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Err(RegionNotFoundSnafu { region_id }.build());
@@ -148,7 +165,8 @@ impl HandlerContext {
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
reason,
..Default::default()
}),
)
.await?;
@@ -184,19 +202,27 @@ mod tests {
use super::*;
use crate::tests::{MockRegionEngine, mock_region_server};
type FlushedRequests = Arc<RwLock<Vec<(RegionId, Option<RegionFlushReason>)>>>;
#[tokio::test]
async fn test_handle_flush_region_hint() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));
let mock_region_server = mock_region_server();
let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
for region_id in &region_ids {
let flushed_region_ids_ref = flushed_region_ids.clone();
let flushed_requests_ref = flushed_requests.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn =
Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
Some(Box::new(move |region_id, request| {
let RegionRequest::Flush(request) = request else {
panic!("Expected flush request");
};
flushed_requests_ref
.write()
.unwrap()
.push((region_id, request.reason));
Ok(0)
}))
});
@@ -206,36 +232,47 @@ mod tests {
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
// Async hint mode
let flush_instruction = FlushRegions::async_batch(region_ids.clone());
let flush_instruction = FlushRegions::async_batch(region_ids.clone())
.with_reason(RegionFlushReason::RemoteWalPrune);
let reply = FlushRegionsHandler
.handle(&handler_context, flush_instruction)
.await;
assert!(reply.is_none()); // Hint mode returns no reply
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
let expected = region_ids
.iter()
.map(|region_id| (*region_id, Some(RegionFlushReason::RemoteWalPrune)))
.collect::<Vec<_>>();
assert_eq!(*flushed_requests.read().unwrap(), expected);
// Non-existent regions
flushed_region_ids.write().unwrap().clear();
flushed_requests.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
let reply = FlushRegionsHandler
.handle(&handler_context, flush_instruction)
.await;
assert!(reply.is_none());
assert!(flushed_region_ids.read().unwrap().is_empty());
assert!(flushed_requests.read().unwrap().is_empty());
}
#[tokio::test]
async fn test_handle_flush_region_sync_single() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let flushed_region_ids_ref = flushed_region_ids.clone();
let flushed_requests_ref = flushed_requests.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, request| {
let RegionRequest::Flush(request) = request else {
panic!("Expected flush request");
};
flushed_requests_ref
.write()
.unwrap()
.push((region_id, request.reason));
Ok(0)
}))
});
@@ -243,7 +280,8 @@ mod tests {
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let flush_instruction = FlushRegions::sync_single(region_id);
let flush_instruction =
FlushRegions::sync_single(region_id).with_reason(RegionFlushReason::Repartition);
let reply = FlushRegionsHandler
.handle(&handler_context, flush_instruction)
.await;
@@ -252,7 +290,10 @@ mod tests {
assert_eq!(flush_reply.results.len(), 1);
assert_eq!(flush_reply.results[0].0, region_id);
assert!(flush_reply.results[0].1.is_ok());
assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
assert_eq!(
*flushed_requests.read().unwrap(),
vec![(region_id, Some(RegionFlushReason::Repartition))]
);
}
#[tokio::test]
@@ -333,7 +374,7 @@ mod tests {
let display = format!("{}", flush_regions);
assert_eq!(
display,
"FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast)"
"FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast, reason=None)"
);
}
}

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::region_request::RegionFlushReason;
use tokio::time::Instant;
use crate::error::{self, Result};
@@ -86,6 +87,7 @@ impl PreFlushRegion {
leader,
operation_timeout,
utils::ErrorStrategy::Ignore,
Some(RegionFlushReason::RegionMigration),
)
.await
}

View File

@@ -28,6 +28,7 @@ use common_telemetry::tracing_context::TracingContext;
use futures::future::{join_all, try_join_all};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_request::RegionFlushReason;
use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
@@ -418,6 +419,7 @@ impl EnterStagingRegion {
peer,
operation_timeout,
ErrorStrategy::Retry,
Some(RegionFlushReason::Repartition),
)
})
.collect::<Vec<_>>();

View File

@@ -27,6 +27,7 @@ use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::region_request::RegionFlushReason;
use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
@@ -85,6 +86,7 @@ impl SyncRegion {
&prepare_result.central_region_datanode,
operation_timeout,
ErrorStrategy::Retry,
Some(RegionFlushReason::Repartition),
)
.await
}

View File

@@ -20,6 +20,7 @@ use common_meta::peer::Peer;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use snafu::ResultExt;
use store_api::region_request::RegionFlushReason;
use store_api::storage::RegionId;
use tokio::time::Instant;
@@ -99,11 +100,12 @@ pub(crate) async fn flush_region(
datanode: &Peer,
timeout: Duration,
error_strategy: ErrorStrategy,
reason: Option<RegionFlushReason>,
) -> Result<()> {
let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
region_ids.to_vec(),
FlushErrorStrategy::TryAll,
));
let mut flush_regions =
FlushRegions::sync_batch(region_ids.to_vec(), FlushErrorStrategy::TryAll);
flush_regions.reason = reason;
let flush_instruction = Instruction::FlushRegions(flush_regions);
let tracing_ctx = TracingContext::from_current_span();
let msg = MailboxMessage::json_message(
@@ -308,6 +310,70 @@ pub mod mock {
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::{FlushStrategy, Instruction};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::procedure::test_util::{MailboxContext, new_flush_region_reply_for_region};

    /// Verifies that `flush_region` carries the caller-supplied flush reason
    /// inside the `FlushRegions` instruction sent over the mailbox.
    #[tokio::test]
    async fn test_flush_region_payload_includes_reason() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let mailbox_sequence = SequenceBuilder::new("test_flush_region_reason", kv_backend).build();
        let mut mailbox_ctx = MailboxContext::new(mailbox_sequence);
        let datanode = Peer::new(1, "127.0.0.1:4001");
        let region_id = RegionId::new(1024, 1);

        // Register a receiver so the datanode-bound message can be intercepted.
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(datanode.id), tx)
            .await;
        let mailbox = mailbox_ctx.mailbox().clone();
        let reply_mailbox = mailbox.clone();

        // Background task plays the datanode side: decode the instruction,
        // assert the reason survived the round trip, then reply success so the
        // `flush_region` call below can complete.
        let reply_task = tokio::spawn(async move {
            let response = rx.recv().await.unwrap().unwrap();
            let msg = response.mailbox_message.unwrap();
            let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
            let Instruction::FlushRegions(flush_regions) = instruction else {
                panic!("Expected FlushRegions instruction");
            };
            assert_eq!(flush_regions.region_ids, vec![region_id]);
            assert_eq!(flush_regions.strategy, FlushStrategy::Sync);
            assert_eq!(flush_regions.reason, Some(RegionFlushReason::Repartition));
            reply_mailbox
                .on_recv(
                    msg.id,
                    Ok(new_flush_region_reply_for_region(
                        msg.id, region_id, true, None,
                    )),
                )
                .await
                .unwrap();
        });

        flush_region(
            &mailbox,
            "127.0.0.1:3002",
            &[region_id],
            &datanode,
            Duration::from_secs(5),
            ErrorStrategy::Retry,
            Some(RegionFlushReason::Repartition),
        )
        .await
        .unwrap();
        reply_task.await.unwrap();
    }
}
#[cfg(test)]
pub mod test_data {
use std::sync::Arc;

View File

@@ -31,6 +31,7 @@ use common_wal::config::kafka::common::{
DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
};
use snafu::{OptionExt, ResultExt};
use store_api::region_request::RegionFlushReason;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
@@ -358,8 +359,10 @@ impl RegionFlushTrigger {
let flush_instructions = leader_to_region_ids
.into_iter()
.map(|(leader, region_ids)| {
let flush_instruction =
Instruction::FlushRegions(FlushRegions::async_batch(region_ids));
let flush_instruction = Instruction::FlushRegions(
FlushRegions::async_batch(region_ids)
.with_reason(RegionFlushReason::RemoteWalPrune),
);
(leader, flush_instruction)
});
@@ -502,11 +505,20 @@ fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_meta::region_registry::LeaderRegionManifestInfo;
use common_meta::instruction::FlushStrategy;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
use common_meta::sequence::SequenceBuilder;
use common_meta::stats::topic::TopicStatsRegistry;
use store_api::storage::RegionId;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::test_util::{MailboxContext, new_wal_prune_metadata};
#[test]
fn test_is_recent() {
@@ -722,4 +734,58 @@ mod tests {
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
assert!(result.is_none());
}
#[tokio::test]
async fn test_send_flush_instructions_payload_includes_remote_wal_prune_reason() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
let mailbox_sequence =
SequenceBuilder::new("test_remote_wal_prune_flush_reason", kv_backend).build();
let mut mailbox_ctx = MailboxContext::new(mailbox_sequence);
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
.await;
let (trigger, _ticker) = RegionFlushTrigger::new(
table_metadata_manager.clone(),
leader_region_registry.clone(),
topic_stats_registry,
mailbox_ctx.mailbox().clone(),
"127.0.0.1:3002".to_string(),
ReadableSize(1),
ReadableSize(1),
);
let topic = "test_topic".to_string();
new_wal_prune_metadata(
table_metadata_manager,
leader_region_registry,
1,
1,
&[0],
topic,
)
.await;
let region_id = RegionId::new(0, 0);
trigger.send_flush_instructions(&[region_id]).await.unwrap();
let response = rx.recv().await.unwrap().unwrap();
let msg = response.mailbox_message.unwrap();
let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
let Instruction::FlushRegions(flush_regions) = instruction else {
panic!("Expected FlushRegions instruction");
};
assert_eq!(flush_regions.region_ids, vec![region_id]);
assert_eq!(flush_regions.strategy, FlushStrategy::Async);
assert_eq!(
flush_regions.reason,
Some(RegionFlushReason::RemoteWalPrune)
);
}
}

View File

@@ -688,9 +688,7 @@ mod test {
metric_engine
.handle_request(
physical_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -231,9 +231,7 @@ mod tests {
metric_engine
.handle_request(
source_physical_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -64,9 +64,7 @@ impl TaskFunction<Error> for FlushMetadataRegionTask {
.mito
.handle_request(
metadata_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
{

View File

@@ -230,9 +230,7 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -503,9 +501,7 @@ async fn test_apply_staging_manifest_change_edit_different_columns_fails_with_fo
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -624,9 +620,7 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -733,9 +727,7 @@ async fn test_split_repartition_causes_duplicate_data() {
engine
.handle_request(
source_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -928,9 +920,7 @@ async fn test_merge_repartition_data_integrity() {
engine
.handle_request(
source_region_id_1,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -944,9 +934,7 @@ async fn test_merge_repartition_data_integrity() {
engine
.handle_request(
source_region_id_2,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -932,9 +932,7 @@ async fn test_list_ssts_with_format(
engine
.handle_request(
*region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -1124,9 +1122,7 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -55,9 +55,7 @@ pub(crate) async fn put_and_flush(
let result = engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -68,9 +66,7 @@ async fn flush(engine: &MitoEngine, region_id: RegionId) {
let result = engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -116,9 +112,7 @@ pub(crate) async fn delete_and_flush(
let result = engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -71,9 +71,7 @@ async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index:
engine
.handle_request(
source_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -184,9 +182,7 @@ async fn test_engine_copy_region_failure_with_format(flat_format: bool) {
engine
.handle_request(
source_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -89,9 +89,7 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) {
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -122,9 +120,7 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) {
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -194,9 +194,7 @@ async fn test_remap_manifests_success_with_format(flat_format: bool) {
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -402,9 +402,7 @@ async fn test_staging_write_partition_expr_version_with_format(flat_format: bool
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -610,9 +608,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -703,9 +699,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -720,9 +714,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();

View File

@@ -217,9 +217,7 @@ async fn test_engine_truncate_after_flush_with_format(flat_format: bool) {
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
@@ -395,9 +393,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) {
engine_cloned
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
});

View File

@@ -26,6 +26,7 @@ use datatypes::arrow::datatypes::SchemaRef;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::region_request::RegionFlushReason;
use store_api::storage::{RegionId, SequenceNumber};
use strum::IntoStaticStr;
use tokio::sync::{Semaphore, mpsc, watch};
@@ -221,6 +222,12 @@ pub enum FlushReason {
EnterStaging,
/// Flush when region is closing.
Closing,
/// Flush triggered before region migration.
RegionMigration,
/// Flush triggered by repartition procedure.
Repartition,
/// Flush triggered by remote WAL pruning.
RemoteWalPrune,
}
impl FlushReason {
@@ -230,6 +237,16 @@ impl FlushReason {
}
}
impl From<RegionFlushReason> for FlushReason {
    /// Maps an externally supplied flush reason onto the corresponding
    /// engine-internal `FlushReason` variant (a one-to-one mapping).
    fn from(reason: RegionFlushReason) -> Self {
        match reason {
            RegionFlushReason::RemoteWalPrune => Self::RemoteWalPrune,
            RegionFlushReason::Repartition => Self::Repartition,
            RegionFlushReason::RegionMigration => Self::RegionMigration,
        }
    }
}
/// Task to flush a region.
pub(crate) struct RegionFlushTask {
/// Region to flush.

View File

@@ -1247,7 +1247,10 @@ pub async fn flush_region(engine: &MitoEngine, region_id: RegionId, row_group_si
let result = engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest { row_group_size }),
RegionRequest::Flush(RegionFlushRequest {
row_group_size,
reason: None,
}),
)
.await
.unwrap();

View File

@@ -50,9 +50,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
info!("Region {} has pending data, waiting for flush", region_id);
self.handle_flush_request(
region_id,
RegionFlushRequest {
row_group_size: None,
},
RegionFlushRequest::default(),
Some(FlushReason::Closing),
sender,
);

View File

@@ -19,7 +19,7 @@ use std::sync::atomic::Ordering;
use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::region_request::{RegionFlushReason, RegionFlushRequest};
use store_api::storage::RegionId;
use crate::config::{IndexBuildMode, MitoConfig};
@@ -30,6 +30,22 @@ use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, O
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;
/// Picks the `FlushReason` to record for a flush.
///
/// Priority order: an explicit reason supplied by the worker wins, then a
/// reason carried on the request itself; with neither present, fall back to
/// `Downgrading` when the region is downgrading, otherwise `Manual`.
fn resolve_flush_reason(
    explicit_reason: Option<FlushReason>,
    request_reason: Option<RegionFlushReason>,
    is_downgrading: bool,
) -> FlushReason {
    match (explicit_reason, request_reason) {
        (Some(reason), _) => reason,
        (None, Some(reason)) => reason.into(),
        (None, None) if is_downgrading => FlushReason::Downgrading,
        (None, None) => FlushReason::Manual,
    }
}
impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job failed.
pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
@@ -176,13 +192,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// when handling flush request instead of in `schedule_flush` or `flush_finished`.
self.update_topic_latest_entry_id(&region);
let reason = reason.unwrap_or_else(|| {
if region.is_downgrading() {
FlushReason::Downgrading
} else {
FlushReason::Manual
}
});
let reason = resolve_flush_reason(reason, request.reason, region.is_downgrading());
let mut task =
self.new_flush_task(&region, reason, request.row_group_size, self.config.clone());
task.push_sender(sender);
@@ -349,3 +359,43 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_resolve_flush_reason_prefers_explicit_reason() {
        // An explicit reason always wins, even when a request reason and the
        // downgrading flag are both set.
        let resolved = resolve_flush_reason(
            Some(FlushReason::Closing),
            Some(RegionFlushReason::RemoteWalPrune),
            true,
        );
        assert_eq!(resolved, FlushReason::Closing);
    }

    #[test]
    fn test_resolve_flush_reason_uses_request_reason() {
        // Without an explicit reason, the request-carried reason is mapped
        // regardless of the downgrading flag.
        let cases = [
            (
                RegionFlushReason::RegionMigration,
                true,
                FlushReason::RegionMigration,
            ),
            (
                RegionFlushReason::Repartition,
                false,
                FlushReason::Repartition,
            ),
            (
                RegionFlushReason::RemoteWalPrune,
                false,
                FlushReason::RemoteWalPrune,
            ),
        ];
        for (request_reason, is_downgrading, expected) in cases {
            assert_eq!(
                resolve_flush_reason(None, Some(request_reason), is_downgrading),
                expected
            );
        }
    }

    #[test]
    fn test_resolve_flush_reason_fallback_unchanged() {
        // With no reason at all, the legacy fallback still applies.
        assert_eq!(
            resolve_flush_reason(None, None, true),
            FlushReason::Downgrading
        );
        assert_eq!(resolve_flush_reason(None, None, false), FlushReason::Manual);
    }
}

View File

@@ -346,9 +346,7 @@ fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest
let region_id = flush.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)])
}
@@ -1404,9 +1402,20 @@ impl Display for UnsetRegionOption {
}
}
/// The source that triggered a region flush, propagated from the caller
/// (e.g. a metasrv instruction or procedure) down with the flush request.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionFlushReason {
    /// Flush triggered before region migration.
    RegionMigration,
    /// Flush triggered by repartition procedure.
    Repartition,
    /// Flush triggered by remote WAL pruning.
    RemoteWalPrune,
}
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size for the flushed files; `None` lets the engine
    /// choose its default.
    pub row_group_size: Option<usize>,
    /// Optional source that triggered this flush; `None` means unspecified
    /// (engines appear to fall back to a manual/downgrading reason — confirm
    /// against the engine's flush-reason resolution).
    pub reason: Option<RegionFlushReason>,
}
#[derive(Debug)]