fix: initialize remote WAL regions with correct flushed entry IDs (#6856)

* fix: initialize remote WAL regions with correct flushed entry IDs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: correct latest offset

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: update sqlness

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add replay checkpoint to catchup request

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Author: Weny Xu
Date: 2025-09-03 12:00:02 +08:00
Committed by: GitHub
Parent: c3c79e4c79
Commit: 7cf47ccf54
27 changed files with 438 additions and 255 deletions
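In outline, the fix works like this: when a region backed by a remote (Kafka) WAL topic is created or opened, its manifest records the topic's current latest entry id as the flushed entry id, so a later replay starts from that point instead of walking the shared topic from offset 0. A minimal sketch of that idea follows; the names (`Manifest`, `initial_flushed_entry_id`, `create_manifest`) are hypothetical and only mirror the shape of the change, not the real GreptimeDB API.

    // Sketch only: seed a new region's manifest with the topic's latest entry id
    // when the WAL is remote, so replay does not start at offset 0.
    #[derive(Debug, Default)]
    struct Manifest {
        flushed_entry_id: u64,
    }

    fn initial_flushed_entry_id(is_remote_wal: bool, topic_latest_entry_id: u64) -> u64 {
        // A local WAL keeps the old behaviour and starts from 0.
        if is_remote_wal {
            topic_latest_entry_id
        } else {
            0
        }
    }

    fn create_manifest(is_remote_wal: bool, topic_latest_entry_id: u64) -> Manifest {
        Manifest {
            flushed_entry_id: initial_flushed_entry_id(is_remote_wal, topic_latest_entry_id),
        }
    }

    fn main() {
        // A new region on a topic whose latest entry id is 99 replays from 99, not 0.
        assert_eq!(create_manifest(true, 99).flushed_entry_id, 99);
        assert_eq!(create_manifest(false, 99).flushed_entry_id, 0);
    }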

View File

@@ -1,3 +1,8 @@
+logging:
+  level: "info"
+  format: "json"
+  filters:
+    - log_store=debug
 meta:
   configData: |-
     [runtime]

View File

@@ -251,7 +251,6 @@ macro_rules! define_from_tonic_status {
         .get(key)
         .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
     }
-
     let code = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_CODE)
         .and_then(|s| {
             if let Ok(code) = s.parse::<u32>() {

View File

@@ -108,10 +108,6 @@ pub struct OpenRegion {
     pub region_wal_options: HashMap<RegionNumber, String>,
     #[serde(default)]
     pub skip_wal_replay: bool,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub replay_entry_id: Option<u64>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub metadata_replay_entry_id: Option<u64>,
 }
 
 impl OpenRegion {
@@ -128,22 +124,8 @@ impl OpenRegion {
             region_options,
             region_wal_options,
             skip_wal_replay,
-            replay_entry_id: None,
-            metadata_replay_entry_id: None,
         }
     }
-
-    /// Sets the replay entry id.
-    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
-        self.replay_entry_id = replay_entry_id;
-        self
-    }
-
-    /// Sets the metadata replay entry id.
-    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
-        self.metadata_replay_entry_id = metadata_replay_entry_id;
-        self
-    }
 }
 
 /// The instruction of downgrading leader region.
@@ -169,7 +151,7 @@ impl Display for DowngradeRegion {
 }
 
 /// Upgrades a follower region to leader region.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
 pub struct UpgradeRegion {
     /// The [RegionId].
     pub region_id: RegionId,
@@ -186,6 +168,24 @@ pub struct UpgradeRegion {
     /// The hint for replaying memtable.
     #[serde(default)]
     pub location_id: Option<u64>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub replay_entry_id: Option<u64>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub metadata_replay_entry_id: Option<u64>,
+}
+
+impl UpgradeRegion {
+    /// Sets the replay entry id.
+    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
+        self.replay_entry_id = replay_entry_id;
+        self
+    }
+
+    /// Sets the metadata replay entry id.
+    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
+        self.metadata_replay_entry_id = metadata_replay_entry_id;
+        self
+    }
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -370,8 +370,6 @@ mod tests {
             region_options,
             region_wal_options: HashMap::new(),
             skip_wal_replay: false,
-            replay_entry_id: None,
-            metadata_replay_entry_id: None,
         };
         assert_eq!(expected, deserialized);
     }

View File

@@ -46,7 +46,7 @@ pub struct TopicRegionValue {
     pub checkpoint: Option<ReplayCheckpoint>,
 }
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
 pub struct ReplayCheckpoint {
     #[serde(default)]
     pub entry_id: u64,
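The `PartialOrd`/`Ord` added to `ReplayCheckpoint` above derive a lexicographic ordering over the fields in declaration order (`entry_id` first, then `metadata_entry_id`); the `should_persist_region_checkpoint` helper later in this commit relies on that when comparing a fresh checkpoint against the persisted one. A small sketch of the derived behaviour, with the struct shape copied from the diff but the serde derives omitted so it stands alone:

    #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
    struct ReplayCheckpoint {
        entry_id: u64,
        metadata_entry_id: Option<u64>,
    }

    fn main() {
        // entry_id dominates the comparison: 100 > 90 even though the new
        // checkpoint carries no metadata entry id.
        let persisted = ReplayCheckpoint { entry_id: 90, metadata_entry_id: Some(10) };
        let current = ReplayCheckpoint { entry_id: 100, metadata_entry_id: None };
        assert!(current > persisted);

        // With equal entry_id, Some(_) compares greater than None.
        let a = ReplayCheckpoint { entry_id: 100, metadata_entry_id: None };
        let b = ReplayCheckpoint { entry_id: 100, metadata_entry_id: Some(8) };
        assert!(b > a);
    }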

View File

@@ -238,10 +238,7 @@ mod tests {
         // Upgrade region
         let instruction = Instruction::UpgradeRegion(UpgradeRegion {
             region_id,
-            last_entry_id: None,
-            metadata_last_entry_id: None,
-            replay_timeout: None,
-            location_id: None,
+            ..Default::default()
         });
         assert!(
             heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))

View File

@@ -16,7 +16,7 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
 use common_meta::wal_options_allocator::prepare_wal_options;
 use futures_util::future::BoxFuture;
 use store_api::path_utils::table_dir;
-use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest, ReplayCheckpoint};
+use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
 
 use crate::heartbeat::handler::HandlerContext;
@@ -29,31 +29,18 @@ impl HandlerContext {
             mut region_options,
             region_wal_options,
             skip_wal_replay,
-            replay_entry_id,
-            metadata_replay_entry_id,
         }: OpenRegion,
     ) -> BoxFuture<'static, Option<InstructionReply>> {
         Box::pin(async move {
             let region_id = Self::region_ident_to_region_id(&region_ident);
             prepare_wal_options(&mut region_options, region_id, &region_wal_options);
-            let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
-                (Some(replay_entry_id), Some(metadata_replay_entry_id)) => Some(ReplayCheckpoint {
-                    entry_id: replay_entry_id,
-                    metadata_entry_id: Some(metadata_replay_entry_id),
-                }),
-                (Some(replay_entry_id), None) => Some(ReplayCheckpoint {
-                    entry_id: replay_entry_id,
-                    metadata_entry_id: None,
-                }),
-                _ => None,
-            };
             let request = RegionRequest::Open(RegionOpenRequest {
                 engine: region_ident.engine,
                 table_dir: table_dir(&region_storage_path, region_id.table_id()),
                 path_type: PathType::Bare,
                 options: region_options,
                 skip_wal_replay,
-                checkpoint,
+                checkpoint: None,
             });
             let result = self.region_server.handle_request(region_id, request).await;
             let success = result.is_ok();

View File

@@ -15,7 +15,7 @@
 use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
 use common_telemetry::{info, warn};
 use futures_util::future::BoxFuture;
-use store_api::region_request::{RegionCatchupRequest, RegionRequest};
+use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
 
 use crate::heartbeat::handler::HandlerContext;
 use crate::heartbeat::task_tracker::WaitResult;
@@ -29,6 +29,8 @@ impl HandlerContext {
             metadata_last_entry_id,
             replay_timeout,
             location_id,
+            replay_entry_id,
+            metadata_replay_entry_id,
         }: UpgradeRegion,
     ) -> BoxFuture<'static, Option<InstructionReply>> {
         Box::pin(async move {
@@ -50,6 +52,14 @@ impl HandlerContext {
             let region_server_moved = self.region_server.clone();
 
+            let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
+                (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
+                    entry_id,
+                    metadata_entry_id,
+                }),
+                _ => None,
+            };
+
             // The catchup task is almost zero cost if the inside region is writable.
             // Therefore, it always registers a new catchup task.
             let register_result = self
@@ -66,6 +76,7 @@ impl HandlerContext {
                         entry_id: last_entry_id,
                         metadata_entry_id: metadata_last_entry_id,
                         location_id,
+                        checkpoint,
                     }),
                 )
                 .await?;
@@ -148,10 +159,8 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -187,10 +196,8 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -227,10 +234,8 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout,
-                location_id: None,
+                ..Default::default()
             })
            .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -271,9 +276,7 @@ mod tests {
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
                 replay_timeout,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -289,10 +292,8 @@ mod tests {
         let reply = handler_context
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout: Some(Duration::from_millis(500)),
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -332,10 +333,7 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
-                replay_timeout: None,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -351,10 +349,8 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout: Some(Duration::from_millis(200)),
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

View File

@@ -302,6 +302,10 @@ impl LogStore for KafkaLogStore {
                 },
             ))
             .await?;
+        debug!(
+            "Appended batch to Kafka, region_grouped_max_offset: {:?}",
+            region_grouped_max_offset
+        );
 
         Ok(AppendBatchResponse {
             last_entry_ids: region_grouped_max_offset.into_iter().collect(),
@@ -362,6 +366,17 @@
             .context(GetOffsetSnafu {
                 topic: &provider.topic,
             })?;
+        let latest_offset = (end_offset as u64).saturating_sub(1);
+        self.topic_stats
+            .entry(provider.clone())
+            .and_modify(|stat| {
+                stat.latest_offset = stat.latest_offset.max(latest_offset);
+            })
+            .or_insert_with(|| TopicStat {
+                latest_offset,
+                record_size: 0,
+                record_num: 0,
+            });
 
         let region_indexes = if let (Some(index), Some(collector)) =
             (index, self.client_manager.global_index_collector())
@@ -550,6 +565,7 @@ mod tests {
     use futures::TryStreamExt;
     use rand::prelude::SliceRandom;
     use rand::Rng;
+    use rskafka::client::partition::OffsetAt;
     use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
     use store_api::logstore::provider::Provider;
     use store_api::logstore::LogStore;
@@ -713,8 +729,16 @@
             .for_each(|entry| entry.set_entry_id(0));
         assert_eq!(expected_entries, actual_entries);
     }
 
-    let high_wathermark = logstore.latest_entry_id(&provider).unwrap();
-    assert_eq!(high_wathermark, 99);
+    let latest_entry_id = logstore.latest_entry_id(&provider).unwrap();
+    let client = logstore
+        .client_manager
+        .get_or_insert(provider.as_kafka_provider().unwrap())
+        .await
+        .unwrap();
+    assert_eq!(latest_entry_id, 99);
+    // The latest offset is the offset of the last record plus one.
+    let latest = client.client().get_offset(OffsetAt::Latest).await.unwrap();
+    assert_eq!(latest, 100);
 }
 
 #[tokio::test]

View File

@@ -112,11 +112,11 @@ mod tests {
         let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
         assert_eq!(current_latest_offset, 0);
 
-        let record = vec![record()];
+        let record = vec![record(), record()];
         let region = RegionId::new(1, 1);
         producer.produce(region, record.clone()).await.unwrap();
         tokio::time::sleep(Duration::from_millis(150)).await;
 
         let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
-        assert_eq!(current_latest_offset, record.len() as u64);
+        assert_eq!(current_latest_offset, record.len() as u64 - 1);
     }
 }

View File

@@ -33,30 +33,34 @@ impl BackgroundProducerWorker {
                 .context(error::GetOffsetSnafu {
                     topic: &self.provider.topic,
                 }) {
-                Ok(offset) => match self.topic_stats.entry(self.provider.clone()) {
-                    dashmap::Entry::Occupied(mut occupied_entry) => {
-                        let offset = offset as u64;
-                        let stat = occupied_entry.get_mut();
-                        if stat.latest_offset < offset {
-                            stat.latest_offset = offset;
-                            debug!(
-                                "Updated latest offset for topic {} to {}",
-                                self.provider.topic, offset
-                            );
-                        }
-                    }
-                    dashmap::Entry::Vacant(vacant_entry) => {
-                        vacant_entry.insert(TopicStat {
-                            latest_offset: offset as u64,
-                            record_size: 0,
-                            record_num: 0,
-                        });
-                        debug!(
-                            "Inserted latest offset for topic {} to {}",
-                            self.provider.topic, offset
-                        );
-                    }
-                },
+                Ok(highwatermark) => {
+                    // The highwatermark is the offset of the last record plus one.
+                    let offset = (highwatermark as u64).saturating_sub(1);
+
+                    match self.topic_stats.entry(self.provider.clone()) {
+                        dashmap::Entry::Occupied(mut occupied_entry) => {
+                            let stat = occupied_entry.get_mut();
+                            if stat.latest_offset < offset {
+                                stat.latest_offset = offset;
+                                debug!(
+                                    "Updated latest offset for topic {} to {}",
+                                    self.provider.topic, offset
+                                );
+                            }
+                        }
+                        dashmap::Entry::Vacant(vacant_entry) => {
+                            vacant_entry.insert(TopicStat {
+                                latest_offset: offset,
+                                record_size: 0,
+                                record_num: 0,
+                            });
+                            debug!(
+                                "Inserted latest offset for topic {} to {}",
+                                self.provider.topic, offset
+                            );
+                        }
+                    }
+                }
                 Err(err) => {
                     error!(err; "Failed to get latest offset for topic {}", self.provider.topic);
                 }
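The `saturating_sub(1)` above follows from how Kafka reports offsets: the high watermark is the offset the next record will receive, i.e. one past the last written record, so the latest entry id is the high watermark minus one (and 0 for an empty topic). A tiny sketch of that conversion, using a hypothetical helper name:

    fn latest_entry_id_from_high_watermark(high_watermark: i64) -> u64 {
        // saturating_sub keeps an empty topic (high watermark 0) at 0 instead of underflowing.
        (high_watermark as u64).saturating_sub(1)
    }

    fn main() {
        assert_eq!(latest_entry_id_from_high_watermark(0), 0); // empty topic
        assert_eq!(latest_entry_id_from_high_watermark(100), 99); // last record at offset 99
    }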

View File

@@ -82,7 +82,7 @@ lazy_static! {
         .unwrap();
 
     /// The triggered region flush total counter.
     pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec =
-        register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap();
+        register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name"]).unwrap();
     /// The triggered region checkpoint total counter.
     pub static ref METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL: IntCounterVec =

View File

@@ -19,7 +19,6 @@ use api::v1::meta::MailboxMessage;
 use common_meta::distributed_time_constants::REGION_LEASE_SECS;
 use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
 use common_meta::key::datanode_table::RegionInfo;
-use common_meta::wal_options_allocator::extract_topic_from_wal_options;
 use common_meta::RegionIdent;
 use common_procedure::{Context as ProcedureContext, Status};
 use common_telemetry::info;
@@ -68,7 +67,6 @@ impl OpenCandidateRegion {
     async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
         let pc = &ctx.persistent_ctx;
         let table_id = pc.region_id.table_id();
-        let region_id = pc.region_id;
         let region_number = pc.region_id.region_number();
         let candidate_id = pc.to_peer.id;
         let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
@@ -80,31 +78,18 @@ impl OpenCandidateRegion {
             engine,
         } = datanode_table_value.region_info.clone();
 
-        let checkpoint =
-            if let Some(topic) = extract_topic_from_wal_options(region_id, &region_wal_options) {
-                ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
-            } else {
-                None
-            };
-
-        let open_instruction = Instruction::OpenRegion(
-            OpenRegion::new(
-                RegionIdent {
-                    datanode_id: candidate_id,
-                    table_id,
-                    region_number,
-                    engine,
-                },
-                &region_storage_path,
-                region_options,
-                region_wal_options,
-                true,
-            )
-            .with_replay_entry_id(checkpoint.map(|checkpoint| checkpoint.entry_id))
-            .with_metadata_replay_entry_id(
-                checkpoint.and_then(|checkpoint| checkpoint.metadata_entry_id),
-            ),
-        );
+        let open_instruction = Instruction::OpenRegion(OpenRegion::new(
+            RegionIdent {
+                datanode_id: candidate_id,
+                table_id,
+                region_number,
+                engine,
+            },
+            &region_storage_path,
+            region_options,
+            region_wal_options,
+            true,
+        ));
 
         Ok(open_instruction)
     }
@@ -241,8 +226,6 @@ mod tests {
             region_options: Default::default(),
             region_wal_options: Default::default(),
             skip_wal_replay: true,
-            replay_entry_id: None,
-            metadata_replay_entry_id: None,
         })
     }

View File

@@ -19,6 +19,7 @@ use api::v1::meta::MailboxMessage;
 use common_meta::ddl::utils::parse_region_wal_options;
 use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
 use common_meta::lock_key::RemoteWalLock;
+use common_meta::wal_options_allocator::extract_topic_from_wal_options;
 use common_procedure::{Context as ProcedureContext, Status};
 use common_telemetry::{error, warn};
 use common_wal::options::WalOptions;
@@ -111,23 +112,40 @@ impl UpgradeCandidateRegion {
     }
 
     /// Builds upgrade region instruction.
-    fn build_upgrade_region_instruction(
+    async fn build_upgrade_region_instruction(
         &self,
-        ctx: &Context,
+        ctx: &mut Context,
         replay_timeout: Duration,
-    ) -> Instruction {
+    ) -> Result<Instruction> {
         let pc = &ctx.persistent_ctx;
         let region_id = pc.region_id;
         let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
         let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
 
-        Instruction::UpgradeRegion(UpgradeRegion {
-            region_id,
-            last_entry_id,
-            metadata_last_entry_id,
-            replay_timeout: Some(replay_timeout),
-            location_id: Some(ctx.persistent_ctx.from_peer.id),
-        })
+        // Try our best to retrieve replay checkpoint.
+        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok();
+        let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| {
+            extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options)
+        }) {
+            ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
+        } else {
+            None
+        };
+
+        let upgrade_instruction = Instruction::UpgradeRegion(
+            UpgradeRegion {
+                region_id,
+                last_entry_id,
+                metadata_last_entry_id,
+                replay_timeout: Some(replay_timeout),
+                location_id: Some(ctx.persistent_ctx.from_peer.id),
+                replay_entry_id: None,
+                metadata_replay_entry_id: None,
+            }
+            .with_replay_entry_id(checkpoint.map(|c| c.entry_id))
+            .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
+        );
+
+        Ok(upgrade_instruction)
     }
@@ -144,16 +162,19 @@ impl UpgradeCandidateRegion {
     /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
     /// - [ExceededDeadline](error::Error::ExceededDeadline)
     /// - Invalid JSON (impossible).
-    async fn upgrade_region(&self, ctx: &Context) -> Result<()> {
-        let pc = &ctx.persistent_ctx;
-        let region_id = pc.region_id;
-        let candidate = &pc.to_peer;
+    async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
         let operation_timeout =
             ctx.next_operation_timeout()
                 .context(error::ExceededDeadlineSnafu {
                     operation: "Upgrade region",
                 })?;
-        let upgrade_instruction = self.build_upgrade_region_instruction(ctx, operation_timeout);
+        let upgrade_instruction = self
+            .build_upgrade_region_instruction(ctx, operation_timeout)
+            .await?;
+        let pc = &ctx.persistent_ctx;
+        let region_id = pc.region_id;
+        let candidate = &pc.to_peer;
 
         let msg = MailboxMessage::json_message(
             &format!("Upgrade candidate region: {}", region_id),
@@ -283,8 +304,12 @@
 #[cfg(test)]
 mod tests {
     use std::assert_matches::assert_matches;
+    use std::collections::HashMap;
 
+    use common_meta::key::table_route::TableRouteValue;
+    use common_meta::key::test_utils::new_test_table_info;
     use common_meta::peer::Peer;
+    use common_meta::rpc::router::{Region, RegionRoute};
     use store_api::storage::RegionId;
 
     use super::*;
@@ -308,14 +333,33 @@ mod tests {
         }
     }
 
+    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
+        let table_info =
+            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
+        let region_routes = vec![RegionRoute {
+            region: Region::new_test(ctx.persistent_ctx.region_id),
+            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
+            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
+            ..Default::default()
+        }];
+        ctx.table_metadata_manager
+            .create_table_metadata(
+                table_info,
+                TableRouteValue::physical(region_routes),
+                wal_options,
+            )
+            .await
+            .unwrap();
+    }
+
     #[tokio::test]
     async fn test_datanode_is_unreachable() {
         let state = UpgradeCandidateRegion::default();
         let persistent_context = new_persistent_context();
         let env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
         assert_matches!(err, Error::PusherNotFound { .. });
         assert!(!err.is_retryable());
@@ -328,7 +372,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
 
         let (tx, rx) = tokio::sync::mpsc::channel(1);
@@ -339,7 +384,7 @@ mod tests {
         drop(rx);
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
         assert_matches!(err, Error::PushMessage { .. });
         assert!(!err.is_retryable());
@@ -351,10 +396,11 @@ mod tests {
         let persistent_context = new_persistent_context();
         let env = TestingEnv::new();
         let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         ctx.volatile_ctx.metrics.operations_elapsed =
             ctx.persistent_ctx.timeout + Duration::from_secs(1);
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
         assert_matches!(err, Error::ExceededDeadline { .. });
         assert!(!err.is_retryable());
@@ -367,7 +413,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
@@ -379,7 +426,7 @@ mod tests {
         send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
         assert_matches!(err, Error::UnexpectedInstructionReply { .. });
         assert!(!err.is_retryable());
     }
@@ -391,7 +438,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
@@ -411,7 +459,7 @@ mod tests {
             ))
         });
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
         assert_matches!(err, Error::RetryLater { .. });
         assert!(err.is_retryable());
@@ -425,7 +473,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
@@ -439,7 +488,7 @@ mod tests {
             Ok(new_upgrade_region_reply(id, true, false, None))
         });
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
         assert_matches!(err, Error::Unexpected { .. });
         assert!(!err.is_retryable());
@@ -457,7 +506,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
@@ -471,7 +521,7 @@ mod tests {
             Ok(new_upgrade_region_reply(id, false, true, None))
         });
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
         assert_matches!(err, Error::RetryLater { .. });
         assert!(err.is_retryable());
@@ -491,7 +541,7 @@ mod tests {
             Ok(new_upgrade_region_reply(id, false, true, None))
         });
 
-        state.upgrade_region(&ctx).await.unwrap();
+        state.upgrade_region(&mut ctx).await.unwrap();
     }
 
     #[tokio::test]
@@ -503,6 +553,7 @@ mod tests {
         let mut env = TestingEnv::new();
         let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
@@ -563,6 +614,7 @@ mod tests {
         let mut env = TestingEnv::new();
         let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
@@ -621,6 +673,7 @@ mod tests {
         let mut env = TestingEnv::new();
         let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
         ctx.volatile_ctx.metrics.operations_elapsed =

View File

@@ -29,7 +29,6 @@ use common_time::util::current_time_millis;
 use common_wal::config::kafka::common::{
     DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
 };
-use itertools::Itertools;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -223,23 +222,25 @@ impl RegionFlushTrigger {
         &self,
         topic: &str,
         region_ids: &[RegionId],
+        topic_regions: &HashMap<RegionId, TopicRegionValue>,
         leader_regions: &HashMap<RegionId, LeaderRegion>,
     ) -> Result<()> {
         let regions = region_ids
             .iter()
             .flat_map(|region_id| match leader_regions.get(region_id) {
-                Some(leader_region) => {
-                    let entry_id = leader_region.manifest.replay_entry_id();
-                    let metadata_entry_id = leader_region.manifest.metadata_replay_entry_id();
-
-                    Some((
-                        TopicRegionKey::new(*region_id, topic),
-                        Some(TopicRegionValue::new(Some(ReplayCheckpoint::new(
-                            entry_id,
-                            metadata_entry_id,
-                        )))),
-                    ))
-                }
+                Some(leader_region) => should_persist_region_checkpoint(
+                    leader_region,
+                    topic_regions
+                        .get(region_id)
+                        .cloned()
+                        .and_then(|value| value.checkpoint),
+                )
+                .map(|checkpoint| {
+                    (
+                        TopicRegionKey::new(*region_id, topic),
+                        Some(TopicRegionValue::new(Some(checkpoint))),
+                    )
+                }),
                 None => None,
             })
             .collect::<Vec<_>>();
@@ -272,14 +273,14 @@ impl RegionFlushTrigger {
         latest_entry_id: u64,
         avg_record_size: usize,
     ) -> Result<()> {
-        let region_ids = self
+        let topic_regions = self
             .table_metadata_manager
             .topic_region_manager()
             .regions(topic)
             .await
             .context(error::TableMetadataManagerSnafu)?;
 
-        if region_ids.is_empty() {
+        if topic_regions.is_empty() {
             debug!("No regions found for topic: {}", topic);
             return Ok(());
         }
@@ -287,7 +288,7 @@ impl RegionFlushTrigger {
         // Filters regions need to persist checkpoints.
         let regions_to_persist = filter_regions_by_replay_size(
             topic,
-            region_ids
+            topic_regions
                 .iter()
                 .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
             avg_record_size as u64,
@@ -296,33 +297,25 @@ impl RegionFlushTrigger {
         );
 
         let region_manifests = self
             .leader_region_registry
-            .batch_get(region_ids.keys().cloned());
+            .batch_get(topic_regions.keys().cloned());
         if let Err(err) = self
-            .persist_region_checkpoints(topic, &regions_to_persist, &region_manifests)
+            .persist_region_checkpoints(
+                topic,
+                &regions_to_persist,
+                &topic_regions,
+                &region_manifests,
+            )
             .await
         {
             error!(err; "Failed to persist region checkpoints for topic: {}", topic);
         }
 
-        let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = region_manifests
+        let regions = region_manifests
             .into_iter()
-            .partition_map(|(region_id, region)| {
-                if !region.manifest.is_inactive() {
-                    itertools::Either::Left((region_id, region.manifest.prunable_entry_id()))
-                } else {
-                    itertools::Either::Right((region_id, region.manifest.prunable_entry_id()))
-                }
-            });
-        let min_entry_id = inactive_regions
-            .iter()
-            .min_by_key(|(_, entry_id)| *entry_id);
-        let min_entry_id = active_regions
-            .iter()
-            .min_by_key(|(_, entry_id)| *entry_id)
-            .or(min_entry_id);
+            .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
+            .collect::<Vec<_>>();
+        let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
         if let Some((_, min_entry_id)) = min_entry_id {
             let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
                 .saturating_mul(avg_record_size as u64);
@@ -332,45 +325,28 @@ impl RegionFlushTrigger {
         }
 
         // Selects regions to flush from the set of active regions.
-        let mut regions_to_flush = filter_regions_by_replay_size(
+        let regions_to_flush = filter_regions_by_replay_size(
             topic,
-            active_regions.into_iter(),
+            regions.into_iter(),
             avg_record_size as u64,
             latest_entry_id,
             self.flush_trigger_size,
         );
-        let active_regions_num = regions_to_flush.len();
-        // Selects regions to flush from the set of inactive regions.
-        // For inactive regions, we use a lower flush trigger size (half of the normal size)
-        // to encourage more aggressive flushing to update the region's topic latest entry id.
-        let inactive_regions_to_flush = filter_regions_by_replay_size(
-            topic,
-            inactive_regions.into_iter(),
-            avg_record_size as u64,
-            latest_entry_id,
-            self.flush_trigger_size / 2,
-        );
-        let inactive_regions_num = inactive_regions_to_flush.len();
-        regions_to_flush.extend(inactive_regions_to_flush);
-
         // Sends flush instructions to datanodes.
         if !regions_to_flush.is_empty() {
             self.send_flush_instructions(&regions_to_flush).await?;
             debug!(
-                "Sent {} flush instructions to datanodes for topic: '{}' ({} inactive regions)",
+                "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
                 regions_to_flush.len(),
                 topic,
-                inactive_regions_num,
+                regions_to_flush,
             );
         }
 
         metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
-            .with_label_values(&[topic, "active"])
-            .inc_by(active_regions_num as u64);
-        metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
-            .with_label_values(&[topic, "inactive"])
-            .inc_by(inactive_regions_num as u64);
+            .with_label_values(&[topic])
+            .inc_by(regions_to_flush.len() as u64);
 
         Ok(())
     }
@@ -409,6 +385,26 @@ impl RegionFlushTrigger {
     }
 }
 
+/// Determines whether a region checkpoint should be persisted based on current and persisted state.
+fn should_persist_region_checkpoint(
+    current: &LeaderRegion,
+    persisted: Option<ReplayCheckpoint>,
+) -> Option<ReplayCheckpoint> {
+    let new_checkpoint = ReplayCheckpoint::new(
+        current.manifest.replay_entry_id(),
+        current.manifest.metadata_replay_entry_id(),
+    );
+    let Some(persisted) = persisted else {
+        return Some(new_checkpoint);
+    };
+
+    if new_checkpoint > persisted {
+        return Some(new_checkpoint);
+    }
+
+    None
+}
+
 /// Filter regions based on the estimated replay size.
 ///
 /// Returns the regions if its estimated replay size exceeds the given threshold.
@@ -497,6 +493,7 @@ fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
 #[cfg(test)]
 mod tests {
     use common_base::readable_size::ReadableSize;
+    use common_meta::region_registry::LeaderRegionManifestInfo;
     use store_api::storage::RegionId;
 
     use super::*;
@@ -627,4 +624,92 @@
         // Only regions 1,1 and 1,2 should be flushed
         assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
     }
+
+    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
+        LeaderRegion {
+            datanode_id: 1,
+            manifest: LeaderRegionManifestInfo::Metric {
+                data_manifest_version: 1,
+                data_flushed_entry_id: replay_entry_id,
+                data_topic_latest_entry_id: 0,
+                metadata_manifest_version: 1,
+                metadata_flushed_entry_id: metadata_replay_entry_id,
+                metadata_topic_latest_entry_id: 0,
+            },
+        }
+    }
+
+    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
+        LeaderRegion {
+            datanode_id: 1,
+            manifest: LeaderRegionManifestInfo::Mito {
+                manifest_version: 1,
+                flushed_entry_id: replay_entry_id,
+                topic_latest_entry_id: 0,
+            },
+        }
+    }
+
+    #[test]
+    fn test_should_persist_region_checkpoint() {
+        // `persisted` is none
+        let current = metric_leader_region(100, 10);
+        let result = should_persist_region_checkpoint(&current, None).unwrap();
+        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
+
+        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
+        let current = mito_leader_region(100);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
+                .unwrap();
+        assert_eq!(result, ReplayCheckpoint::new(100, None));
+        let current = metric_leader_region(100, 10);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
+                .unwrap();
+        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
+
+        // `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()`
+        let current = metric_leader_region(100, 10);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
+                .unwrap();
+        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
+
+        // `persisted.metadata_entry_id` is none
+        let current = metric_leader_region(100, 10);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
+                .unwrap();
+        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
+
+        // `current.manifest.metadata_replay_entry_id()` is none
+        let current = mito_leader_region(100);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
+                .is_none();
+        assert!(result);
+
+        // `persisted.entry_id` is equal to `current.manifest.replay_entry_id()`
+        let current = metric_leader_region(100, 10);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
+        assert!(result.is_none());
+        let current = mito_leader_region(100);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
+        assert!(result.is_none());
+
+        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
+        // `persisted.metadata_entry_id` is greater than `current.manifest.metadata_replay_entry_id()`
+        let current = metric_leader_region(80, 11);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
+        assert!(result.is_none());
+        let current = mito_leader_region(80);
+        let result =
+            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
+        assert!(result.is_none());
+    }
 }

View File

@@ -15,7 +15,9 @@
 use common_telemetry::debug;
 use snafu::{OptionExt, ResultExt};
 use store_api::region_engine::RegionEngine;
-use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest};
+use store_api::region_request::{
+    AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint,
+};
 use store_api::storage::RegionId;
 
 use crate::engine::MetricEngineInner;
@@ -59,6 +61,10 @@ impl MetricEngineInner {
                     entry_id: req.metadata_entry_id,
                     metadata_entry_id: None,
                     location_id: req.location_id,
+                    checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
+                        entry_id: c.metadata_entry_id.unwrap_or_default(),
+                        metadata_entry_id: None,
+                    }),
                 }),
             )
             .await
@@ -73,6 +79,10 @@ impl MetricEngineInner {
                     entry_id: req.entry_id,
                     metadata_entry_id: None,
                     location_id: req.location_id,
+                    checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
+                        entry_id: c.entry_id,
+                        metadata_entry_id: None,
+                    }),
                 }),
             )
             .await

View File

@@ -1077,6 +1077,7 @@ mod tests {
         let staging_manifest_ctx = {
             let manager = RegionManifestManager::new(
                 version_control.current().version.metadata.clone(),
+                0,
                 RegionManifestOptions {
                     manifest_dir: "".to_string(),
                     object_store: env.access_layer.object_store().clone(),

View File

@@ -127,8 +127,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
                 entry_id: last_entry_id,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -160,8 +159,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
                 entry_id: last_entry_id,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -251,8 +249,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
                 entry_id: incorrect_last_entry_id,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await
@@ -269,8 +266,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
                 entry_id: incorrect_last_entry_id,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -340,9 +336,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -372,9 +366,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -465,9 +457,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -503,9 +493,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -652,9 +640,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -715,9 +701,7 @@ async fn test_catchup_not_exist() {
             non_exist_region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await

View File

@@ -27,8 +27,8 @@ use crate::error::{
     self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
 };
 use crate::manifest::action::{
-    RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
-    RegionMetaActionList,
+    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
+    RegionMetaAction, RegionMetaActionList,
 };
 use crate::manifest::checkpointer::Checkpointer;
 use crate::manifest::storage::{
@@ -150,6 +150,7 @@
     /// Constructs a region's manifest and persist it.
     pub async fn new(
         metadata: RegionMetadataRef,
+        flushed_entry_id: u64,
         options: RegionManifestOptions,
         total_manifest_size: Arc<AtomicU64>,
         manifest_version: Arc<AtomicU64>,
@@ -163,8 +164,8 @@
         );
 
         info!(
-            "Creating region manifest in {} with metadata {:?}",
-            options.manifest_dir, metadata
+            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
+            options.manifest_dir, metadata, flushed_entry_id
         );
 
         let version = MIN_VERSION;
@@ -184,9 +185,21 @@
             options.manifest_dir, manifest
         );
 
+        let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
+        if flushed_entry_id > 0 {
+            actions.push(RegionMetaAction::Edit(RegionEdit {
+                files_to_add: vec![],
+                files_to_remove: vec![],
+                timestamp_ms: None,
+                compaction_time_window: None,
+                flushed_entry_id: Some(flushed_entry_id),
+                flushed_sequence: None,
+            }));
+        }
+
         // Persist region change.
-        let action_list =
-            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
+        let action_list = RegionMetaActionList::new(actions);
         // New region is not in staging mode.
         // TODO(ruihang): add staging mode support if needed.
         store.save(version, &action_list.encode()?, false).await?;

View File

@@ -1122,6 +1122,7 @@ mod tests {
         let staging_ctx = {
             let manager = RegionManifestManager::new(
                 version_control.current().version.metadata.clone(),
+                0,
                 RegionManifestOptions {
                     manifest_dir: "".to_string(),
                     object_store: env.access_layer.object_store().clone(),
@@ -1187,6 +1188,7 @@
         let manager = RegionManifestManager::new(
             metadata.clone(),
+            0,
             RegionManifestOptions {
                 manifest_dir: "".to_string(),
                 object_store: access_layer.object_store().clone(),

View File

@@ -238,8 +238,11 @@ impl RegionOpener {
// Create a manifest manager for this region and writes regions to the manifest file. // Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = let region_manifest_options =
Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?; Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
// For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
let manifest_manager = RegionManifestManager::new( let manifest_manager = RegionManifestManager::new(
metadata.clone(), metadata.clone(),
flushed_entry_id,
region_manifest_options, region_manifest_options,
self.stats.total_manifest_size.clone(), self.stats.total_manifest_size.clone(),
self.stats.manifest_version.clone(), self.stats.manifest_version.clone(),
@@ -439,7 +442,7 @@ impl RegionOpener {
.build(); .build();
let flushed_entry_id = version.flushed_entry_id; let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version)); let version_control = Arc::new(VersionControl::new(version));
if !self.skip_wal_replay { let topic_latest_entry_id = if !self.skip_wal_replay {
let replay_from_entry_id = self let replay_from_entry_id = self
.replay_checkpoint .replay_checkpoint
.unwrap_or_default() .unwrap_or_default()
@@ -461,14 +464,26 @@ impl RegionOpener {
                on_region_opened,
            )
            .await?;
+            // For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id.
+            // Only set after the WAL replay is completed.
+            let topic_latest_entry_id = if provider.is_remote_wal()
+                && version_control.current().version.memtables.is_empty()
+            {
+                wal.store().latest_entry_id(&provider).unwrap_or(0)
+            } else {
+                0
+            };
+            topic_latest_entry_id
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                region_id, manifest.manifest_version, flushed_entry_id
            );
-        }
+            0
+        };
        let now = self.time_provider.current_time_millis();
        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
@@ -483,7 +498,7 @@ impl RegionOpener {
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
-            topic_latest_entry_id: AtomicU64::new(0),
+            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            write_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder,
            stats: self.stats.clone(),
@@ -713,8 +728,8 @@ where
    let series_count = version_control.current().series_count();
    info!(
-        "Replay WAL for region: {}, rows recovered: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
-        region_id, rows_replayed, last_entry_id, series_count, now.elapsed()
+        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
+        region_id, provider, rows_replayed, replay_from_entry_id, last_entry_id, series_count, now.elapsed()
    );
    Ok(last_entry_id)
}
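The opener hunks above keep three ids separate: flushed_entry_id comes from the manifest, replay_from_entry_id is where replay actually starts, and topic_latest_entry_id is only recorded for a remote WAL once replay has finished and left the memtables empty. A minimal sketch of that last gate, using hypothetical plain-value arguments in place of the real Provider, VersionControl, and LogStore types:

fn topic_latest_entry_id_after_replay(
    skip_wal_replay: bool,
    is_remote_wal: bool,
    memtables_empty_after_replay: bool,
    topic_latest_entry_id: Option<u64>,
) -> u64 {
    if skip_wal_replay {
        // Replay was skipped, so nothing can be claimed about the topic tail.
        return 0;
    }
    if is_remote_wal && memtables_empty_after_replay {
        // Replay completed and produced no memtable rows: everything up to the
        // topic's current tail is already covered by flushed data.
        topic_latest_entry_id.unwrap_or(0)
    } else {
        // Local WAL, or there is still unflushed data to account for.
        0
    }
}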

View File

@@ -563,6 +563,7 @@ impl TestEnv {
        if let Some(metadata) = initial_metadata {
            RegionManifestManager::new(
                metadata,
+                0,
                manifest_opts,
                Default::default(),
                Default::default(),

View File

@@ -116,6 +116,7 @@ impl SchedulerEnv {
            Arc::new(ManifestContext::new(
                RegionManifestManager::new(
                    metadata,
+                    0,
                    RegionManifestOptions {
                        manifest_dir: "".to_string(),
                        object_store: self.access_layer.object_store().clone(),

View File

@@ -65,7 +65,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
        if region.provider.is_remote_wal() {
            let flushed_entry_id = region.version_control.current().last_entry_id;
-            info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
+            let replay_from_entry_id = request
+                .checkpoint
+                .map(|c| c.entry_id)
+                .unwrap_or_default()
+                .max(flushed_entry_id);
+            info!("Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}", region.provider);
            let timer = Instant::now();
            let wal_entry_reader =
                self.wal
@@ -75,15 +80,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                    &region.provider,
                    wal_entry_reader,
                    region_id,
-                    flushed_entry_id,
+                    replay_from_entry_id,
                    &region.version_control,
                    self.config.allow_stale_entries,
                    on_region_opened,
                )
                .await?;
            info!(
-                "Elapsed: {:?}, region: {region_id} catchup finished. last entry id: {last_entry_id}, expected: {:?}.",
+                "Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished. replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.",
                timer.elapsed(),
+                region.provider,
                request.entry_id
            );
            if let Some(expected_last_entry_id) = request.entry_id {
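In the catchup path above, the replay start is the later of the request's checkpoint and what the region has already flushed, so a stale checkpoint can never move replay backwards. A small sketch of that selection, assuming only that the checkpoint carries an entry_id (the handler reads c.entry_id); the names are illustrative:

struct Checkpoint {
    entry_id: u64,
}

fn replay_from(checkpoint: Option<Checkpoint>, flushed_entry_id: u64) -> u64 {
    checkpoint
        .map(|c| c.entry_id)
        .unwrap_or_default()
        .max(flushed_entry_id)
}

fn main() {
    // A checkpoint ahead of the flushed id moves the replay start forward...
    assert_eq!(replay_from(Some(Checkpoint { entry_id: 120 }), 100), 120);
    // ...while a missing or stale checkpoint falls back to the flushed id.
    assert_eq!(replay_from(None, 100), 100);
    assert_eq!(replay_from(Some(Checkpoint { entry_id: 80 }), 100), 100);
}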

View File

@@ -15,6 +15,7 @@
use std::fmt::Display;
use std::sync::Arc;

+use crate::logstore::LogStore;
use crate::storage::RegionId;

// The Provider of kafka log store
@@ -78,6 +79,18 @@ impl Display for Provider {
}

impl Provider {
+    /// Returns the initial flushed entry id of the provider.
+    /// This is used to initialize the flushed entry id of the region when creating the region from scratch.
+    ///
+    /// Currently only used for remote WAL.
+    /// For local WAL, the initial flushed entry id is 0.
+    pub fn initial_flushed_entry_id<S: LogStore>(&self, wal: &S) -> u64 {
+        if matches!(self, Provider::Kafka(_)) {
+            return wal.latest_entry_id(self).unwrap_or(0);
+        }
+        0
+    }
+
    pub fn raft_engine_provider(id: u64) -> Provider {
        Provider::RaftEngine(RaftEngineProvider { id })
    }
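A self-contained analogue of initial_flushed_entry_id, with the provider collapsed to an enum tag and the log store to a lookup closure; the names here are illustrative, not the crate's API:

enum WalKind {
    /// Local WAL (raft engine): the region owns its log, so the usual 0 applies.
    RaftEngine,
    /// Remote WAL (Kafka): the region shares a topic that may already contain entries.
    Kafka,
}

fn initial_flushed_entry_id(kind: &WalKind, latest_entry_id: impl Fn() -> Option<u64>) -> u64 {
    match kind {
        // Treat everything already in the topic as flushed for a brand-new region,
        // so it will not try to replay entries written before it existed.
        WalKind::Kafka => latest_entry_id().unwrap_or(0),
        WalKind::RaftEngine => 0,
    }
}

fn main() {
    assert_eq!(initial_flushed_entry_id(&WalKind::Kafka, || Some(42)), 42);
    assert_eq!(initial_flushed_entry_id(&WalKind::RaftEngine, || Some(42)), 0);
}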

View File

@@ -1358,7 +1358,7 @@ pub enum RegionTruncateRequest {
///
/// Makes a readonly region to catch up to leader region changes.
/// There is no effect if it operating on a leader region.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Sets it to writable if it's available after it has caught up with all changes.
    pub set_writable: bool,
@@ -1371,6 +1371,8 @@ pub struct RegionCatchupRequest {
    pub metadata_entry_id: Option<entry::Id>,
    /// The hint for replaying memtable.
    pub location_id: Option<u64>,
+    /// Replay checkpoint.
+    pub checkpoint: Option<ReplayCheckpoint>,
}

/// Get sequences of regions by region ids.
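With Default now derived, a caller only has to spell out the fields it cares about; the remaining hints (entry_id, metadata_entry_id, location_id) fall back to None. A hedged construction sketch, using only field names visible in this diff (ReplayCheckpoint construction is elided because its full definition is not shown here):

fn catchup_request(checkpoint: Option<ReplayCheckpoint>) -> RegionCatchupRequest {
    RegionCatchupRequest {
        // Promote the region to writable once it has caught up.
        set_writable: true,
        // Hint telling the worker where replay may safely start from.
        checkpoint,
        ..Default::default()
    }
}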

View File

@@ -22,15 +22,17 @@ INSERT INTO test VALUES
Affected Rows: 3

-- SQLNESS SLEEP 3s
-SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
+-- For regions using different WAL implementations, the manifest size may vary.
+-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest.
+SELECT SUM(region_rows), SUM(memtable_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');

-+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
-| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.disk_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
-+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
-| 3                                                      | 2699                                                 | 0                                                   | 0                                                     |
-+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
++-------------------------------------------------------+----------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
+| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.memtable_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
++-------------------------------------------------------+----------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
+| 3                                                      | 78                                                       | 0                                                   | 0                                                     |
++-------------------------------------------------------+----------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+

SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';

View File

@@ -17,7 +17,9 @@ INSERT INTO test VALUES
(21, 'c', 21);

-- SQLNESS SLEEP 3s
-SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
+-- For regions using different WAL implementations, the manifest size may vary.
+-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest.
+SELECT SUM(region_rows), SUM(memtable_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');