From 566a647ec7e6ea8b55ae5ba5140dfb39985e6786 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 27 Aug 2025 15:24:33 +0800 Subject: [PATCH] feat: add replay checkpoint to reduce overhead for remote WAL (#6816) * feat: introduce `TopicRegionValue` Signed-off-by: WenyXu * feat: persist region replay checkpoint Signed-off-by: WenyXu * feat: introduce checkpoint Signed-off-by: WenyXu * chore: udpate config.md Signed-off-by: WenyXu * refactor: minor refactor Signed-off-by: WenyXu * feat: send open region instructions with reply checkpoint Signed-off-by: WenyXu * chore: use usize Signed-off-by: WenyXu * fix: fix unit tests Signed-off-by: WenyXu * fix: fix unit tests Signed-off-by: WenyXu * feat: add topic name pattern Signed-off-by: WenyXu * feat: enable wal prune by default Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 2 +- Cargo.toml | 2 +- config/config.md | 3 +- config/metasrv.example.toml | 10 +- src/common/meta/src/instruction.rs | 20 ++ src/common/meta/src/key.rs | 17 +- src/common/meta/src/key/topic_region.rs | 179 +++++++++++++---- src/common/meta/src/region_registry.rs | 28 +++ src/common/meta/src/wal_options_allocator.rs | 24 ++- src/common/wal/src/config.rs | 8 +- src/common/wal/src/config/kafka/common.rs | 4 +- src/common/wal/src/config/kafka/metasrv.rs | 5 +- src/common/wal/src/lib.rs | 1 + src/datanode/src/datanode.rs | 145 +++----------- .../src/heartbeat/handler/open_region.rs | 16 +- src/datanode/src/lib.rs | 1 + src/datanode/src/region_server.rs | 5 + src/datanode/src/utils.rs | 188 ++++++++++++++++++ src/file-engine/src/region.rs | 2 + src/log-store/src/kafka/consumer.rs | 86 ++++++-- src/log-store/src/kafka/log_store.rs | 20 +- src/meta-srv/src/metasrv/builder.rs | 1 + src/meta-srv/src/metrics.rs | 9 +- .../src/procedure/region_migration.rs | 15 ++ .../region_migration/open_candidate_region.rs | 41 ++-- src/meta-srv/src/procedure/wal_prune.rs | 6 +- src/meta-srv/src/procedure/wal_prune/utils.rs | 4 +- src/meta-srv/src/region/flush_trigger.rs | 148 ++++++++++++-- src/metric-engine/src/engine.rs | 4 + src/metric-engine/src/engine/open.rs | 38 ++-- src/metric-engine/src/test_util.rs | 1 + src/mito2/src/engine/alter_test.rs | 4 + src/mito2/src/engine/basic_test.rs | 1 + src/mito2/src/engine/batch_open_test.rs | 3 + src/mito2/src/engine/catchup_test.rs | 6 + src/mito2/src/engine/compaction_test.rs | 2 + src/mito2/src/engine/open_test.rs | 8 + src/mito2/src/engine/parallel_test.rs | 1 + src/mito2/src/engine/sync_test.rs | 2 + src/mito2/src/engine/truncate_test.rs | 2 + src/mito2/src/region/opener.rs | 21 +- src/mito2/src/test_util.rs | 1 + src/mito2/src/worker/handle_open.rs | 1 + src/store-api/src/region_request.rs | 9 + 44 files changed, 850 insertions(+), 244 deletions(-) create mode 100644 src/datanode/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index dc1efc1804..c86a7f77a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10777,7 +10777,7 @@ dependencies = [ [[package]] name = "rskafka" version = "0.6.0" -source = "git+https://github.com/influxdata/rskafka.git?rev=a62120b6c74d68953464b256f858dc1c41a903b4#a62120b6c74d68953464b256f858dc1c41a903b4" +source = "git+https://github.com/WenyXu/rskafka.git?rev=7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76#7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76" dependencies = [ "bytes", "chrono", diff --git a/Cargo.toml b/Cargo.toml index f8016a83cb..a2e8d368e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -191,7 +191,7 @@ reqwest = { version = "0.12", default-features = false, features = [ "stream", "multipart", ] } -rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "a62120b6c74d68953464b256f858dc1c41a903b4", features = [ +rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76", features = [ "transport-tls", ] } rstest = "0.25" diff --git a/config/config.md b/config/config.md index fcd23a2b5f..7b3f4745b3 100644 --- a/config/config.md +++ b/config/config.md @@ -379,8 +379,9 @@ | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster.

**It's only used when the provider is `kafka`**. | | `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
**It's only used when the provider is `kafka`**. | -| `wal.auto_prune_interval` | String | `10m` | Interval of automatically WAL pruning.
Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.
**It's only used when the provider is `kafka`**. | +| `wal.auto_prune_interval` | String | `30m` | Interval of automatically WAL pruning.
Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.
**It's only used when the provider is `kafka`**. | | `wal.flush_trigger_size` | String | `512MB` | Estimated size threshold to trigger a flush when using Kafka remote WAL.
Since multiple regions may share a Kafka topic, the estimated size is calculated as:
(latest_entry_id - flushed_entry_id) * avg_record_size
MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`.
- `latest_entry_id`: The latest entry ID in the topic.
- `flushed_entry_id`: The last flushed entry ID for the region.
Set to "0" to let the system decide the flush trigger size.
**It's only used when the provider is `kafka`**. | +| `wal.checkpoint_trigger_size` | String | `128MB` | Estimated size threshold to trigger a checkpoint when using Kafka remote WAL.
The estimated size is calculated as:
(latest_entry_id - last_checkpoint_entry_id) * avg_record_size
MetaSrv triggers a checkpoint for a region when this estimated size exceeds `checkpoint_trigger_size`.
Set to "0" to disable checkpoint trigger.
**It's only used when the provider is `kafka`**. | | `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatically WAL pruning.
**It's only used when the provider is `kafka`**. | | `wal.num_topics` | Integer | `64` | Number of topics used for remote WAL.
**It's only used when the provider is `kafka`**. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default)
**It's only used when the provider is `kafka`**. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index fc9b8a8ef4..a7f53eea49 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -190,7 +190,7 @@ auto_create_topics = true ## Interval of automatically WAL pruning. ## Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically. ## **It's only used when the provider is `kafka`**. -auto_prune_interval = "10m" +auto_prune_interval = "30m" ## Estimated size threshold to trigger a flush when using Kafka remote WAL. @@ -203,6 +203,14 @@ auto_prune_interval = "10m" ## **It's only used when the provider is `kafka`**. flush_trigger_size = "512MB" +## Estimated size threshold to trigger a checkpoint when using Kafka remote WAL. +## The estimated size is calculated as: +## (latest_entry_id - last_checkpoint_entry_id) * avg_record_size +## MetaSrv triggers a checkpoint for a region when this estimated size exceeds `checkpoint_trigger_size`. +## Set to "0" to disable checkpoint trigger. +## **It's only used when the provider is `kafka`**. +checkpoint_trigger_size = "128MB" + ## Concurrent task limit for automatically WAL pruning. ## **It's only used when the provider is `kafka`**. auto_prune_parallelism = 10 diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 1d2259310c..b74b8c25b4 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -108,6 +108,10 @@ pub struct OpenRegion { pub region_wal_options: HashMap, #[serde(default)] pub skip_wal_replay: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub replay_entry_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub metadata_replay_entry_id: Option, } impl OpenRegion { @@ -124,8 +128,22 @@ impl OpenRegion { region_options, region_wal_options, skip_wal_replay, + replay_entry_id: None, + metadata_replay_entry_id: None, } } + + /// Sets the replay entry id. + pub fn with_replay_entry_id(mut self, replay_entry_id: Option) -> Self { + self.replay_entry_id = replay_entry_id; + self + } + + /// Sets the metadata replay entry id. + pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option) -> Self { + self.metadata_replay_entry_id = metadata_replay_entry_id; + self + } } /// The instruction of downgrading leader region. @@ -352,6 +370,8 @@ mod tests { region_options, region_wal_options: HashMap::new(), skip_wal_replay: false, + replay_entry_id: None, + metadata_replay_entry_id: None, }; assert_eq!(expected, deserialized); } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 7cf1b0bef8..3688091843 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -155,6 +155,7 @@ use crate::error::{self, Result, SerdeJsonSnafu}; use crate::key::flow::flow_state::FlowStateValue; use crate::key::node_address::NodeAddressValue; use crate::key::table_route::TableRouteKey; +use crate::key::topic_region::TopicRegionValue; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; @@ -164,6 +165,7 @@ use crate::state_store::PoisonValue; use crate::DatanodeId; pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*"; +pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*"; pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance"; pub const MAINTENANCE_KEY: &str = "__switches/maintenance"; pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure"; @@ -271,6 +273,10 @@ lazy_static! { pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap(); } +lazy_static! { + pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap(); +} + lazy_static! { static ref TABLE_INFO_KEY_PATTERN: Regex = Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); @@ -326,7 +332,7 @@ lazy_static! { lazy_static! { pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!( - "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$" + "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$" )) .unwrap(); } @@ -1434,7 +1440,8 @@ impl_metadata_value! { NodeAddressValue, SchemaNameValue, FlowStateValue, - PoisonValue + PoisonValue, + TopicRegionValue } impl_optional_metadata_value! { @@ -1676,9 +1683,11 @@ mod tests { .topic_region_manager .regions(&topic) .await - .unwrap(); + .unwrap() + .into_keys() + .collect::>(); assert_eq!(regions.len(), 8); - assert_eq!(regions[0], region_id); + assert!(regions.contains(®ion_id)); } } diff --git a/src/common/meta/src/key/topic_region.rs b/src/common/meta/src/key/topic_region.rs index 51c90d69ea..776d0fceca 100644 --- a/src/common/meta/src/key/topic_region.rs +++ b/src/common/meta/src/key/topic_region.rs @@ -12,20 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - use std::collections::HashMap; use std::fmt::{self, Display}; @@ -37,10 +23,12 @@ use table::metadata::TableId; use crate::ddl::utils::parse_region_wal_options; use crate::error::{Error, InvalidMetadataSnafu, Result}; -use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX}; +use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; -use crate::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest}; +use crate::rpc::store::{ + BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest, +}; use crate::rpc::KeyValue; // The TopicRegionKey is a key for the topic-region mapping in the kvbackend. @@ -51,8 +39,20 @@ pub struct TopicRegionKey<'a> { pub topic: &'a str, } -#[derive(Debug, Serialize, Deserialize)] -pub struct TopicRegionValue; +/// Represents additional information for a region when using a shared WAL. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] +pub struct TopicRegionValue { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub checkpoint: Option, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] +pub struct ReplayCheckpoint { + #[serde(default)] + pub entry_id: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub metadata_entry_id: Option, +} impl<'a> TopicRegionKey<'a> { pub fn new(region_id: RegionId, topic: &'a str) -> Self { @@ -118,9 +118,47 @@ impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> { } } -fn topic_region_decoder(value: &KeyValue) -> Result> { +impl ReplayCheckpoint { + /// Creates a new [`ReplayCheckpoint`] with the given entry id and metadata entry id. + pub fn new(entry_id: u64, metadata_entry_id: Option) -> Self { + Self { + entry_id, + metadata_entry_id, + } + } +} + +impl TopicRegionValue { + /// Creates a new [`TopicRegionValue`] with the given checkpoint. + pub fn new(checkpoint: Option) -> Self { + Self { checkpoint } + } + + /// Returns the minimum entry id of the region. + /// + /// If the metadata entry id is not set, it returns the entry id. + pub fn min_entry_id(&self) -> Option { + match self.checkpoint { + Some(ReplayCheckpoint { + entry_id, + metadata_entry_id, + }) => match metadata_entry_id { + Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)), + None => Some(entry_id), + }, + None => None, + } + } +} + +fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> { let key = TopicRegionKey::from_bytes(&value.key)?; - Ok(key) + let value = if value.value.is_empty() { + TopicRegionValue::default() + } else { + TopicRegionValue::try_from_raw_value(&value.value)? + }; + Ok((key, value)) } /// Manages map of topics and regions in kvbackend. @@ -143,21 +181,59 @@ impl TopicRegionManager { Ok(()) } - pub async fn batch_put(&self, keys: Vec>) -> Result<()> { + pub async fn batch_get( + &self, + keys: Vec>, + ) -> Result> { + let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::>(); + let req = BatchGetRequest { keys: raw_keys }; + let resp = self.kv_backend.batch_get(req).await?; + + let v = resp + .kvs + .into_iter() + .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value))) + .collect::>>()?; + + Ok(v) + } + + pub async fn get(&self, key: TopicRegionKey<'_>) -> Result> { + let key_bytes = key.to_bytes(); + let resp = self.kv_backend.get(&key_bytes).await?; + let value = resp + .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value)) + .transpose()?; + + Ok(value) + } + + pub async fn batch_put( + &self, + keys: &[(TopicRegionKey<'_>, Option)], + ) -> Result<()> { let req = BatchPutRequest { kvs: keys - .into_iter() - .map(|key| KeyValue { - key: key.to_bytes(), - value: vec![], + .iter() + .map(|(key, value)| { + let value = value + .map(|v| v.try_as_raw_value()) + .transpose()? + .unwrap_or_default(); + + Ok(KeyValue { + key: key.to_bytes(), + value, + }) }) - .collect(), + .collect::>>()?, prev_kv: false, }; self.kv_backend.batch_put(req).await?; Ok(()) } + /// Build a create topic region mapping transaction. It only executes while the primary keys comparing successes. pub fn build_create_txn( &self, table_id: TableId, @@ -176,8 +252,8 @@ impl TopicRegionManager { Ok(Txn::new().and_then(operations)) } - /// Returns the list of region ids using specified topic. - pub async fn regions(&self, topic: &str) -> Result> { + /// Returns the map of [`RegionId`] to their corresponding topic [`TopicRegionValue`]. + pub async fn regions(&self, topic: &str) -> Result> { let prefix = TopicRegionKey::range_topic_key(topic); let req = RangeRequest::new().with_prefix(prefix.as_bytes()); let resp = self.kv_backend.range(req).await?; @@ -186,7 +262,10 @@ impl TopicRegionManager { .iter() .map(topic_region_decoder) .collect::>>()?; - Ok(region_ids.iter().map(|key| key.region_id).collect()) + Ok(region_ids + .into_iter() + .map(|(key, value)| (key.region_id, value)) + .collect()) } pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> { @@ -248,15 +327,24 @@ mod tests { let topics = (0..16).map(|i| format!("topic_{}", i)).collect::>(); let keys = (0..64) - .map(|i| TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize])) + .map(|i| { + ( + TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]), + None, + ) + }) .collect::>(); - manager.batch_put(keys.clone()).await.unwrap(); - - let mut key_values = manager.regions(&topics[0]).await.unwrap(); + manager.batch_put(&keys).await.unwrap(); + let mut key_values = manager + .regions(&topics[0]) + .await + .unwrap() + .into_keys() + .collect::>(); let expected = keys .iter() - .filter_map(|key| { + .filter_map(|(key, _)| { if key.topic == topics[0] { Some(key.region_id) } else { @@ -269,10 +357,15 @@ mod tests { let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0"); manager.delete(key.clone()).await.unwrap(); - let mut key_values = manager.regions(&topics[0]).await.unwrap(); + let mut key_values = manager + .regions(&topics[0]) + .await + .unwrap() + .into_keys() + .collect::>(); let expected = keys .iter() - .filter_map(|key| { + .filter_map(|(key, _)| { if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) { Some(key.region_id) } else { @@ -324,4 +417,18 @@ mod tests { expected.sort_by_key(|(region_id, _)| region_id.as_u64()); assert_eq!(topic_region_map, expected); } + + #[test] + fn test_topic_region_key_is_match() { + let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992"; + let topic_region_key = TopicRegionKey::try_from(key).unwrap(); + assert_eq!( + topic_region_key.topic, + "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2" + ); + assert_eq!( + topic_region_key.region_id, + RegionId::from_u64(4410931412992) + ); + } } diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index 83834937dc..b579c583bf 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -133,6 +133,34 @@ impl LeaderRegionManifestInfo { } } + /// Returns the replay entry id of the data region. + pub fn replay_entry_id(&self) -> u64 { + match self { + LeaderRegionManifestInfo::Mito { + flushed_entry_id, + topic_latest_entry_id, + .. + } => (*flushed_entry_id).max(*topic_latest_entry_id), + LeaderRegionManifestInfo::Metric { + data_flushed_entry_id, + data_topic_latest_entry_id, + .. + } => (*data_flushed_entry_id).max(*data_topic_latest_entry_id), + } + } + + /// Returns the replay entry id of the metadata region. + pub fn metadata_replay_entry_id(&self) -> Option { + match self { + LeaderRegionManifestInfo::Metric { + metadata_flushed_entry_id, + metadata_topic_latest_entry_id, + .. + } => Some((*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id)), + _ => None, + } + } + /// A region is considered inactive if the flushed entry id is less than the topic's latest entry id. /// /// The `topic_latest_entry_id` of a region is updated only when its memtable is empty during a flush. diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index 9be4655dd7..fb631363f1 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -27,7 +27,7 @@ use snafu::{ensure, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result}; -use crate::key::NAME_PATTERN_REGEX; +use crate::key::TOPIC_NAME_PATTERN_REGEX; use crate::kv_backend::KvBackendRef; use crate::leadership_notifier::LeadershipChangeListener; pub use crate::wal_options_allocator::topic_creator::{ @@ -109,7 +109,7 @@ pub async fn build_wal_options_allocator( MetasrvWalConfig::Kafka(kafka_config) => { let prefix = &kafka_config.kafka_topic.topic_name_prefix; ensure!( - NAME_PATTERN_REGEX.is_match(prefix), + TOPIC_NAME_PATTERN_REGEX.is_match(prefix), InvalidTopicNamePrefixSnafu { prefix } ); let topic_creator = @@ -149,6 +149,26 @@ pub fn prepare_wal_options( } } +/// Extracts the topic from the wal options. +pub fn extract_topic_from_wal_options( + region_id: RegionId, + region_options: &HashMap, +) -> Option { + region_options + .get(®ion_id.region_number()) + .and_then(|wal_options| { + serde_json::from_str::(wal_options) + .ok() + .and_then(|wal_options| { + if let WalOptions::Kafka(kafka_wal_option) = wal_options { + Some(kafka_wal_option.topic) + } else { + None + } + }) + }) +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 4bfcbc46e7..04b173060d 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -20,7 +20,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use crate::config::kafka::common::{ - DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE, + DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE, + DEFAULT_FLUSH_TRIGGER_SIZE, }; use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use crate::config::raft_engine::RaftEngineConfig; @@ -64,6 +65,8 @@ impl From for MetasrvWalConfig { auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM, // This field won't be used in standalone mode flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE, + // This field won't be used in standalone mode + checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE, }), } } @@ -205,9 +208,10 @@ mod tests { create_topic_timeout: Duration::from_secs(30), }, auto_create_topics: true, - auto_prune_interval: Duration::from_secs(0), + auto_prune_interval: Duration::from_mins(30), auto_prune_parallelism: 10, flush_trigger_size: ReadableSize::mb(512), + checkpoint_trigger_size: ReadableSize::mb(128), }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index 56145b06c9..1028ca838b 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -37,11 +37,13 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig { }; /// Default interval for auto WAL pruning. -pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO; +pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30); /// Default limit for concurrent auto pruning tasks. pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10; /// Default size of WAL to trigger flush. pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512); +/// Default checkpoint trigger size. +pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128); use crate::error::{self, Result}; use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index 4ed23e5a3b..bffde60c81 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; use crate::config::kafka::common::{ KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL, - DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE, + DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, }; /// Kafka wal configurations for metasrv. @@ -41,6 +41,8 @@ pub struct MetasrvKafkaConfig { pub auto_prune_parallelism: usize, // The size of WAL to trigger flush. pub flush_trigger_size: ReadableSize, + // The checkpoint trigger size. + pub checkpoint_trigger_size: ReadableSize, } impl Default for MetasrvKafkaConfig { @@ -52,6 +54,7 @@ impl Default for MetasrvKafkaConfig { auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL, auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM, flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE, + checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE, } } } diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 659a045f57..49b16ea598 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(assert_matches)] +#![feature(duration_constructors_lite)] use std::net::SocketAddr; diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 181b18d937..bcd5effd86 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -23,18 +23,15 @@ use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef}; use common_meta::datanode::TopicStatsReporter; -use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}; use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; -use common_meta::wal_options_allocator::prepare_wal_options; pub use common_procedure::options::ProcedureConfig; use common_telemetry::{error, info, warn}; use common_wal::config::kafka::DatanodeKafkaConfig; use common_wal::config::raft_engine::RaftEngineConfig; use common_wal::config::DatanodeWalConfig; use file_engine::engine::FileRegionEngine; -use futures_util::TryStreamExt; use log_store::kafka::log_store::KafkaLogStore; use log_store::kafka::{default_index_file, GlobalIndexCollector}; use log_store::raft_engine::log_store::RaftEngineLogStore; @@ -49,10 +46,8 @@ use query::QueryEngineFactory; use servers::export_metrics::ExportMetricsTask; use servers::server::ServerHandlers; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::path_utils::{table_dir, WAL_DIR}; +use store_api::path_utils::WAL_DIR; use store_api::region_engine::{RegionEngineRef, RegionRole}; -use store_api::region_request::{PathType, RegionOpenRequest}; -use store_api::storage::RegionId; use tokio::fs; use tokio::sync::Notify; @@ -70,6 +65,7 @@ use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::HeartbeatTask; use crate::region_server::{DummyTableProviderFactory, RegionServer}; use crate::store::{self, new_object_store_without_cache}; +use crate::utils::{build_region_open_requests, RegionOpenRequests}; /// Datanode service. pub struct Datanode { @@ -252,16 +248,12 @@ impl DatanodeBuilder { .recovery_mode() .await .context(GetMetadataSnafu)?; - let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone()); - let table_values = datanode_table_manager - .tables(node_id) - .try_collect::>() - .await - .context(GetMetadataSnafu)?; + let region_open_requests = + build_region_open_requests(node_id, self.kv_backend.clone()).await?; let open_all_regions = open_all_regions( region_server.clone(), - table_values, + region_open_requests, !controlled_by_metasrv, self.opts.init_regions_parallelism, // Ignore nonexistent regions in recovery mode. @@ -342,27 +334,22 @@ impl DatanodeBuilder { async fn initialize_region_server( &self, region_server: &RegionServer, - kv_backend: KvBackendRef, open_with_writable: bool, ) -> Result<()> { let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; - let runtime_switch_manager = RuntimeSwitchManager::new(kv_backend.clone()); + // TODO(weny): Considering introducing a readonly kv_backend trait. + let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone()); let is_recovery_mode = runtime_switch_manager .recovery_mode() .await .context(GetMetadataSnafu)?; - - let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); - let table_values = datanode_table_manager - .tables(node_id) - .try_collect::>() - .await - .context(GetMetadataSnafu)?; + let region_open_requests = + build_region_open_requests(node_id, self.kv_backend.clone()).await?; open_all_regions( region_server.clone(), - table_values, + region_open_requests, open_with_writable, self.opts.init_regions_parallelism, is_recovery_mode, @@ -609,73 +596,24 @@ impl DatanodeBuilder { /// Open all regions belong to this datanode. async fn open_all_regions( region_server: RegionServer, - table_values: Vec, + region_open_requests: RegionOpenRequests, open_with_writable: bool, init_regions_parallelism: usize, ignore_nonexistent_region: bool, ) -> Result<()> { - let mut regions = vec![]; - #[cfg(feature = "enterprise")] - let mut follower_regions = vec![]; - for table_value in table_values { - for region_number in table_value.regions { - // Augments region options with wal options if a wal options is provided. - let mut region_options = table_value.region_info.region_options.clone(); - prepare_wal_options( - &mut region_options, - RegionId::new(table_value.table_id, region_number), - &table_value.region_info.region_wal_options, - ); - - regions.push(( - RegionId::new(table_value.table_id, region_number), - table_value.region_info.engine.clone(), - table_value.region_info.region_storage_path.clone(), - region_options, - )); - } - + let RegionOpenRequests { + leader_regions, #[cfg(feature = "enterprise")] - for region_number in table_value.follower_regions { - // Augments region options with wal options if a wal options is provided. - let mut region_options = table_value.region_info.region_options.clone(); - prepare_wal_options( - &mut region_options, - RegionId::new(table_value.table_id, region_number), - &table_value.region_info.region_wal_options, - ); - - follower_regions.push(( - RegionId::new(table_value.table_id, region_number), - table_value.region_info.engine.clone(), - table_value.region_info.region_storage_path.clone(), - region_options, - )); - } - } - let num_regions = regions.len(); - info!("going to open {} region(s)", num_regions); - - let mut region_requests = Vec::with_capacity(regions.len()); - for (region_id, engine, store_path, options) in regions { - let table_dir = table_dir(&store_path, region_id.table_id()); - region_requests.push(( - region_id, - RegionOpenRequest { - engine, - table_dir, - path_type: PathType::Bare, - options, - skip_wal_replay: false, - }, - )); - } + follower_regions, + } = region_open_requests; + let leader_region_num = leader_regions.len(); + info!("going to open {} region(s)", leader_region_num); let now = Instant::now(); let open_regions = region_server .handle_batch_open_requests( init_regions_parallelism, - region_requests, + leader_regions, ignore_nonexistent_region, ) .await?; @@ -686,19 +624,19 @@ async fn open_all_regions( ); if !ignore_nonexistent_region { ensure!( - open_regions.len() == num_regions, + open_regions.len() == leader_region_num, error::UnexpectedSnafu { violated: format!( "Expected to open {} of regions, only {} of regions has opened", - num_regions, + leader_region_num, open_regions.len() ) } ); - } else if open_regions.len() != num_regions { + } else if open_regions.len() != leader_region_num { warn!( "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened", - num_regions, + leader_region_num, open_regions.len() ); } @@ -717,31 +655,14 @@ async fn open_all_regions( if !follower_regions.is_empty() { use tokio::time::Instant; - info!( - "going to open {} follower region(s)", - follower_regions.len() - ); - let mut region_requests = Vec::with_capacity(follower_regions.len()); - let num_regions = follower_regions.len(); - for (region_id, engine, store_path, options) in follower_regions { - let table_dir = table_dir(&store_path, region_id.table_id()); - region_requests.push(( - region_id, - RegionOpenRequest { - engine, - table_dir, - path_type: PathType::Bare, - options, - skip_wal_replay: true, - }, - )); - } + let follower_region_num = follower_regions.len(); + info!("going to open {} follower region(s)", follower_region_num); let now = Instant::now(); let open_regions = region_server .handle_batch_open_requests( init_regions_parallelism, - region_requests, + follower_regions, ignore_nonexistent_region, ) .await?; @@ -753,19 +674,19 @@ async fn open_all_regions( if !ignore_nonexistent_region { ensure!( - open_regions.len() == num_regions, + open_regions.len() == follower_region_num, error::UnexpectedSnafu { violated: format!( "Expected to open {} of follower regions, only {} of regions has opened", - num_regions, + follower_region_num, open_regions.len() ) } ); - } else if open_regions.len() != num_regions { + } else if open_regions.len() != follower_region_num { warn!( "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened", - num_regions, + follower_region_num, open_regions.len() ); } @@ -835,15 +756,13 @@ mod tests { ..Default::default() }, Plugins::default(), - kv_backend, + kv_backend.clone(), ); builder.with_cache_registry(layered_cache_registry); - - let kv = Arc::new(MemoryKvBackend::default()) as _; - setup_table_datanode(&kv).await; + setup_table_datanode(&(kv_backend as _)).await; builder - .initialize_region_server(&mock_region_server, kv.clone(), false) + .initialize_region_server(&mock_region_server, false) .await .unwrap(); diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 4a6952daf8..871bc5b9c3 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -16,7 +16,7 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply}; use common_meta::wal_options_allocator::prepare_wal_options; use futures_util::future::BoxFuture; use store_api::path_utils::table_dir; -use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest}; +use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest, ReplayCheckpoint}; use crate::heartbeat::handler::HandlerContext; @@ -29,17 +29,31 @@ impl HandlerContext { mut region_options, region_wal_options, skip_wal_replay, + replay_entry_id, + metadata_replay_entry_id, }: OpenRegion, ) -> BoxFuture<'static, Option> { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); prepare_wal_options(&mut region_options, region_id, ®ion_wal_options); + let checkpoint = match (replay_entry_id, metadata_replay_entry_id) { + (Some(replay_entry_id), Some(metadata_replay_entry_id)) => Some(ReplayCheckpoint { + entry_id: replay_entry_id, + metadata_entry_id: Some(metadata_replay_entry_id), + }), + (Some(replay_entry_id), None) => Some(ReplayCheckpoint { + entry_id: replay_entry_id, + metadata_entry_id: None, + }), + _ => None, + }; let request = RegionRequest::Open(RegionOpenRequest { engine: region_ident.engine, table_dir: table_dir(®ion_storage_path, region_id.table_id()), path_type: PathType::Bare, options: region_options, skip_wal_replay, + checkpoint, }); let result = self.region_server.handle_request(region_id, request).await; let success = result.is_ok(); diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index cdb67d836f..bc63d71c8a 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -28,3 +28,4 @@ pub mod service; pub mod store; #[cfg(any(test, feature = "testing"))] pub mod tests; +pub mod utils; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 882d1b2e92..0859fae5ca 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -1410,6 +1410,7 @@ mod tests { path_type: PathType::Bare, options: Default::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -1579,6 +1580,7 @@ mod tests { path_type: PathType::Bare, options: Default::default(), skip_wal_replay: false, + checkpoint: None, }, ), ( @@ -1589,6 +1591,7 @@ mod tests { path_type: PathType::Bare, options: Default::default(), skip_wal_replay: false, + checkpoint: None, }, ), ], @@ -1610,6 +1613,7 @@ mod tests { path_type: PathType::Bare, options: Default::default(), skip_wal_replay: false, + checkpoint: None, }, ), ( @@ -1620,6 +1624,7 @@ mod tests { path_type: PathType::Bare, options: Default::default(), skip_wal_replay: false, + checkpoint: None, }, ), ], diff --git a/src/datanode/src/utils.rs b/src/datanode/src/utils.rs new file mode 100644 index 0000000000..77b9246c0f --- /dev/null +++ b/src/datanode/src/utils.rs @@ -0,0 +1,188 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_meta::key::datanode_table::DatanodeTableManager; +use common_meta::key::topic_region::{TopicRegionKey, TopicRegionManager, TopicRegionValue}; +use common_meta::kv_backend::KvBackendRef; +use common_meta::wal_options_allocator::{extract_topic_from_wal_options, prepare_wal_options}; +use common_meta::DatanodeId; +use futures::TryStreamExt; +use snafu::ResultExt; +use store_api::path_utils::table_dir; +use store_api::region_request::{PathType, RegionOpenRequest, ReplayCheckpoint}; +use store_api::storage::{RegionId, RegionNumber}; +use tracing::info; + +use crate::error::{GetMetadataSnafu, Result}; + +/// The requests to open regions. +pub(crate) struct RegionOpenRequests { + pub leader_regions: Vec<(RegionId, RegionOpenRequest)>, + #[cfg(feature = "enterprise")] + pub follower_regions: Vec<(RegionId, RegionOpenRequest)>, +} + +fn group_region_by_topic( + region_id: RegionId, + region_options: &HashMap, + topic_regions: &mut HashMap>, +) { + if let Some(topic) = extract_topic_from_wal_options(region_id, region_options) { + topic_regions.entry(topic).or_default().push(region_id); + } +} + +fn get_replay_checkpoint( + region_id: RegionId, + topic_region_values: &Option>, +) -> Option { + let topic_region_values = topic_region_values.as_ref()?; + let topic_region_value = topic_region_values.get(®ion_id); + let replay_checkpoint = topic_region_value.and_then(|value| value.checkpoint); + replay_checkpoint.map(|checkpoint| ReplayCheckpoint { + entry_id: checkpoint.entry_id, + metadata_entry_id: checkpoint.metadata_entry_id, + }) +} + +pub(crate) async fn build_region_open_requests( + node_id: DatanodeId, + kv_backend: KvBackendRef, +) -> Result { + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let table_values = datanode_table_manager + .tables(node_id) + .try_collect::>() + .await + .context(GetMetadataSnafu)?; + + let topic_region_manager = TopicRegionManager::new(kv_backend); + let mut topic_regions = HashMap::>::new(); + let mut regions = vec![]; + #[cfg(feature = "enterprise")] + let mut follower_regions = vec![]; + + for table_value in table_values { + for region_number in table_value.regions { + let region_id = RegionId::new(table_value.table_id, region_number); + // Augments region options with wal options if a wal options is provided. + let mut region_options = table_value.region_info.region_options.clone(); + prepare_wal_options( + &mut region_options, + region_id, + &table_value.region_info.region_wal_options, + ); + group_region_by_topic( + region_id, + &table_value.region_info.region_wal_options, + &mut topic_regions, + ); + + regions.push(( + region_id, + table_value.region_info.engine.clone(), + table_value.region_info.region_storage_path.clone(), + region_options, + )); + } + + #[cfg(feature = "enterprise")] + for region_number in table_value.follower_regions { + let region_id = RegionId::new(table_value.table_id, region_number); + // Augments region options with wal options if a wal options is provided. + let mut region_options = table_value.region_info.region_options.clone(); + prepare_wal_options( + &mut region_options, + RegionId::new(table_value.table_id, region_number), + &table_value.region_info.region_wal_options, + ); + group_region_by_topic( + region_id, + &table_value.region_info.region_wal_options, + &mut topic_regions, + ); + + follower_regions.push(( + RegionId::new(table_value.table_id, region_number), + table_value.region_info.engine.clone(), + table_value.region_info.region_storage_path.clone(), + region_options, + )); + } + } + + let topic_region_values = if !topic_regions.is_empty() { + let keys = topic_regions + .iter() + .flat_map(|(topic, regions)| { + regions + .iter() + .map(|region_id| TopicRegionKey::new(*region_id, topic)) + }) + .collect::>(); + let topic_region_manager = topic_region_manager + .batch_get(keys) + .await + .context(GetMetadataSnafu)?; + Some(topic_region_manager) + } else { + None + }; + + let mut leader_region_requests = Vec::with_capacity(regions.len()); + for (region_id, engine, store_path, options) in regions { + let table_dir = table_dir(&store_path, region_id.table_id()); + let checkpoint = get_replay_checkpoint(region_id, &topic_region_values); + info!("region_id: {}, checkpoint: {:?}", region_id, checkpoint); + leader_region_requests.push(( + region_id, + RegionOpenRequest { + engine, + table_dir, + path_type: PathType::Bare, + options, + skip_wal_replay: false, + checkpoint, + }, + )); + } + + #[cfg(feature = "enterprise")] + let follower_region_requests = { + let mut follower_region_requests = Vec::with_capacity(follower_regions.len()); + for (region_id, engine, store_path, options) in follower_regions { + let table_dir = table_dir(&store_path, region_id.table_id()); + follower_region_requests.push(( + region_id, + RegionOpenRequest { + engine, + table_dir, + path_type: PathType::Bare, + options, + skip_wal_replay: true, + checkpoint: None, + }, + )); + } + follower_region_requests + }; + + Ok(RegionOpenRequests { + leader_regions: leader_region_requests, + #[cfg(feature = "enterprise")] + follower_regions: follower_region_requests, + }) +} diff --git a/src/file-engine/src/region.rs b/src/file-engine/src/region.rs index 6c495ee031..50c7e69114 100644 --- a/src/file-engine/src/region.rs +++ b/src/file-engine/src/region.rs @@ -178,6 +178,7 @@ mod tests { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }; let region = FileRegion::open(region_id, request, &object_store) @@ -230,6 +231,7 @@ mod tests { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }; let err = FileRegion::open(region_id, request, &object_store) .await diff --git a/src/log-store/src/kafka/consumer.rs b/src/log-store/src/kafka/consumer.rs index c018d69997..d8052e609f 100644 --- a/src/log-store/src/kafka/consumer.rs +++ b/src/log-store/src/kafka/consumer.rs @@ -28,6 +28,17 @@ use rskafka::record::RecordAndOffset; use crate::kafka::index::{NextBatchHint, RegionWalIndexIterator}; +pub struct FetchResult { + /// The offsets of the fetched records. + pub records: Vec, + + /// The high watermark of the partition. + pub high_watermark: i64, + + /// The size of the response encoded in bytes. + pub encoded_response_size: usize, +} + #[async_trait::async_trait] pub trait FetchClient: std::fmt::Debug + Send + Sync { /// Fetch records. @@ -38,7 +49,9 @@ pub trait FetchClient: std::fmt::Debug + Send + Sync { offset: i64, bytes: Range, max_wait_ms: i32, - ) -> rskafka::client::error::Result<(Vec, i64)>; + ) -> rskafka::client::error::Result; + + fn topic(&self) -> &str; } #[async_trait::async_trait] @@ -48,15 +61,25 @@ impl FetchClient for PartitionClient { offset: i64, bytes: Range, max_wait_ms: i32, - ) -> rskafka::client::error::Result<(Vec, i64)> { - self.fetch_records(offset, bytes, max_wait_ms).await + ) -> rskafka::client::error::Result { + self.fetch_records(offset, bytes, max_wait_ms) + .await + .map(|r| FetchResult { + records: r.records, + high_watermark: r.high_watermark, + encoded_response_size: r.encoded_response_size, + }) + } + + fn topic(&self) -> &str { + self.topic() } } -struct FetchResult { +struct FetchResultInner { records_and_offsets: Vec, batch_size: usize, - fetch_bytes: i32, + fetch_bytes: usize, watermark: i64, used_offset: i64, } @@ -97,7 +120,23 @@ pub struct Consumer { /// The fetch future. #[builder(default = "Fuse::terminated()")] - fetch_fut: Fuse>>, + fetch_fut: Fuse>>, + + /// Total fetched bytes. + #[builder(default = "0")] + total_fetched_bytes: u64, +} + +impl Consumer { + /// Returns the total fetched bytes. + pub fn total_fetched_bytes(&self) -> u64 { + self.total_fetched_bytes + } + + /// Returns the topic name. + pub fn topic(&self) -> &str { + self.client.topic() + } } pub(crate) struct RecordsBuffer { @@ -184,15 +223,20 @@ impl Stream for Consumer { let fetch_range = 1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32); *this.fetch_fut = FutureExt::fuse(Box::pin(async move { - let (records_and_offsets, watermark) = client + let FetchResult { + records: records_and_offsets, + high_watermark: watermark, + encoded_response_size, + .. + } = client .fetch_records(offset, fetch_range, max_wait_ms) .await?; - Ok(FetchResult { + Ok(FetchResultInner { records_and_offsets, watermark, used_offset: offset, - fetch_bytes: bytes as i32, + fetch_bytes: encoded_response_size, batch_size: len, }) })); @@ -206,7 +250,7 @@ impl Stream for Consumer { let data = futures::ready!(this.fetch_fut.poll_unpin(cx)); match data { - Ok(FetchResult { + Ok(FetchResultInner { mut records_and_offsets, watermark, used_offset, @@ -217,9 +261,10 @@ impl Stream for Consumer { records_and_offsets.sort_unstable_by_key(|x| x.offset); *this.last_high_watermark = watermark; if !records_and_offsets.is_empty() { - *this.avg_record_size = fetch_bytes as usize / records_and_offsets.len(); + *this.avg_record_size = fetch_bytes / records_and_offsets.len(); debug!("set avg_record_size: {}", *this.avg_record_size); } + *this.total_fetched_bytes += fetch_bytes as u64; debug!( "Fetch result: {:?}, used_offset: {used_offset}, max_batch_size: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}", @@ -254,7 +299,7 @@ mod tests { use futures::TryStreamExt; use rskafka::record::{Record, RecordAndOffset}; - use super::FetchClient; + use super::*; use crate::kafka::consumer::{Consumer, RecordsBuffer}; use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex}; @@ -270,7 +315,7 @@ mod tests { offset: i64, bytes: Range, _max_wait_ms: i32, - ) -> rskafka::client::error::Result<(Vec, i64)> { + ) -> rskafka::client::error::Result { let record_size = self.record.approximate_size(); let num = (bytes.end.unsigned_abs() as usize / record_size).max(1); @@ -280,8 +325,18 @@ mod tests { offset: offset + idx as i64, }) .collect::>(); + let max_offset = offset + records.len() as i64; - Ok((records, max_offset)) + let encoded_response_size = records.iter().map(|r| r.record.approximate_size()).sum(); + Ok(FetchResult { + records, + high_watermark: max_offset, + encoded_response_size, + }) + } + + fn topic(&self) -> &str { + "test" } } @@ -315,6 +370,7 @@ mod tests { index: Box::new(index), }, fetch_fut: Fuse::terminated(), + total_fetched_bytes: 0, }; let records = consumer.try_collect::>().await.unwrap(); @@ -347,6 +403,7 @@ mod tests { index: Box::new(index), }, fetch_fut: Fuse::terminated(), + total_fetched_bytes: 0, }; let records = consumer.try_collect::>().await.unwrap(); @@ -388,6 +445,7 @@ mod tests { index: Box::new(iter), }, fetch_fut: Fuse::terminated(), + total_fetched_bytes: 0, }; let records = consumer.try_collect::>().await.unwrap(); diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 54d7d079a2..55f7a147fd 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -14,11 +14,12 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; +use common_base::readable_size::ReadableSize; use common_meta::datanode::TopicStatsReporter; use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS; -use common_telemetry::{debug, warn}; +use common_telemetry::{debug, info, warn}; use common_time::util::current_time_millis; use common_wal::config::kafka::DatanodeKafkaConfig; use dashmap::DashMap; @@ -400,6 +401,7 @@ impl LogStore for KafkaLogStore { let mut entry_records: HashMap> = HashMap::new(); let provider = provider.clone(); let stream = async_stream::stream!({ + let now = Instant::now(); while let Some(consume_result) = stream_consumer.next().await { // Each next on the stream consumer produces a `RecordAndOffset` and a high watermark offset. // The `RecordAndOffset` contains the record data and its start offset. @@ -410,9 +412,6 @@ impl LogStore for KafkaLogStore { })?; let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset); - metrics::METRIC_KAFKA_READ_BYTES_TOTAL - .inc_by(kafka_record.approximate_size() as u64); - debug!( "Read a record at offset {} for topic {}, high watermark: {}", offset, provider.topic, high_watermark @@ -446,6 +445,17 @@ impl LogStore for KafkaLogStore { break; } } + + metrics::METRIC_KAFKA_READ_BYTES_TOTAL.inc_by(stream_consumer.total_fetched_bytes()); + + info!( + "Fetched {} bytes from topic: {}, start_entry_id: {}, end_offset: {}, elapsed: {:?}", + ReadableSize(stream_consumer.total_fetched_bytes()), + stream_consumer.topic(), + entry_id, + end_offset, + now.elapsed() + ); }); Ok(Box::pin(stream)) } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 7e490ef827..449e862120 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -422,6 +422,7 @@ impl MetasrvBuilder { mailbox.clone(), options.grpc.server_addr.clone(), remote_wal_options.flush_trigger_size, + remote_wal_options.checkpoint_trigger_size, ); region_flush_trigger.try_start()?; diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index dd893eb234..28be968a9f 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -82,5 +82,12 @@ lazy_static! { .unwrap(); /// The triggered region flush total counter. pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec = - register_int_counter_vec!("meta_triggered_region_flushes_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap(); + register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap(); + + /// The triggered region checkpoint total counter. + pub static ref METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL: IntCounterVec = + register_int_counter_vec!("meta_triggered_region_checkpoint_total", "meta triggered region checkpoint total", &["topic_name"]).unwrap(); + /// The topic estimated replay size. + pub static ref METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE: IntGaugeVec = + register_int_gauge_vec!("meta_topic_estimated_replay_size", "meta topic estimated replay size", &["topic_name"]).unwrap(); } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index ac52bc8280..ec295f98d1 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -38,6 +38,7 @@ use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; +use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey}; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::kv_backend::ResettableKvBackendRef; use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock}; @@ -534,6 +535,20 @@ impl Context { Ok(datanode_value.as_ref().unwrap()) } + /// Fetches the replay checkpoint for the given topic. + pub async fn fetch_replay_checkpoint(&self, topic: &str) -> Result> { + let region_id = self.region_id(); + let topic_region_key = TopicRegionKey::new(region_id, topic); + let value = self + .table_metadata_manager + .topic_region_manager() + .get(topic_region_key) + .await + .context(error::TableMetadataManagerSnafu)?; + + Ok(value.and_then(|value| value.checkpoint)) + } + /// Returns the [RegionId]. pub fn region_id(&self) -> RegionId { self.persistent_ctx.region_id diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 7228108cb2..ce0a0af7a6 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -19,6 +19,7 @@ use api::v1::meta::MailboxMessage; use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::datanode_table::RegionInfo; +use common_meta::wal_options_allocator::extract_topic_from_wal_options; use common_meta::RegionIdent; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::info; @@ -67,6 +68,7 @@ impl OpenCandidateRegion { async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result { let pc = &ctx.persistent_ctx; let table_id = pc.region_id.table_id(); + let region_id = pc.region_id; let region_number = pc.region_id.region_number(); let candidate_id = pc.to_peer.id; let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; @@ -78,18 +80,31 @@ impl OpenCandidateRegion { engine, } = datanode_table_value.region_info.clone(); - let open_instruction = Instruction::OpenRegion(OpenRegion::new( - RegionIdent { - datanode_id: candidate_id, - table_id, - region_number, - engine, - }, - ®ion_storage_path, - region_options, - region_wal_options, - true, - )); + let checkpoint = + if let Some(topic) = extract_topic_from_wal_options(region_id, ®ion_wal_options) { + ctx.fetch_replay_checkpoint(&topic).await.ok().flatten() + } else { + None + }; + + let open_instruction = Instruction::OpenRegion( + OpenRegion::new( + RegionIdent { + datanode_id: candidate_id, + table_id, + region_number, + engine, + }, + ®ion_storage_path, + region_options, + region_wal_options, + true, + ) + .with_replay_entry_id(checkpoint.map(|checkpoint| checkpoint.entry_id)) + .with_metadata_replay_entry_id( + checkpoint.and_then(|checkpoint| checkpoint.metadata_entry_id), + ), + ); Ok(open_instruction) } @@ -226,6 +241,8 @@ mod tests { region_options: Default::default(), region_wal_options: Default::default(), skip_wal_replay: true, + replay_entry_id: None, + metadata_replay_entry_id: None, }) } diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs index ca68fd0610..1d4c34c7cd 100644 --- a/src/meta-srv/src/procedure/wal_prune.rs +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -200,7 +200,7 @@ mod tests { use common_wal::maybe_skip_kafka_integration_test; use common_wal::test_util::get_kafka_endpoints; - use rskafka::client::partition::UnknownTopicHandling; + use rskafka::client::partition::{FetchResult, UnknownTopicHandling}; use rskafka::record::Record; use super::*; @@ -289,8 +289,8 @@ mod tests { .await; if expect_success { assert!(res.is_ok()); - let (record, _high_watermark) = res.unwrap(); - assert!(!record.is_empty()); + let FetchResult { records, .. } = res.unwrap(); + assert!(!records.is_empty()); } else { let err = res.unwrap_err(); // The error is in a private module so we check it through `to_string()`. diff --git a/src/meta-srv/src/procedure/wal_prune/utils.rs b/src/meta-srv/src/procedure/wal_prune/utils.rs index d71f2867dd..045af46a4a 100644 --- a/src/meta-srv/src/procedure/wal_prune/utils.rs +++ b/src/meta-srv/src/procedure/wal_prune/utils.rs @@ -62,7 +62,9 @@ pub(crate) async fn find_pruneable_entry_id_for_topic( .topic_region_manager() .regions(topic) .await - .context(TableMetadataManagerSnafu)?; + .context(TableMetadataManagerSnafu)? + .into_keys() + .collect::>(); if region_ids.is_empty() { return Ok(None); } diff --git a/src/meta-srv/src/region/flush_trigger.rs b/src/meta-srv/src/region/flush_trigger.rs index 4e05b718bc..a1135f943f 100644 --- a/src/meta-srv/src/region/flush_trigger.rs +++ b/src/meta-srv/src/region/flush_trigger.rs @@ -19,12 +19,16 @@ use std::time::{Duration, Instant}; use api::v1::meta::MailboxMessage; use common_base::readable_size::ReadableSize; use common_meta::instruction::{FlushRegions, Instruction}; +use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue}; use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; -use common_meta::region_registry::LeaderRegionRegistryRef; +use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef}; use common_meta::stats::topic::TopicStatsRegistryRef; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; +use common_wal::config::kafka::common::{ + DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, +}; use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -77,6 +81,8 @@ pub struct RegionFlushTrigger { server_addr: String, /// The flush trigger size. flush_trigger_size: ReadableSize, + /// The checkpoint trigger size. + checkpoint_trigger_size: ReadableSize, /// The receiver of events. receiver: Receiver, } @@ -89,8 +95,23 @@ impl RegionFlushTrigger { topic_stats_registry: TopicStatsRegistryRef, mailbox: MailboxRef, server_addr: String, - flush_trigger_size: ReadableSize, + mut flush_trigger_size: ReadableSize, + mut checkpoint_trigger_size: ReadableSize, ) -> (Self, RegionFlushTicker) { + if flush_trigger_size.as_bytes() == 0 { + flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE; + warn!( + "flush_trigger_size is not set, using default value: {}", + flush_trigger_size + ); + } + if checkpoint_trigger_size.as_bytes() == 0 { + checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE; + warn!( + "checkpoint_trigger_size is not set, using default value: {}", + checkpoint_trigger_size + ); + } let (tx, rx) = Self::channel(); let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx); let region_flush_trigger = Self { @@ -100,6 +121,7 @@ impl RegionFlushTrigger { mailbox, server_addr, flush_trigger_size, + checkpoint_trigger_size, receiver: rx, }; (region_flush_trigger, region_flush_ticker) @@ -197,6 +219,52 @@ impl RegionFlushTrigger { Some((latest_entry_id, stat.avg_record_size)) } + async fn persist_region_checkpoints( + &self, + topic: &str, + region_ids: &[RegionId], + leader_regions: &HashMap, + ) -> Result<()> { + if region_ids.is_empty() { + return Ok(()); + } + + let regions = region_ids + .iter() + .flat_map(|region_id| match leader_regions.get(region_id) { + Some(leader_region) => { + let entry_id = leader_region.manifest.replay_entry_id(); + let metadata_entry_id = leader_region.manifest.metadata_replay_entry_id(); + + Some(( + TopicRegionKey::new(*region_id, topic), + Some(TopicRegionValue::new(Some(ReplayCheckpoint::new( + entry_id, + metadata_entry_id, + )))), + )) + } + None => None, + }) + .collect::>(); + + let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops(); + let batch_size = max_txn_ops.min(regions.len()); + for batch in regions.chunks(batch_size) { + self.table_metadata_manager + .topic_region_manager() + .batch_put(batch) + .await + .context(error::TableMetadataManagerSnafu)?; + } + + metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL + .with_label_values(&[topic]) + .inc_by(regions.len() as u64); + + Ok(()) + } + async fn flush_regions_in_topic( &self, topic: &str, @@ -209,14 +277,34 @@ impl RegionFlushTrigger { .regions(topic) .await .context(error::TableMetadataManagerSnafu)?; + if region_ids.is_empty() { debug!("No regions found for topic: {}", topic); return Ok(()); } - let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = self + // Filters regions need to persist checkpoints. + let regions_to_persist = filter_regions_by_replay_size( + topic, + region_ids + .iter() + .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())), + avg_record_size as u64, + latest_entry_id, + self.checkpoint_trigger_size, + ); + let region_manifests = self .leader_region_registry - .batch_get(region_ids.iter().cloned()) + .batch_get(region_ids.keys().cloned()); + + if let Err(err) = self + .persist_region_checkpoints(topic, ®ions_to_persist, ®ion_manifests) + .await + { + error!(err; "Failed to persist region checkpoints for topic: {}", topic); + } + + let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = region_manifests .into_iter() .partition_map(|(region_id, region)| { if !region.manifest.is_inactive() { @@ -226,8 +314,24 @@ impl RegionFlushTrigger { } }); + let min_entry_id = inactive_regions + .iter() + .min_by_key(|(_, entry_id)| *entry_id); + let min_entry_id = active_regions + .iter() + .min_by_key(|(_, entry_id)| *entry_id) + .or(min_entry_id); + + if let Some((_, min_entry_id)) = min_entry_id { + let replay_size = (latest_entry_id.saturating_sub(*min_entry_id)) + .saturating_mul(avg_record_size as u64); + metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE + .with_label_values(&[topic]) + .set(replay_size as i64); + } + // Selects regions to flush from the set of active regions. - let mut regions_to_flush = select_regions_to_flush( + let mut regions_to_flush = filter_regions_by_replay_size( topic, active_regions.into_iter(), avg_record_size as u64, @@ -239,7 +343,7 @@ impl RegionFlushTrigger { // Selects regions to flush from the set of inactive regions. // For inactive regions, we use a lower flush trigger size (half of the normal size) // to encourage more aggressive flushing to update the region's topic latest entry id. - let inactive_regions_to_flush = select_regions_to_flush( + let inactive_regions_to_flush = filter_regions_by_replay_size( topic, inactive_regions.into_iter(), avg_record_size as u64, @@ -304,26 +408,26 @@ impl RegionFlushTrigger { } } -/// Select regions to flush based on the estimated replay size. +/// Filter regions based on the estimated replay size. /// -/// The regions are selected if the estimated replay size exceeds the flush trigger size. +/// Returns the regions if its estimated replay size exceeds the given threshold. /// The estimated replay size is calculated as: /// `(latest_entry_id - prunable_entry_id) * avg_record_size` -fn select_regions_to_flush>( +fn filter_regions_by_replay_size>( topic: &str, regions: I, avg_record_size: u64, latest_entry_id: u64, - flush_trigger_size: ReadableSize, + threshold: ReadableSize, ) -> Vec { let mut regions_to_flush = Vec::new(); - for (region_id, prunable_entry_id) in regions { - if prunable_entry_id < latest_entry_id { - let replay_size = (latest_entry_id - prunable_entry_id).saturating_mul(avg_record_size); - if replay_size > flush_trigger_size.as_bytes() { + for (region_id, entry_id) in regions { + if entry_id < latest_entry_id { + let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size); + if replay_size > threshold.as_bytes() { debug!( - "Region {}: estimated replay size {} exceeds flush trigger size {}, prunable entry id: {}, topic latest entry id: {}, topic: '{}'", - region_id, ReadableSize(replay_size), flush_trigger_size, prunable_entry_id, latest_entry_id, topic + "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'", + region_id, ReadableSize(replay_size), threshold, entry_id, latest_entry_id, topic ); regions_to_flush.push(region_id); } @@ -421,7 +525,7 @@ mod tests { (region_id(1, 3), 95), // replay_size = 50 ]; - let result = select_regions_to_flush( + let result = filter_regions_by_replay_size( topic, regions.into_iter(), avg_record_size, @@ -445,7 +549,7 @@ mod tests { (region_id(1, 3), 90), // replay_size = 100 ]; - let result = select_regions_to_flush( + let result = filter_regions_by_replay_size( topic, regions.into_iter(), avg_record_size, @@ -465,7 +569,7 @@ mod tests { let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)]; // replay_size will always be 0, so none should be flushed - let result = select_regions_to_flush( + let result = filter_regions_by_replay_size( topic, regions.into_iter(), avg_record_size, @@ -487,7 +591,7 @@ mod tests { (region_id(1, 2), 99), // replay_size = 10 ]; - let result = select_regions_to_flush( + let result = filter_regions_by_replay_size( topic, regions.into_iter(), avg_record_size, @@ -512,7 +616,7 @@ mod tests { (region_id(1, 4), 200), // replay_size = 0 ]; - let result = select_regions_to_flush( + let result = filter_regions_by_replay_size( topic, regions.into_iter(), avg_record_size, diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index a69ef1d13c..19a989d0a3 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -519,6 +519,7 @@ mod test { path_type: PathType::Bare, // Use Bare path type for engine regions options: physical_region_option, skip_wal_replay: false, + checkpoint: None, }; engine .handle_request(physical_region_id, RegionRequest::Open(open_request)) @@ -542,6 +543,7 @@ mod test { path_type: PathType::Bare, // Use Bare path type for engine regions options: HashMap::new(), skip_wal_replay: false, + checkpoint: None, }; engine .handle_request( @@ -620,6 +622,7 @@ mod test { path_type: PathType::Bare, options: physical_region_option, skip_wal_replay: false, + checkpoint: None, }; // Opening an already opened region should succeed. // Since the region is already open, no metadata recovery operations will be performed. @@ -647,6 +650,7 @@ mod test { path_type: PathType::Bare, options: physical_region_option, skip_wal_replay: false, + checkpoint: None, }; let err = metric_engine .handle_request(physical_region_id, RegionRequest::Open(open_request)) diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 6cb06c7dfc..e30a446cff 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -22,7 +22,7 @@ use datafusion::common::HashMap; use mito2::engine::MITO_ENGINE_NAME; use snafu::{OptionExt, ResultExt}; use store_api::region_engine::{BatchResponses, RegionEngine}; -use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, RegionRequest}; +use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, ReplayCheckpoint}; use store_api::storage::RegionId; use crate::engine::create::region_options_for_metadata_region; @@ -204,12 +204,18 @@ impl MetricEngineInner { request: RegionOpenRequest, ) -> (RegionOpenRequest, RegionOpenRequest) { let metadata_region_options = region_options_for_metadata_region(&request.options); + let checkpoint = request.checkpoint; + let open_metadata_region_request = RegionOpenRequest { table_dir: request.table_dir.clone(), path_type: PathType::Metadata, options: metadata_region_options, engine: MITO_ENGINE_NAME.to_string(), skip_wal_replay: request.skip_wal_replay, + checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint { + entry_id: checkpoint.metadata_entry_id.unwrap_or_default(), + metadata_entry_id: None, + }), }; let mut data_region_options = request.options; @@ -223,6 +229,10 @@ impl MetricEngineInner { options: data_region_options, engine: MITO_ENGINE_NAME.to_string(), skip_wal_replay: request.skip_wal_replay, + checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint { + entry_id: checkpoint.entry_id, + metadata_entry_id: None, + }), }; (open_metadata_region_request, open_data_region_request) @@ -238,25 +248,17 @@ impl MetricEngineInner { let data_region_id = utils::to_data_region_id(region_id); let (open_metadata_region_request, open_data_region_request) = self.transform_open_physical_region_request(request); - - self.mito - .handle_request( - metadata_region_id, - RegionRequest::Open(open_metadata_region_request), + let _ = self + .mito + .handle_batch_open_requests( + 2, + vec![ + (metadata_region_id, open_metadata_region_request), + (data_region_id, open_data_region_request), + ], ) .await - .with_context(|_| OpenMitoRegionSnafu { - region_type: "metadata", - })?; - self.mito - .handle_request( - data_region_id, - RegionRequest::Open(open_data_region_request), - ) - .await - .with_context(|_| OpenMitoRegionSnafu { - region_type: "data", - })?; + .context(BatchOpenMitoRegionSnafu {})?; info!("Opened physical metric region {region_id}"); PHYSICAL_REGION_COUNT.inc(); diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index be039e26ea..11d2191cac 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -115,6 +115,7 @@ impl TestEnv { path_type: PathType::Bare, // Use Bare path type for engine regions options: physical_region_option, skip_wal_replay: true, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index e6ae024f6e..7ff4c94d82 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -207,6 +207,7 @@ async fn test_alter_region() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -293,6 +294,7 @@ async fn test_put_after_alter() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -611,6 +613,7 @@ async fn test_alter_column_fulltext_options() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -722,6 +725,7 @@ async fn test_alter_column_set_inverted_index() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 4f52a4c2dd..349fd10b17 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -158,6 +158,7 @@ async fn test_region_replay(factory: Option) { path_type: store_api::region_request::PathType::Bare, options, skip_wal_replay: false, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/engine/batch_open_test.rs b/src/mito2/src/engine/batch_open_test.rs index cef7f46fc6..9fae7164a0 100644 --- a/src/mito2/src/engine/batch_open_test.rs +++ b/src/mito2/src/engine/batch_open_test.rs @@ -125,6 +125,7 @@ async fn test_batch_open(factory: Option) { options: options.clone(), skip_wal_replay: false, path_type: PathType::Bare, + checkpoint: None, }, ) }) @@ -137,6 +138,7 @@ async fn test_batch_open(factory: Option) { options: options.clone(), skip_wal_replay: false, path_type: PathType::Bare, + checkpoint: None, }, )); @@ -190,6 +192,7 @@ async fn test_batch_open_err(factory: Option) { options: options.clone(), skip_wal_replay: false, path_type: PathType::Bare, + checkpoint: None, }, ) }) diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index ad3c7f4b15..d346fecebd 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -95,6 +95,7 @@ async fn test_catchup_with_last_entry_id(factory: Option) { path_type: store_api::region_request::PathType::Bare, options, skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -216,6 +217,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option) { path_type: store_api::region_request::PathType::Bare, options, skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -425,6 +428,7 @@ async fn test_catchup_with_manifest_update(factory: Option) { path_type: store_api::region_request::PathType::Bare, options, skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -532,6 +536,7 @@ async fn open_region( options: HashMap::new(), skip_wal_replay, path_type: PathType::Bare, + checkpoint: None, }), ) .await @@ -626,6 +631,7 @@ async fn test_local_catchup(factory: Option) { options: HashMap::new(), skip_wal_replay: true, path_type: PathType::Bare, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 9e1aa7d3de..e7a23a23c4 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -792,6 +792,7 @@ async fn test_change_region_compaction_window() { path_type: PathType::Bare, options: Default::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -875,6 +876,7 @@ async fn test_open_overwrite_compaction_window() { path_type: PathType::Bare, options, skip_wal_replay: false, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 3f9e834f6a..f10404f429 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -50,6 +50,7 @@ async fn test_engine_open_empty() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -85,6 +86,7 @@ async fn test_engine_open_existing() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -180,6 +182,7 @@ async fn test_engine_region_open_with_options() { path_type: PathType::Bare, options: HashMap::from([("ttl".to_string(), "4d".to_string())]), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -226,6 +229,7 @@ async fn test_engine_region_open_with_custom_store() { path_type: PathType::Bare, options: HashMap::from([("storage".to_string(), "Gcs".to_string())]), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -298,6 +302,7 @@ async fn test_open_region_skip_wal_replay() { path_type: PathType::Bare, options: Default::default(), skip_wal_replay: true, + checkpoint: None, }), ) .await @@ -328,6 +333,7 @@ async fn test_open_region_skip_wal_replay() { path_type: PathType::Bare, options: Default::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -370,6 +376,7 @@ async fn test_open_region_wait_for_opening_region_ok() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -410,6 +417,7 @@ async fn test_open_region_wait_for_opening_region_err() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index b5e8718d44..f423b4b41a 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -51,6 +51,7 @@ async fn scan_in_parallel( options: HashMap::default(), skip_wal_replay: false, path_type: PathType::Bare, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index c6a98e63d0..a4d02dbb02 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -111,6 +111,7 @@ async fn test_sync_after_flush_region() { options: Default::default(), // Ensure the region is not replayed from the WAL. skip_wal_replay: true, + checkpoint: None, }), ) .await @@ -207,6 +208,7 @@ async fn test_sync_after_alter_region() { options: Default::default(), // Ensure the region is not replayed from the WAL. skip_wal_replay: true, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 4da3978ba6..e724626af7 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -276,6 +276,7 @@ async fn test_engine_truncate_reopen() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await @@ -384,6 +385,7 @@ async fn test_engine_truncate_during_flush() { path_type: PathType::Bare, options: HashMap::default(), skip_wal_replay: false, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 9b4c790c06..48a303921a 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -85,6 +85,7 @@ pub(crate) struct RegionOpener { time_provider: TimeProviderRef, stats: ManifestStats, wal_entry_reader: Option>, + replay_checkpoint: Option, } impl RegionOpener { @@ -118,6 +119,7 @@ impl RegionOpener { time_provider, stats: Default::default(), wal_entry_reader: None, + replay_checkpoint: None, } } @@ -149,6 +151,12 @@ impl RegionOpener { self.options(RegionOptions::try_from(&options)?) } + /// Sets the replay checkpoint for the region. + pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option) -> Self { + self.replay_checkpoint = replay_checkpoint; + self + } + /// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of /// constructing a new one from scratch. pub(crate) fn wal_entry_reader( @@ -432,17 +440,22 @@ impl RegionOpener { let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); if !self.skip_wal_replay { + let replay_from_entry_id = self + .replay_checkpoint + .unwrap_or_default() + .max(flushed_entry_id); info!( - "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}", - flushed_entry_id + 1, + "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}", + replay_from_entry_id, region_id, - manifest.manifest_version + manifest.manifest_version, + flushed_entry_id ); replay_memtable( &provider, wal_entry_reader, region_id, - flushed_entry_id, + replay_from_entry_id, &version_control, config.allow_stale_entries, on_region_opened, diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 65340eb76a..fe70f5cc36 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -1100,6 +1100,7 @@ pub async fn reopen_region( options, skip_wal_replay: false, path_type: PathType::Bare, + checkpoint: None, }), ) .await diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 86963f6598..23148c36b1 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -107,6 +107,7 @@ impl RegionWorkerLoop { .skip_wal_replay(request.skip_wal_replay) .cache(Some(self.cache_manager.clone())) .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _)) + .replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id)) .parse_options(request.options) { Ok(opener) => opener, diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index fad0680048..64fd74b6d8 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -291,6 +291,7 @@ fn make_region_open(open: OpenRequest) -> Result> path_type: PathType::Bare, options: open.options, skip_wal_replay: false, + checkpoint: None, }), )]) } @@ -503,6 +504,14 @@ pub struct RegionOpenRequest { pub options: HashMap, /// To skip replaying the WAL. pub skip_wal_replay: bool, + /// Replay checkpoint. + pub checkpoint: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ReplayCheckpoint { + pub entry_id: u64, + pub metadata_entry_id: Option, } impl RegionOpenRequest {