diff --git a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml index dd230f5691..737a0adab9 100644 --- a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml +++ b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml @@ -1,3 +1,8 @@ +logging: + level: "info" + format: "json" + filters: + - log_store=debug meta: configData: |- [runtime] diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index be9a5a3b0d..08f33af609 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -251,7 +251,6 @@ macro_rules! define_from_tonic_status { .get(key) .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok()) } - let code = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_CODE) .and_then(|s| { if let Ok(code) = s.parse::() { diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index b74b8c25b4..a6e2cb05c0 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -108,10 +108,6 @@ pub struct OpenRegion { pub region_wal_options: HashMap, #[serde(default)] pub skip_wal_replay: bool, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub replay_entry_id: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub metadata_replay_entry_id: Option, } impl OpenRegion { @@ -128,22 +124,8 @@ impl OpenRegion { region_options, region_wal_options, skip_wal_replay, - replay_entry_id: None, - metadata_replay_entry_id: None, } } - - /// Sets the replay entry id. - pub fn with_replay_entry_id(mut self, replay_entry_id: Option) -> Self { - self.replay_entry_id = replay_entry_id; - self - } - - /// Sets the metadata replay entry id. - pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option) -> Self { - self.metadata_replay_entry_id = metadata_replay_entry_id; - self - } } /// The instruction of downgrading leader region. @@ -169,7 +151,7 @@ impl Display for DowngradeRegion { } /// Upgrades a follower region to leader region. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct UpgradeRegion { /// The [RegionId]. pub region_id: RegionId, @@ -186,6 +168,24 @@ pub struct UpgradeRegion { /// The hint for replaying memtable. #[serde(default)] pub location_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub replay_entry_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub metadata_replay_entry_id: Option, +} + +impl UpgradeRegion { + /// Sets the replay entry id. + pub fn with_replay_entry_id(mut self, replay_entry_id: Option) -> Self { + self.replay_entry_id = replay_entry_id; + self + } + + /// Sets the metadata replay entry id. 
+ pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option) -> Self { + self.metadata_replay_entry_id = metadata_replay_entry_id; + self + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -370,8 +370,6 @@ mod tests { region_options, region_wal_options: HashMap::new(), skip_wal_replay: false, - replay_entry_id: None, - metadata_replay_entry_id: None, }; assert_eq!(expected, deserialized); } diff --git a/src/common/meta/src/key/topic_region.rs b/src/common/meta/src/key/topic_region.rs index 776d0fceca..7e36adf623 100644 --- a/src/common/meta/src/key/topic_region.rs +++ b/src/common/meta/src/key/topic_region.rs @@ -46,7 +46,7 @@ pub struct TopicRegionValue { pub checkpoint: Option, } -#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct ReplayCheckpoint { #[serde(default)] pub entry_id: u64, diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 9095545bcd..f47fe78225 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -238,10 +238,7 @@ mod tests { // Upgrade region let instruction = Instruction::UpgradeRegion(UpgradeRegion { region_id, - last_entry_id: None, - metadata_last_entry_id: None, - replay_timeout: None, - location_id: None, + ..Default::default() }); assert!( heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction))) diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 871bc5b9c3..ed5ce72a4d 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -16,7 +16,7 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply}; use common_meta::wal_options_allocator::prepare_wal_options; use futures_util::future::BoxFuture; use store_api::path_utils::table_dir; -use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest, ReplayCheckpoint}; +use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest}; use crate::heartbeat::handler::HandlerContext; @@ -29,31 +29,18 @@ impl HandlerContext { mut region_options, region_wal_options, skip_wal_replay, - replay_entry_id, - metadata_replay_entry_id, }: OpenRegion, ) -> BoxFuture<'static, Option> { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); prepare_wal_options(&mut region_options, region_id, ®ion_wal_options); - let checkpoint = match (replay_entry_id, metadata_replay_entry_id) { - (Some(replay_entry_id), Some(metadata_replay_entry_id)) => Some(ReplayCheckpoint { - entry_id: replay_entry_id, - metadata_entry_id: Some(metadata_replay_entry_id), - }), - (Some(replay_entry_id), None) => Some(ReplayCheckpoint { - entry_id: replay_entry_id, - metadata_entry_id: None, - }), - _ => None, - }; let request = RegionRequest::Open(RegionOpenRequest { engine: region_ident.engine, table_dir: table_dir(®ion_storage_path, region_id.table_id()), path_type: PathType::Bare, options: region_options, skip_wal_replay, - checkpoint, + checkpoint: None, }); let result = self.region_server.handle_request(region_id, request).await; let success = result.is_ok(); diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index b9a324c197..ded0ff2e9d 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ 
b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -15,7 +15,7 @@ use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply}; use common_telemetry::{info, warn}; use futures_util::future::BoxFuture; -use store_api::region_request::{RegionCatchupRequest, RegionRequest}; +use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint}; use crate::heartbeat::handler::HandlerContext; use crate::heartbeat::task_tracker::WaitResult; @@ -29,6 +29,8 @@ impl HandlerContext { metadata_last_entry_id, replay_timeout, location_id, + replay_entry_id, + metadata_replay_entry_id, }: UpgradeRegion, ) -> BoxFuture<'static, Option> { Box::pin(async move { @@ -50,6 +52,14 @@ impl HandlerContext { let region_server_moved = self.region_server.clone(); + let checkpoint = match (replay_entry_id, metadata_replay_entry_id) { + (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint { + entry_id, + metadata_entry_id, + }), + _ => None, + }; + // The catchup task is almost zero cost if the inside region is writable. // Therefore, it always registers a new catchup task. let register_result = self @@ -66,6 +76,7 @@ impl HandlerContext { entry_id: last_entry_id, metadata_entry_id: metadata_last_entry_id, location_id, + checkpoint, }), ) .await?; @@ -148,10 +159,8 @@ mod tests { .clone() .handle_upgrade_region_instruction(UpgradeRegion { region_id, - last_entry_id: None, - metadata_last_entry_id: None, replay_timeout, - location_id: None, + ..Default::default() }) .await; assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); @@ -187,10 +196,8 @@ mod tests { .clone() .handle_upgrade_region_instruction(UpgradeRegion { region_id, - last_entry_id: None, - metadata_last_entry_id: None, replay_timeout, - location_id: None, + ..Default::default() }) .await; assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); @@ -227,10 +234,8 @@ mod tests { .clone() .handle_upgrade_region_instruction(UpgradeRegion { region_id, - last_entry_id: None, - metadata_last_entry_id: None, replay_timeout, - location_id: None, + ..Default::default() }) .await; assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); @@ -271,9 +276,7 @@ mod tests { .handle_upgrade_region_instruction(UpgradeRegion { region_id, replay_timeout, - last_entry_id: None, - metadata_last_entry_id: None, - location_id: None, + ..Default::default() }) .await; assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); @@ -289,10 +292,8 @@ mod tests { let reply = handler_context .handle_upgrade_region_instruction(UpgradeRegion { region_id, - last_entry_id: None, - metadata_last_entry_id: None, replay_timeout: Some(Duration::from_millis(500)), - location_id: None, + ..Default::default() }) .await; assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); @@ -332,10 +333,7 @@ mod tests { .clone() .handle_upgrade_region_instruction(UpgradeRegion { region_id, - last_entry_id: None, - metadata_last_entry_id: None, - replay_timeout: None, - location_id: None, + ..Default::default() }) .await; assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); @@ -351,10 +349,8 @@ mod tests { .clone() .handle_upgrade_region_instruction(UpgradeRegion { region_id, - last_entry_id: None, - metadata_last_entry_id: None, replay_timeout: Some(Duration::from_millis(200)), - location_id: None, + ..Default::default() }) .await; assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 
55f7a147fd..8914761f33 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -302,6 +302,10 @@ impl LogStore for KafkaLogStore { }, )) .await?; + debug!( + "Appended batch to Kafka, region_grouped_max_offset: {:?}", + region_grouped_max_offset + ); Ok(AppendBatchResponse { last_entry_ids: region_grouped_max_offset.into_iter().collect(), @@ -362,6 +366,17 @@ impl LogStore for KafkaLogStore { .context(GetOffsetSnafu { topic: &provider.topic, })?; + let latest_offset = (end_offset as u64).saturating_sub(1); + self.topic_stats + .entry(provider.clone()) + .and_modify(|stat| { + stat.latest_offset = stat.latest_offset.max(latest_offset); + }) + .or_insert_with(|| TopicStat { + latest_offset, + record_size: 0, + record_num: 0, + }); let region_indexes = if let (Some(index), Some(collector)) = (index, self.client_manager.global_index_collector()) @@ -550,6 +565,7 @@ mod tests { use futures::TryStreamExt; use rand::prelude::SliceRandom; use rand::Rng; + use rskafka::client::partition::OffsetAt; use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry}; use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; @@ -713,8 +729,16 @@ mod tests { .for_each(|entry| entry.set_entry_id(0)); assert_eq!(expected_entries, actual_entries); } - let high_wathermark = logstore.latest_entry_id(&provider).unwrap(); - assert_eq!(high_wathermark, 99); + let latest_entry_id = logstore.latest_entry_id(&provider).unwrap(); + let client = logstore + .client_manager + .get_or_insert(provider.as_kafka_provider().unwrap()) + .await + .unwrap(); + assert_eq!(latest_entry_id, 99); + // The latest offset is the offset of the last record plus one. + let latest = client.client().get_offset(OffsetAt::Latest).await.unwrap(); + assert_eq!(latest, 100); } #[tokio::test] diff --git a/src/log-store/src/kafka/periodic_offset_fetcher.rs b/src/log-store/src/kafka/periodic_offset_fetcher.rs index 92eb119aa3..8e441c3d34 100644 --- a/src/log-store/src/kafka/periodic_offset_fetcher.rs +++ b/src/log-store/src/kafka/periodic_offset_fetcher.rs @@ -112,11 +112,11 @@ mod tests { let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset; assert_eq!(current_latest_offset, 0); - let record = vec![record()]; + let record = vec![record(), record()]; let region = RegionId::new(1, 1); producer.produce(region, record.clone()).await.unwrap(); tokio::time::sleep(Duration::from_millis(150)).await; let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset; - assert_eq!(current_latest_offset, record.len() as u64); + assert_eq!(current_latest_offset, record.len() as u64 - 1); } } diff --git a/src/log-store/src/kafka/worker/fetch_latest_offset.rs b/src/log-store/src/kafka/worker/fetch_latest_offset.rs index 6fda16d3d1..45e8c6f287 100644 --- a/src/log-store/src/kafka/worker/fetch_latest_offset.rs +++ b/src/log-store/src/kafka/worker/fetch_latest_offset.rs @@ -33,30 +33,34 @@ impl BackgroundProducerWorker { .context(error::GetOffsetSnafu { topic: &self.provider.topic, }) { - Ok(offset) => match self.topic_stats.entry(self.provider.clone()) { - dashmap::Entry::Occupied(mut occupied_entry) => { - let offset = offset as u64; - let stat = occupied_entry.get_mut(); - if stat.latest_offset < offset { - stat.latest_offset = offset; + Ok(highwatermark) => { + // The highwatermark is the offset of the last record plus one. 
+ let offset = (highwatermark as u64).saturating_sub(1); + + match self.topic_stats.entry(self.provider.clone()) { + dashmap::Entry::Occupied(mut occupied_entry) => { + let stat = occupied_entry.get_mut(); + if stat.latest_offset < offset { + stat.latest_offset = offset; + debug!( + "Updated latest offset for topic {} to {}", + self.provider.topic, offset + ); + } + } + dashmap::Entry::Vacant(vacant_entry) => { + vacant_entry.insert(TopicStat { + latest_offset: offset, + record_size: 0, + record_num: 0, + }); debug!( - "Updated latest offset for topic {} to {}", + "Inserted latest offset for topic {} to {}", self.provider.topic, offset ); } } - dashmap::Entry::Vacant(vacant_entry) => { - vacant_entry.insert(TopicStat { - latest_offset: offset as u64, - record_size: 0, - record_num: 0, - }); - debug!( - "Inserted latest offset for topic {} to {}", - self.provider.topic, offset - ); - } - }, + } Err(err) => { error!(err; "Failed to get latest offset for topic {}", self.provider.topic); } diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 28be968a9f..30576fbd11 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -82,7 +82,7 @@ lazy_static! { .unwrap(); /// The triggered region flush total counter. pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec = - register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap(); + register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name"]).unwrap(); /// The triggered region checkpoint total counter. pub static ref METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL: IntCounterVec = diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index ce0a0af7a6..7228108cb2 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -19,7 +19,6 @@ use api::v1::meta::MailboxMessage; use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::datanode_table::RegionInfo; -use common_meta::wal_options_allocator::extract_topic_from_wal_options; use common_meta::RegionIdent; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::info; @@ -68,7 +67,6 @@ impl OpenCandidateRegion { async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result { let pc = &ctx.persistent_ctx; let table_id = pc.region_id.table_id(); - let region_id = pc.region_id; let region_number = pc.region_id.region_number(); let candidate_id = pc.to_peer.id; let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; @@ -80,31 +78,18 @@ impl OpenCandidateRegion { engine, } = datanode_table_value.region_info.clone(); - let checkpoint = - if let Some(topic) = extract_topic_from_wal_options(region_id, ®ion_wal_options) { - ctx.fetch_replay_checkpoint(&topic).await.ok().flatten() - } else { - None - }; - - let open_instruction = Instruction::OpenRegion( - OpenRegion::new( - RegionIdent { - datanode_id: candidate_id, - table_id, - region_number, - engine, - }, - ®ion_storage_path, - region_options, - region_wal_options, - true, - ) - .with_replay_entry_id(checkpoint.map(|checkpoint| checkpoint.entry_id)) - .with_metadata_replay_entry_id( - 
checkpoint.and_then(|checkpoint| checkpoint.metadata_entry_id), - ), - ); + let open_instruction = Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + datanode_id: candidate_id, + table_id, + region_number, + engine, + }, + ®ion_storage_path, + region_options, + region_wal_options, + true, + )); Ok(open_instruction) } @@ -241,8 +226,6 @@ mod tests { region_options: Default::default(), region_wal_options: Default::default(), skip_wal_replay: true, - replay_entry_id: None, - metadata_replay_entry_id: None, }) } diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index b0f9fc902e..b9a2c506ee 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -19,6 +19,7 @@ use api::v1::meta::MailboxMessage; use common_meta::ddl::utils::parse_region_wal_options; use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply}; use common_meta::lock_key::RemoteWalLock; +use common_meta::wal_options_allocator::extract_topic_from_wal_options; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{error, warn}; use common_wal::options::WalOptions; @@ -111,23 +112,40 @@ impl UpgradeCandidateRegion { } /// Builds upgrade region instruction. - fn build_upgrade_region_instruction( + async fn build_upgrade_region_instruction( &self, - ctx: &Context, + ctx: &mut Context, replay_timeout: Duration, - ) -> Instruction { + ) -> Result { let pc = &ctx.persistent_ctx; let region_id = pc.region_id; let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id; let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id; + // Try our best to retrieve replay checkpoint. + let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok(); + let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| { + extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options) + }) { + ctx.fetch_replay_checkpoint(&topic).await.ok().flatten() + } else { + None + }; - Instruction::UpgradeRegion(UpgradeRegion { - region_id, - last_entry_id, - metadata_last_entry_id, - replay_timeout: Some(replay_timeout), - location_id: Some(ctx.persistent_ctx.from_peer.id), - }) + let upgrade_instruction = Instruction::UpgradeRegion( + UpgradeRegion { + region_id, + last_entry_id, + metadata_last_entry_id, + replay_timeout: Some(replay_timeout), + location_id: Some(ctx.persistent_ctx.from_peer.id), + replay_entry_id: None, + metadata_replay_entry_id: None, + } + .with_replay_entry_id(checkpoint.map(|c| c.entry_id)) + .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)), + ); + + Ok(upgrade_instruction) } /// Tries to upgrade a candidate region. @@ -144,16 +162,19 @@ impl UpgradeCandidateRegion { /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible). /// - [ExceededDeadline](error::Error::ExceededDeadline) /// - Invalid JSON (impossible). 
- async fn upgrade_region(&self, ctx: &Context) -> Result<()> { - let pc = &ctx.persistent_ctx; - let region_id = pc.region_id; - let candidate = &pc.to_peer; + async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> { let operation_timeout = ctx.next_operation_timeout() .context(error::ExceededDeadlineSnafu { operation: "Upgrade region", })?; - let upgrade_instruction = self.build_upgrade_region_instruction(ctx, operation_timeout); + let upgrade_instruction = self + .build_upgrade_region_instruction(ctx, operation_timeout) + .await?; + + let pc = &ctx.persistent_ctx; + let region_id = pc.region_id; + let candidate = &pc.to_peer; let msg = MailboxMessage::json_message( &format!("Upgrade candidate region: {}", region_id), @@ -283,8 +304,12 @@ impl UpgradeCandidateRegion { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; + use common_meta::key::table_route::TableRouteValue; + use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; use store_api::storage::RegionId; use super::*; @@ -308,14 +333,33 @@ mod tests { } } + async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { + let table_info = + new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(ctx.persistent_ctx.region_id), + leader_peer: Some(ctx.persistent_ctx.from_peer.clone()), + follower_peers: vec![ctx.persistent_ctx.to_peer.clone()], + ..Default::default() + }]; + ctx.table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::physical(region_routes), + wal_options, + ) + .await + .unwrap(); + } + #[tokio::test] async fn test_datanode_is_unreachable() { let state = UpgradeCandidateRegion::default(); let persistent_context = new_persistent_context(); let env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); - - let err = state.upgrade_region(&ctx).await.unwrap_err(); + let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; + let err = state.upgrade_region(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::PusherNotFound { .. }); assert!(!err.is_retryable()); @@ -328,7 +372,8 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -339,7 +384,7 @@ mod tests { drop(rx); - let err = state.upgrade_region(&ctx).await.unwrap_err(); + let err = state.upgrade_region(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::PushMessage { .. }); assert!(!err.is_retryable()); @@ -351,10 +396,11 @@ mod tests { let persistent_context = new_persistent_context(); let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; ctx.volatile_ctx.metrics.operations_elapsed = ctx.persistent_ctx.timeout + Duration::from_secs(1); - let err = state.upgrade_region(&ctx).await.unwrap_err(); + let err = state.upgrade_region(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::ExceededDeadline { .. 
}); assert!(!err.is_retryable()); @@ -367,7 +413,8 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -379,7 +426,7 @@ mod tests { send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id))); - let err = state.upgrade_region(&ctx).await.unwrap_err(); + let err = state.upgrade_region(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::UnexpectedInstructionReply { .. }); assert!(!err.is_retryable()); } @@ -391,7 +438,8 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -411,7 +459,7 @@ mod tests { )) }); - let err = state.upgrade_region(&ctx).await.unwrap_err(); + let err = state.upgrade_region(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::RetryLater { .. }); assert!(err.is_retryable()); @@ -425,7 +473,8 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -439,7 +488,7 @@ mod tests { Ok(new_upgrade_region_reply(id, true, false, None)) }); - let err = state.upgrade_region(&ctx).await.unwrap_err(); + let err = state.upgrade_region(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::Unexpected { .. }); assert!(!err.is_retryable()); @@ -457,7 +506,8 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -471,7 +521,7 @@ mod tests { Ok(new_upgrade_region_reply(id, false, true, None)) }); - let err = state.upgrade_region(&ctx).await.unwrap_err(); + let err = state.upgrade_region(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::RetryLater { .. 
}); assert!(err.is_retryable()); @@ -491,7 +541,7 @@ mod tests { Ok(new_upgrade_region_reply(id, false, true, None)) }); - state.upgrade_region(&ctx).await.unwrap(); + state.upgrade_region(&mut ctx).await.unwrap(); } #[tokio::test] @@ -503,6 +553,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -563,6 +614,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -621,6 +673,7 @@ mod tests { let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + prepare_table_metadata(&ctx, HashMap::default()).await; let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); ctx.volatile_ctx.metrics.operations_elapsed = diff --git a/src/meta-srv/src/region/flush_trigger.rs b/src/meta-srv/src/region/flush_trigger.rs index 02c78a559a..2fe918dfd9 100644 --- a/src/meta-srv/src/region/flush_trigger.rs +++ b/src/meta-srv/src/region/flush_trigger.rs @@ -29,7 +29,6 @@ use common_time::util::current_time_millis; use common_wal::config::kafka::common::{ DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, }; -use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; @@ -223,23 +222,25 @@ impl RegionFlushTrigger { &self, topic: &str, region_ids: &[RegionId], + topic_regions: &HashMap, leader_regions: &HashMap, ) -> Result<()> { let regions = region_ids .iter() .flat_map(|region_id| match leader_regions.get(region_id) { - Some(leader_region) => { - let entry_id = leader_region.manifest.replay_entry_id(); - let metadata_entry_id = leader_region.manifest.metadata_replay_entry_id(); - - Some(( + Some(leader_region) => should_persist_region_checkpoint( + leader_region, + topic_regions + .get(region_id) + .cloned() + .and_then(|value| value.checkpoint), + ) + .map(|checkpoint| { + ( TopicRegionKey::new(*region_id, topic), - Some(TopicRegionValue::new(Some(ReplayCheckpoint::new( - entry_id, - metadata_entry_id, - )))), - )) - } + Some(TopicRegionValue::new(Some(checkpoint))), + ) + }), None => None, }) .collect::>(); @@ -272,14 +273,14 @@ impl RegionFlushTrigger { latest_entry_id: u64, avg_record_size: usize, ) -> Result<()> { - let region_ids = self + let topic_regions = self .table_metadata_manager .topic_region_manager() .regions(topic) .await .context(error::TableMetadataManagerSnafu)?; - if region_ids.is_empty() { + if topic_regions.is_empty() { debug!("No regions found for topic: {}", topic); return Ok(()); } @@ -287,7 +288,7 @@ impl RegionFlushTrigger { // Filters regions need to persist checkpoints. 
let regions_to_persist = filter_regions_by_replay_size( topic, - region_ids + topic_regions .iter() .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())), avg_record_size as u64, @@ -296,33 +297,25 @@ impl RegionFlushTrigger { ); let region_manifests = self .leader_region_registry - .batch_get(region_ids.keys().cloned()); + .batch_get(topic_regions.keys().cloned()); if let Err(err) = self - .persist_region_checkpoints(topic, ®ions_to_persist, ®ion_manifests) + .persist_region_checkpoints( + topic, + ®ions_to_persist, + &topic_regions, + ®ion_manifests, + ) .await { error!(err; "Failed to persist region checkpoints for topic: {}", topic); } - let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = region_manifests + let regions = region_manifests .into_iter() - .partition_map(|(region_id, region)| { - if !region.manifest.is_inactive() { - itertools::Either::Left((region_id, region.manifest.prunable_entry_id())) - } else { - itertools::Either::Right((region_id, region.manifest.prunable_entry_id())) - } - }); - - let min_entry_id = inactive_regions - .iter() - .min_by_key(|(_, entry_id)| *entry_id); - let min_entry_id = active_regions - .iter() - .min_by_key(|(_, entry_id)| *entry_id) - .or(min_entry_id); - + .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id())) + .collect::>(); + let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id); if let Some((_, min_entry_id)) = min_entry_id { let replay_size = (latest_entry_id.saturating_sub(*min_entry_id)) .saturating_mul(avg_record_size as u64); @@ -332,45 +325,28 @@ impl RegionFlushTrigger { } // Selects regions to flush from the set of active regions. - let mut regions_to_flush = filter_regions_by_replay_size( + let regions_to_flush = filter_regions_by_replay_size( topic, - active_regions.into_iter(), + regions.into_iter(), avg_record_size as u64, latest_entry_id, self.flush_trigger_size, ); - let active_regions_num = regions_to_flush.len(); - // Selects regions to flush from the set of inactive regions. - // For inactive regions, we use a lower flush trigger size (half of the normal size) - // to encourage more aggressive flushing to update the region's topic latest entry id. - let inactive_regions_to_flush = filter_regions_by_replay_size( - topic, - inactive_regions.into_iter(), - avg_record_size as u64, - latest_entry_id, - self.flush_trigger_size / 2, - ); - let inactive_regions_num = inactive_regions_to_flush.len(); - regions_to_flush.extend(inactive_regions_to_flush); - // Sends flush instructions to datanodes. if !regions_to_flush.is_empty() { self.send_flush_instructions(®ions_to_flush).await?; debug!( - "Sent {} flush instructions to datanodes for topic: '{}' ({} inactive regions)", + "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}", regions_to_flush.len(), topic, - inactive_regions_num, + regions_to_flush, ); } metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL - .with_label_values(&[topic, "active"]) - .inc_by(active_regions_num as u64); - metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL - .with_label_values(&[topic, "inactive"]) - .inc_by(inactive_regions_num as u64); + .with_label_values(&[topic]) + .inc_by(regions_to_flush.len() as u64); Ok(()) } @@ -409,6 +385,26 @@ impl RegionFlushTrigger { } } +/// Determines whether a region checkpoint should be persisted based on current and persisted state. 
+fn should_persist_region_checkpoint( + current: &LeaderRegion, + persisted: Option, +) -> Option { + let new_checkpoint = ReplayCheckpoint::new( + current.manifest.replay_entry_id(), + current.manifest.metadata_replay_entry_id(), + ); + + let Some(persisted) = persisted else { + return Some(new_checkpoint); + }; + + if new_checkpoint > persisted { + return Some(new_checkpoint); + } + None +} + /// Filter regions based on the estimated replay size. /// /// Returns the regions if its estimated replay size exceeds the given threshold. @@ -497,6 +493,7 @@ fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool { #[cfg(test)] mod tests { use common_base::readable_size::ReadableSize; + use common_meta::region_registry::LeaderRegionManifestInfo; use store_api::storage::RegionId; use super::*; @@ -627,4 +624,92 @@ mod tests { // Only regions 1,1 and 1,2 should be flushed assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]); } + + fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion { + LeaderRegion { + datanode_id: 1, + manifest: LeaderRegionManifestInfo::Metric { + data_manifest_version: 1, + data_flushed_entry_id: replay_entry_id, + data_topic_latest_entry_id: 0, + metadata_manifest_version: 1, + metadata_flushed_entry_id: metadata_replay_entry_id, + metadata_topic_latest_entry_id: 0, + }, + } + } + + fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion { + LeaderRegion { + datanode_id: 1, + manifest: LeaderRegionManifestInfo::Mito { + manifest_version: 1, + flushed_entry_id: replay_entry_id, + topic_latest_entry_id: 0, + }, + } + } + + #[test] + fn test_should_persist_region_checkpoint() { + // `persisted` is none + let current = metric_leader_region(100, 10); + let result = should_persist_region_checkpoint(¤t, None).unwrap(); + assert_eq!(result, ReplayCheckpoint::new(100, Some(10))); + + // `persisted.entry_id` is less than `current.manifest.replay_entry_id()` + let current = mito_leader_region(100); + let result = + should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(90, None))) + .unwrap(); + assert_eq!(result, ReplayCheckpoint::new(100, None)); + + let current = metric_leader_region(100, 10); + let result = + should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(90, Some(10)))) + .unwrap(); + assert_eq!(result, ReplayCheckpoint::new(100, Some(10))); + + // `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()` + let current = metric_leader_region(100, 10); + let result = + should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(100, Some(8)))) + .unwrap(); + assert_eq!(result, ReplayCheckpoint::new(100, Some(10))); + + // `persisted.metadata_entry_id` is none + let current = metric_leader_region(100, 10); + let result = + should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(100, None))) + .unwrap(); + assert_eq!(result, ReplayCheckpoint::new(100, Some(10))); + + // `current.manifest.metadata_replay_entry_id()` is none + let current = mito_leader_region(100); + let result = + should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(100, Some(8)))) + .is_none(); + assert!(result); + + // `persisted.entry_id` is equal to `current.manifest.replay_entry_id()` + let current = metric_leader_region(100, 10); + let result = + should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(100, Some(10)))); + assert!(result.is_none()); + let current = mito_leader_region(100); + let result = + should_persist_region_checkpoint(¤t, 
Some(ReplayCheckpoint::new(100, None))); + assert!(result.is_none()); + + // `persisted.entry_id` is less than `current.manifest.replay_entry_id()` + // `persisted.metadata_entry_id` is greater than `current.manifest.metadata_replay_entry_id()` + let current = metric_leader_region(80, 11); + let result = + should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(90, Some(10)))); + assert!(result.is_none()); + let current = mito_leader_region(80); + let result = + should_persist_region_checkpoint(¤t, Some(ReplayCheckpoint::new(90, Some(10)))); + assert!(result.is_none()); + } } diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs index b7e02c4edc..6ae4560228 100644 --- a/src/metric-engine/src/engine/catchup.rs +++ b/src/metric-engine/src/engine/catchup.rs @@ -15,7 +15,9 @@ use common_telemetry::debug; use snafu::{OptionExt, ResultExt}; use store_api::region_engine::RegionEngine; -use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest}; +use store_api::region_request::{ + AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint, +}; use store_api::storage::RegionId; use crate::engine::MetricEngineInner; @@ -59,6 +61,10 @@ impl MetricEngineInner { entry_id: req.metadata_entry_id, metadata_entry_id: None, location_id: req.location_id, + checkpoint: req.checkpoint.map(|c| ReplayCheckpoint { + entry_id: c.metadata_entry_id.unwrap_or_default(), + metadata_entry_id: None, + }), }), ) .await @@ -73,6 +79,10 @@ impl MetricEngineInner { entry_id: req.entry_id, metadata_entry_id: None, location_id: req.location_id, + checkpoint: req.checkpoint.map(|c| ReplayCheckpoint { + entry_id: c.entry_id, + metadata_entry_id: None, + }), }), ) .await diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index d6f324c358..56a926227d 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -1077,6 +1077,7 @@ mod tests { let staging_manifest_ctx = { let manager = RegionManifestManager::new( version_control.current().version.metadata.clone(), + 0, RegionManifestOptions { manifest_dir: "".to_string(), object_store: env.access_layer.object_store().clone(), diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index d346fecebd..968ce9ac3a 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -127,8 +127,7 @@ async fn test_catchup_with_last_entry_id(factory: Option) { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: last_entry_id, - metadata_entry_id: None, - location_id: None, + ..Default::default() }), ) .await; @@ -160,8 +159,7 @@ async fn test_catchup_with_last_entry_id(factory: Option) { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: last_entry_id, - metadata_entry_id: None, - location_id: None, + ..Default::default() }), ) .await; @@ -251,8 +249,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option) { region_id, RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, - entry_id: None, - metadata_entry_id: None, - location_id: None, + ..Default::default() }), ) .await; @@ -372,9 +366,7 @@ async fn test_catchup_without_last_entry_id(factory: Option) { region_id, RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, - entry_id: None, - metadata_entry_id: None, - location_id: None, + ..Default::default() }), ) .await; @@ -465,9 +457,7 @@ async fn test_catchup_with_manifest_update(factory: Option) { region_id, 
RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, - entry_id: None, - metadata_entry_id: None, - location_id: None, + ..Default::default() }), ) .await; @@ -503,9 +493,7 @@ async fn test_catchup_with_manifest_update(factory: Option) { region_id, RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, - entry_id: None, - metadata_entry_id: None, - location_id: None, + ..Default::default() }), ) .await; @@ -652,9 +640,7 @@ async fn test_local_catchup(factory: Option) { region_id, RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, - entry_id: None, - metadata_entry_id: None, - location_id: None, + ..Default::default() }), ) .await; @@ -715,9 +701,7 @@ async fn test_catchup_not_exist() { non_exist_region_id, RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, - entry_id: None, - metadata_entry_id: None, - location_id: None, + ..Default::default() }), ) .await diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index a52c18db95..2126f3a06c 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -27,8 +27,8 @@ use crate::error::{ self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result, }; use crate::manifest::action::{ - RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, - RegionMetaActionList, + RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder, + RegionMetaAction, RegionMetaActionList, }; use crate::manifest::checkpointer::Checkpointer; use crate::manifest::storage::{ @@ -150,6 +150,7 @@ impl RegionManifestManager { /// Constructs a region's manifest and persist it. pub async fn new( metadata: RegionMetadataRef, + flushed_entry_id: u64, options: RegionManifestOptions, total_manifest_size: Arc, manifest_version: Arc, @@ -163,8 +164,8 @@ impl RegionManifestManager { ); info!( - "Creating region manifest in {} with metadata {:?}", - options.manifest_dir, metadata + "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}", + options.manifest_dir, metadata, flushed_entry_id ); let version = MIN_VERSION; @@ -184,9 +185,21 @@ impl RegionManifestManager { options.manifest_dir, manifest ); + let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })]; + if flushed_entry_id > 0 { + actions.push(RegionMetaAction::Edit(RegionEdit { + files_to_add: vec![], + files_to_remove: vec![], + timestamp_ms: None, + compaction_time_window: None, + flushed_entry_id: Some(flushed_entry_id), + flushed_sequence: None, + })); + } + // Persist region change. - let action_list = - RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata })); + let action_list = RegionMetaActionList::new(actions); + // New region is not in staging mode. // TODO(ruihang): add staging mode support if needed. 
store.save(version, &action_list.encode()?, false).await?; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index e6dd142f0b..a3a34695c6 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -1122,6 +1122,7 @@ mod tests { let staging_ctx = { let manager = RegionManifestManager::new( version_control.current().version.metadata.clone(), + 0, RegionManifestOptions { manifest_dir: "".to_string(), object_store: env.access_layer.object_store().clone(), @@ -1187,6 +1188,7 @@ mod tests { let manager = RegionManifestManager::new( metadata.clone(), + 0, RegionManifestOptions { manifest_dir: "".to_string(), object_store: access_layer.object_store().clone(), diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index e0c343994f..ef1ab4563f 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -238,8 +238,11 @@ impl RegionOpener { // Create a manifest manager for this region and writes regions to the manifest file. let region_manifest_options = Self::manifest_options(config, &options, ®ion_dir, &self.object_store_manager)?; + // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id. + let flushed_entry_id = provider.initial_flushed_entry_id::(wal.store()); let manifest_manager = RegionManifestManager::new( metadata.clone(), + flushed_entry_id, region_manifest_options, self.stats.total_manifest_size.clone(), self.stats.manifest_version.clone(), @@ -439,7 +442,7 @@ impl RegionOpener { .build(); let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); - if !self.skip_wal_replay { + let topic_latest_entry_id = if !self.skip_wal_replay { let replay_from_entry_id = self .replay_checkpoint .unwrap_or_default() @@ -461,14 +464,26 @@ impl RegionOpener { on_region_opened, ) .await?; + // For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id. + // Only set after the WAL replay is completed. 
+ let topic_latest_entry_id = if provider.is_remote_wal() + && version_control.current().version.memtables.is_empty() + { + wal.store().latest_entry_id(&provider).unwrap_or(0) + } else { + 0 + }; + + topic_latest_entry_id } else { info!( "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}", region_id, manifest.manifest_version, flushed_entry_id ); - } - let now = self.time_provider.current_time_millis(); + 0 + }; + let now = self.time_provider.current_time_millis(); let region = MitoRegion { region_id: self.region_id, version_control, @@ -483,7 +498,7 @@ impl RegionOpener { last_flush_millis: AtomicI64::new(now), last_compaction_millis: AtomicI64::new(now), time_provider: self.time_provider.clone(), - topic_latest_entry_id: AtomicU64::new(0), + topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id), write_bytes: Arc::new(AtomicU64::new(0)), memtable_builder, stats: self.stats.clone(), @@ -713,8 +728,8 @@ where let series_count = version_control.current().series_count(); info!( - "Replay WAL for region: {}, rows recovered: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}", - region_id, rows_replayed, last_entry_id, series_count, now.elapsed() + "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}", + region_id, provider, rows_replayed, replay_from_entry_id, last_entry_id, series_count, now.elapsed() ); Ok(last_entry_id) } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index e6325b513e..598c20eb2c 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -563,6 +563,7 @@ impl TestEnv { if let Some(metadata) = initial_metadata { RegionManifestManager::new( metadata, + 0, manifest_opts, Default::default(), Default::default(), diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index c33ec6b4e6..29dbd279f3 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -116,6 +116,7 @@ impl SchedulerEnv { Arc::new(ManifestContext::new( RegionManifestManager::new( metadata, + 0, RegionManifestOptions { manifest_dir: "".to_string(), object_store: self.access_layer.object_store().clone(), diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 44c5f811ef..e21f36d518 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -65,7 +65,12 @@ impl RegionWorkerLoop { if region.provider.is_remote_wal() { let flushed_entry_id = region.version_control.current().last_entry_id; - info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}"); + let replay_from_entry_id = request + .checkpoint + .map(|c| c.entry_id) + .unwrap_or_default() + .max(flushed_entry_id); + info!("Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}", region.provider); let timer = Instant::now(); let wal_entry_reader = self.wal @@ -75,15 +80,16 @@ impl RegionWorkerLoop { ®ion.provider, wal_entry_reader, region_id, - flushed_entry_id, + replay_from_entry_id, ®ion.version_control, self.config.allow_stale_entries, on_region_opened, ) .await?; info!( - "Elapsed: {:?}, region: {region_id} catchup finished. last entry id: {last_entry_id}, expected: {:?}.", + "Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished. 
replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.", timer.elapsed(), + region.provider, request.entry_id ); if let Some(expected_last_entry_id) = request.entry_id { diff --git a/src/store-api/src/logstore/provider.rs b/src/store-api/src/logstore/provider.rs index 1c62f7479a..42c7ef12c3 100644 --- a/src/store-api/src/logstore/provider.rs +++ b/src/store-api/src/logstore/provider.rs @@ -15,6 +15,7 @@ use std::fmt::Display; use std::sync::Arc; +use crate::logstore::LogStore; use crate::storage::RegionId; // The Provider of kafka log store @@ -78,6 +79,18 @@ impl Display for Provider { } impl Provider { + /// Returns the initial flushed entry id of the provider. + /// This is used to initialize the flushed entry id of the region when creating the region from scratch. + /// + /// Currently only used for remote WAL. + /// For local WAL, the initial flushed entry id is 0. + pub fn initial_flushed_entry_id(&self, wal: &S) -> u64 { + if matches!(self, Provider::Kafka(_)) { + return wal.latest_entry_id(self).unwrap_or(0); + } + 0 + } + pub fn raft_engine_provider(id: u64) -> Provider { Provider::RaftEngine(RaftEngineProvider { id }) } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 64fd74b6d8..a31d294d29 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -1358,7 +1358,7 @@ pub enum RegionTruncateRequest { /// /// Makes a readonly region to catch up to leader region changes. /// There is no effect if it operating on a leader region. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Default)] pub struct RegionCatchupRequest { /// Sets it to writable if it's available after it has caught up with all changes. pub set_writable: bool, @@ -1371,6 +1371,8 @@ pub struct RegionCatchupRequest { pub metadata_entry_id: Option, /// The hint for replaying memtable. pub location_id: Option, + /// Replay checkpoint. + pub checkpoint: Option, } /// Get sequences of regions by region ids. diff --git a/tests/cases/standalone/common/information_schema/region_statistics.result b/tests/cases/standalone/common/information_schema/region_statistics.result index 507151820e..381cda02f9 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.result +++ b/tests/cases/standalone/common/information_schema/region_statistics.result @@ -22,15 +22,17 @@ INSERT INTO test VALUES Affected Rows: 3 -- SQLNESS SLEEP 3s -SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) +-- For regions using different WAL implementations, the manifest size may vary. +-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest. 
+SELECT SUM(region_rows), SUM(memtable_size), SUM(sst_size), SUM(index_size) FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public'); -+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ -| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.disk_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) | -+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ -| 3 | 2699 | 0 | 0 | -+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ ++-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ +| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.memtable_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) | ++-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ +| 3 | 78 | 0 | 0 | ++-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test'; diff --git a/tests/cases/standalone/common/information_schema/region_statistics.sql b/tests/cases/standalone/common/information_schema/region_statistics.sql index ed7a7b0cfc..d031ce5f11 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.sql +++ b/tests/cases/standalone/common/information_schema/region_statistics.sql @@ -17,7 +17,9 @@ INSERT INTO test VALUES (21, 'c', 21); -- SQLNESS SLEEP 3s -SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) +-- For regions using different WAL implementations, the manifest size may vary. +-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest. +SELECT SUM(region_rows), SUM(memtable_size), SUM(sst_size), SUM(index_size) FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');
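The flush-trigger change above relies on the PartialOrd/Ord derives newly added to ReplayCheckpoint: a checkpoint is persisted only when it compares strictly greater than the stored one under field-by-field lexicographic ordering (entry_id first, then metadata_entry_id, where None sorts before Some). Below is a minimal standalone sketch of that rule; the struct and the helper are redeclared locally with simplified, hypothetical names purely for illustration and do not reflect the crate's actual module layout.

// Minimal sketch of the checkpoint-persistence rule (hypothetical local names).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ReplayCheckpoint {
    entry_id: u64,
    // Derived Ord compares entry_id first, then metadata_entry_id (None < Some(_)).
    metadata_entry_id: Option<u64>,
}

// Persist only when the freshly computed checkpoint is strictly newer than the stored one.
fn should_persist(
    new: ReplayCheckpoint,
    persisted: Option<ReplayCheckpoint>,
) -> Option<ReplayCheckpoint> {
    match persisted {
        None => Some(new),
        Some(old) if new > old => Some(new),
        _ => None,
    }
}

fn main() {
    let newer = ReplayCheckpoint { entry_id: 100, metadata_entry_id: Some(10) };
    let stored = ReplayCheckpoint { entry_id: 100, metadata_entry_id: Some(8) };
    // A higher metadata entry id at the same entry id counts as progress.
    assert_eq!(should_persist(newer, Some(stored)), Some(newer));
    // A lower entry id never wins, even with a higher metadata entry id.
    let stale = ReplayCheckpoint { entry_id: 80, metadata_entry_id: Some(11) };
    assert_eq!(should_persist(stale, Some(stored)), None);
}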