diff --git a/Cargo.lock b/Cargo.lock
index dc1efc1804..c86a7f77a2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10777,7 +10777,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
-source = "git+https://github.com/influxdata/rskafka.git?rev=a62120b6c74d68953464b256f858dc1c41a903b4#a62120b6c74d68953464b256f858dc1c41a903b4"
+source = "git+https://github.com/WenyXu/rskafka.git?rev=7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76#7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76"
dependencies = [
"bytes",
"chrono",
diff --git a/Cargo.toml b/Cargo.toml
index f8016a83cb..a2e8d368e4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -191,7 +191,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
-rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "a62120b6c74d68953464b256f858dc1c41a903b4", features = [
+rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76", features = [
"transport-tls",
] }
rstest = "0.25"
diff --git a/config/config.md b/config/config.md
index fcd23a2b5f..7b3f4745b3 100644
--- a/config/config.md
+++ b/config/config.md
@@ -379,8 +379,9 @@
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. **It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL. Set to `true` to automatically create topics for WAL. Otherwise, use topics named `topic_name_prefix_[0..num_topics)` **It's only used when the provider is `kafka`**. |
-| `wal.auto_prune_interval` | String | `10m` | Interval of automatically WAL pruning. Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically. **It's only used when the provider is `kafka`**. |
+| `wal.auto_prune_interval` | String | `30m` | Interval of automatically WAL pruning. Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically. **It's only used when the provider is `kafka`**. |
| `wal.flush_trigger_size` | String | `512MB` | Estimated size threshold to trigger a flush when using Kafka remote WAL. Since multiple regions may share a Kafka topic, the estimated size is calculated as: (latest_entry_id - flushed_entry_id) * avg_record_size MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`. - `latest_entry_id`: The latest entry ID in the topic. - `flushed_entry_id`: The last flushed entry ID for the region. Set to "0" to let the system decide the flush trigger size. **It's only used when the provider is `kafka`**. |
+| `wal.checkpoint_trigger_size` | String | `128MB` | Estimated size threshold to trigger a checkpoint when using Kafka remote WAL. The estimated size is calculated as: (latest_entry_id - last_checkpoint_entry_id) * avg_record_size MetaSrv triggers a checkpoint for a region when this estimated size exceeds `checkpoint_trigger_size`. Set to "0" to disable checkpoint trigger. **It's only used when the provider is `kafka`**. |
| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatically WAL pruning. **It's only used when the provider is `kafka`**. |
| `wal.num_topics` | Integer | `64` | Number of topics used for remote WAL. **It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type. Available selector types: - `round_robin` (default) **It's only used when the provider is `kafka`**. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index fc9b8a8ef4..a7f53eea49 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -190,7 +190,7 @@ auto_create_topics = true
## Interval of automatically WAL pruning.
## Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.
## **It's only used when the provider is `kafka`**.
-auto_prune_interval = "10m"
+auto_prune_interval = "30m"
## Estimated size threshold to trigger a flush when using Kafka remote WAL.
@@ -203,6 +203,14 @@ auto_prune_interval = "10m"
## **It's only used when the provider is `kafka`**.
flush_trigger_size = "512MB"
+## Estimated size threshold to trigger a checkpoint when using Kafka remote WAL.
+## The estimated size is calculated as:
+## (latest_entry_id - last_checkpoint_entry_id) * avg_record_size
+## MetaSrv triggers a checkpoint for a region when this estimated size exceeds `checkpoint_trigger_size`.
+## Set to "0" to disable checkpoint trigger.
+## **It's only used when the provider is `kafka`**.
+checkpoint_trigger_size = "128MB"
+
## Concurrent task limit for automatically WAL pruning.
## **It's only used when the provider is `kafka`**.
auto_prune_parallelism = 10
diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 1d2259310c..b74b8c25b4 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -108,6 +108,10 @@ pub struct OpenRegion {
pub region_wal_options: HashMap,
#[serde(default)]
pub skip_wal_replay: bool,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub replay_entry_id: Option,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub metadata_replay_entry_id: Option,
}
impl OpenRegion {
@@ -124,8 +128,22 @@ impl OpenRegion {
region_options,
region_wal_options,
skip_wal_replay,
+ replay_entry_id: None,
+ metadata_replay_entry_id: None,
}
}
+
+ /// Sets the replay entry id, replacing any previously stored value, and returns the updated instruction so calls can be chained.
+ pub fn with_replay_entry_id(mut self, replay_entry_id: Option) -> Self {
+ self.replay_entry_id = replay_entry_id;
+ self
+ }
+
+ /// Sets the metadata replay entry id, replacing any previously stored value, and returns the updated instruction so calls can be chained.
+ pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option) -> Self {
+ self.metadata_replay_entry_id = metadata_replay_entry_id;
+ self
+ }
}
/// The instruction of downgrading leader region.
@@ -352,6 +370,8 @@ mod tests {
region_options,
region_wal_options: HashMap::new(),
skip_wal_replay: false,
+ replay_entry_id: None,
+ metadata_replay_entry_id: None,
};
assert_eq!(expected, deserialized);
}
diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index 7cf1b0bef8..3688091843 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -155,6 +155,7 @@ use crate::error::{self, Result, SerdeJsonSnafu};
use crate::key::flow::flow_state::FlowStateValue;
use crate::key::node_address::NodeAddressValue;
use crate::key::table_route::TableRouteKey;
+use crate::key::topic_region::TopicRegionValue;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
@@ -164,6 +165,7 @@ use crate::state_store::PoisonValue;
use crate::DatanodeId;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
+pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
@@ -271,6 +273,10 @@ lazy_static! {
pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
}
+lazy_static! {
+ pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
+}
+
lazy_static! {
static ref TABLE_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
@@ -326,7 +332,7 @@ lazy_static! {
lazy_static! {
pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
- "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
+ "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
))
.unwrap();
}
@@ -1434,7 +1440,8 @@ impl_metadata_value! {
NodeAddressValue,
SchemaNameValue,
FlowStateValue,
- PoisonValue
+ PoisonValue,
+ TopicRegionValue
}
impl_optional_metadata_value! {
@@ -1676,9 +1683,11 @@ mod tests {
.topic_region_manager
.regions(&topic)
.await
- .unwrap();
+ .unwrap()
+ .into_keys()
+ .collect::>();
assert_eq!(regions.len(), 8);
- assert_eq!(regions[0], region_id);
+ assert!(regions.contains(®ion_id));
}
}
diff --git a/src/common/meta/src/key/topic_region.rs b/src/common/meta/src/key/topic_region.rs
index 51c90d69ea..776d0fceca 100644
--- a/src/common/meta/src/key/topic_region.rs
+++ b/src/common/meta/src/key/topic_region.rs
@@ -12,20 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
use std::collections::HashMap;
use std::fmt::{self, Display};
@@ -37,10 +23,12 @@ use table::metadata::TableId;
use crate::ddl::utils::parse_region_wal_options;
use crate::error::{Error, InvalidMetadataSnafu, Result};
-use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
+use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
-use crate::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
+use crate::rpc::store::{
+ BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
+};
use crate::rpc::KeyValue;
// The TopicRegionKey is a key for the topic-region mapping in the kvbackend.
@@ -51,8 +39,20 @@ pub struct TopicRegionKey<'a> {
pub topic: &'a str,
}
-#[derive(Debug, Serialize, Deserialize)]
-pub struct TopicRegionValue;
+/// Represents additional information for a region when using a shared WAL: currently only an optional replay checkpoint, which is left out of the serialized form when it is `None`.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
+pub struct TopicRegionValue {
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub checkpoint: Option,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
+pub struct ReplayCheckpoint {
+ #[serde(default)]
+ pub entry_id: u64,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub metadata_entry_id: Option,
+}
impl<'a> TopicRegionKey<'a> {
pub fn new(region_id: RegionId, topic: &'a str) -> Self {
@@ -118,9 +118,47 @@ impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
}
}
-fn topic_region_decoder(value: &KeyValue) -> Result> {
+impl ReplayCheckpoint {
+ /// Creates a new [`ReplayCheckpoint`] that stores the given entry id and optional metadata entry id unchanged.
+ pub fn new(entry_id: u64, metadata_entry_id: Option) -> Self {
+ Self {
+ entry_id,
+ metadata_entry_id,
+ }
+ }
+}
+
+impl TopicRegionValue {
+ /// Creates a new [`TopicRegionValue`] holding the given optional checkpoint.
+ pub fn new(checkpoint: Option) -> Self {
+ Self { checkpoint }
+ }
+
+ /// Returns the smaller of the checkpoint's `entry_id` and `metadata_entry_id`.
+ ///
+ /// Returns `entry_id` alone when `metadata_entry_id` is not set, and `None` when the region has no checkpoint.
+ pub fn min_entry_id(&self) -> Option {
+ match self.checkpoint {
+ Some(ReplayCheckpoint {
+ entry_id,
+ metadata_entry_id,
+ }) => match metadata_entry_id {
+ Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
+ None => Some(entry_id),
+ },
+ None => None,
+ }
+ }
+}
+
+fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
let key = TopicRegionKey::from_bytes(&value.key)?;
- Ok(key)
+ let value = if value.value.is_empty() {
+ TopicRegionValue::default()
+ } else {
+ TopicRegionValue::try_from_raw_value(&value.value)?
+ };
+ Ok((key, value))
}
/// Manages map of topics and regions in kvbackend.
@@ -143,21 +181,59 @@ impl TopicRegionManager {
Ok(())
}
- pub async fn batch_put(&self, keys: Vec>) -> Result<()> {
+ pub async fn batch_get(
+ &self,
+ keys: Vec>,
+ ) -> Result> {
+ let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::>();
+ let req = BatchGetRequest { keys: raw_keys };
+ let resp = self.kv_backend.batch_get(req).await?;
+
+ let v = resp
+ .kvs
+ .into_iter()
+ .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
+ .collect::>>()?;
+
+ Ok(v)
+ }
+
+ pub async fn get(&self, key: TopicRegionKey<'_>) -> Result> {
+ let key_bytes = key.to_bytes();
+ let resp = self.kv_backend.get(&key_bytes).await?;
+ let value = resp
+ .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
+ .transpose()?;
+
+ Ok(value)
+ }
+
+ pub async fn batch_put(
+ &self,
+ keys: &[(TopicRegionKey<'_>, Option)],
+ ) -> Result<()> {
let req = BatchPutRequest {
kvs: keys
- .into_iter()
- .map(|key| KeyValue {
- key: key.to_bytes(),
- value: vec![],
+ .iter()
+ .map(|(key, value)| {
+ let value = value
+ .map(|v| v.try_as_raw_value())
+ .transpose()?
+ .unwrap_or_default();
+
+ Ok(KeyValue {
+ key: key.to_bytes(),
+ value,
+ })
})
- .collect(),
+ .collect::>>()?,
prev_kv: false,
};
self.kv_backend.batch_put(req).await?;
Ok(())
}
+ /// Builds a transaction that creates the topic-region mappings. It only executes when the primary key comparison succeeds.
pub fn build_create_txn(
&self,
table_id: TableId,
@@ -176,8 +252,8 @@ impl TopicRegionManager {
Ok(Txn::new().and_then(operations))
}
- /// Returns the list of region ids using specified topic.
- pub async fn regions(&self, topic: &str) -> Result> {
+ /// Returns a map from each [`RegionId`] using the specified topic to its corresponding [`TopicRegionValue`].
+ pub async fn regions(&self, topic: &str) -> Result> {
let prefix = TopicRegionKey::range_topic_key(topic);
let req = RangeRequest::new().with_prefix(prefix.as_bytes());
let resp = self.kv_backend.range(req).await?;
@@ -186,7 +262,10 @@ impl TopicRegionManager {
.iter()
.map(topic_region_decoder)
.collect::>>()?;
- Ok(region_ids.iter().map(|key| key.region_id).collect())
+ Ok(region_ids
+ .into_iter()
+ .map(|(key, value)| (key.region_id, value))
+ .collect())
}
pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
@@ -248,15 +327,24 @@ mod tests {
let topics = (0..16).map(|i| format!("topic_{}", i)).collect::>();
let keys = (0..64)
- .map(|i| TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]))
+ .map(|i| {
+ (
+ TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
+ None,
+ )
+ })
.collect::>();
- manager.batch_put(keys.clone()).await.unwrap();
-
- let mut key_values = manager.regions(&topics[0]).await.unwrap();
+ manager.batch_put(&keys).await.unwrap();
+ let mut key_values = manager
+ .regions(&topics[0])
+ .await
+ .unwrap()
+ .into_keys()
+ .collect::>();
let expected = keys
.iter()
- .filter_map(|key| {
+ .filter_map(|(key, _)| {
if key.topic == topics[0] {
Some(key.region_id)
} else {
@@ -269,10 +357,15 @@ mod tests {
let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
manager.delete(key.clone()).await.unwrap();
- let mut key_values = manager.regions(&topics[0]).await.unwrap();
+ let mut key_values = manager
+ .regions(&topics[0])
+ .await
+ .unwrap()
+ .into_keys()
+ .collect::>();
let expected = keys
.iter()
- .filter_map(|key| {
+ .filter_map(|(key, _)| {
if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) {
Some(key.region_id)
} else {
@@ -324,4 +417,18 @@ mod tests {
expected.sort_by_key(|(region_id, _)| region_id.as_u64());
assert_eq!(topic_region_map, expected);
}
+
+ #[test]
+ fn test_topic_region_key_is_match() {
+ let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
+ let topic_region_key = TopicRegionKey::try_from(key).unwrap();
+ assert_eq!(
+ topic_region_key.topic,
+ "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
+ );
+ assert_eq!(
+ topic_region_key.region_id,
+ RegionId::from_u64(4410931412992)
+ );
+ }
}
diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs
index 83834937dc..b579c583bf 100644
--- a/src/common/meta/src/region_registry.rs
+++ b/src/common/meta/src/region_registry.rs
@@ -133,6 +133,34 @@ impl LeaderRegionManifestInfo {
}
}
+ /// Returns the replay entry id of the data region: the larger of its flushed entry id and its topic latest entry id.
+ pub fn replay_entry_id(&self) -> u64 {
+ match self {
+ LeaderRegionManifestInfo::Mito {
+ flushed_entry_id,
+ topic_latest_entry_id,
+ ..
+ } => (*flushed_entry_id).max(*topic_latest_entry_id),
+ LeaderRegionManifestInfo::Metric {
+ data_flushed_entry_id,
+ data_topic_latest_entry_id,
+ ..
+ } => (*data_flushed_entry_id).max(*data_topic_latest_entry_id),
+ }
+ }
+
+ /// Returns the replay entry id of the metadata region (the larger of its flushed entry id and its topic latest entry id) for the `Metric` variant, and `None` for any other variant.
+ pub fn metadata_replay_entry_id(&self) -> Option {
+ match self {
+ LeaderRegionManifestInfo::Metric {
+ metadata_flushed_entry_id,
+ metadata_topic_latest_entry_id,
+ ..
+ } => Some((*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id)),
+ _ => None,
+ }
+ }
+
/// A region is considered inactive if the flushed entry id is less than the topic's latest entry id.
///
/// The `topic_latest_entry_id` of a region is updated only when its memtable is empty during a flush.
diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs
index 9be4655dd7..fb631363f1 100644
--- a/src/common/meta/src/wal_options_allocator.rs
+++ b/src/common/meta/src/wal_options_allocator.rs
@@ -27,7 +27,7 @@ use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
-use crate::key::NAME_PATTERN_REGEX;
+use crate::key::TOPIC_NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
pub use crate::wal_options_allocator::topic_creator::{
@@ -109,7 +109,7 @@ pub async fn build_wal_options_allocator(
MetasrvWalConfig::Kafka(kafka_config) => {
let prefix = &kafka_config.kafka_topic.topic_name_prefix;
ensure!(
- NAME_PATTERN_REGEX.is_match(prefix),
+ TOPIC_NAME_PATTERN_REGEX.is_match(prefix),
InvalidTopicNamePrefixSnafu { prefix }
);
let topic_creator =
@@ -149,6 +149,26 @@ pub fn prepare_wal_options(
}
}
+/// Extracts the Kafka topic for `region_id` from the WAL options keyed by region number; returns `None` when the region has no entry, the entry is not valid JSON, or the parsed options are not Kafka options.
+pub fn extract_topic_from_wal_options(
+ region_id: RegionId,
+ region_options: &HashMap,
+) -> Option {
+ region_options
+ .get(®ion_id.region_number())
+ .and_then(|wal_options| {
+ serde_json::from_str::(wal_options)
+ .ok()
+ .and_then(|wal_options| {
+ if let WalOptions::Kafka(kafka_wal_option) = wal_options {
+ Some(kafka_wal_option.topic)
+ } else {
+ None
+ }
+ })
+ })
+}
+
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index 4bfcbc46e7..04b173060d 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -20,7 +20,8 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{
- DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE,
+ DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE,
+ DEFAULT_FLUSH_TRIGGER_SIZE,
};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
@@ -64,6 +65,8 @@ impl From for MetasrvWalConfig {
auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
// This field won't be used in standalone mode
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
+ // This field won't be used in standalone mode
+ checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
}),
}
}
@@ -205,9 +208,10 @@ mod tests {
create_topic_timeout: Duration::from_secs(30),
},
auto_create_topics: true,
- auto_prune_interval: Duration::from_secs(0),
+ auto_prune_interval: Duration::from_mins(30),
auto_prune_parallelism: 10,
flush_trigger_size: ReadableSize::mb(512),
+ checkpoint_trigger_size: ReadableSize::mb(128),
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs
index 56145b06c9..1028ca838b 100644
--- a/src/common/wal/src/config/kafka/common.rs
+++ b/src/common/wal/src/config/kafka/common.rs
@@ -37,11 +37,13 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
};
/// Default interval for auto WAL pruning.
-pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO;
+pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30);
/// Default limit for concurrent auto pruning tasks.
pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
/// Default size of WAL to trigger flush.
pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
+/// Default checkpoint trigger size.
+pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128);
use crate::error::{self, Result};
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs
index 4ed23e5a3b..bffde60c81 100644
--- a/src/common/wal/src/config/kafka/metasrv.rs
+++ b/src/common/wal/src/config/kafka/metasrv.rs
@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL,
- DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE,
+ DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
};
/// Kafka wal configurations for metasrv.
@@ -41,6 +41,8 @@ pub struct MetasrvKafkaConfig {
pub auto_prune_parallelism: usize,
// The size of WAL to trigger flush.
pub flush_trigger_size: ReadableSize,
+ // The checkpoint trigger size.
+ pub checkpoint_trigger_size: ReadableSize,
}
impl Default for MetasrvKafkaConfig {
@@ -52,6 +54,7 @@ impl Default for MetasrvKafkaConfig {
auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
+ checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
}
}
}
diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs
index 659a045f57..49b16ea598 100644
--- a/src/common/wal/src/lib.rs
+++ b/src/common/wal/src/lib.rs
@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(assert_matches)]
+#![feature(duration_constructors_lite)]
use std::net::SocketAddr;
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 181b18d937..bcd5effd86 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -23,18 +23,15 @@ use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::datanode::TopicStatsReporter;
-use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
-use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::{error, info, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
-use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -49,10 +46,8 @@ use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use snafu::{ensure, OptionExt, ResultExt};
-use store_api::path_utils::{table_dir, WAL_DIR};
+use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{RegionEngineRef, RegionRole};
-use store_api::region_request::{PathType, RegionOpenRequest};
-use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;
@@ -70,6 +65,7 @@ use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};
+use crate::utils::{build_region_open_requests, RegionOpenRequests};
/// Datanode service.
pub struct Datanode {
@@ -252,16 +248,12 @@ impl DatanodeBuilder {
.recovery_mode()
.await
.context(GetMetadataSnafu)?;
- let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone());
- let table_values = datanode_table_manager
- .tables(node_id)
- .try_collect::>()
- .await
- .context(GetMetadataSnafu)?;
+ let region_open_requests =
+ build_region_open_requests(node_id, self.kv_backend.clone()).await?;
let open_all_regions = open_all_regions(
region_server.clone(),
- table_values,
+ region_open_requests,
!controlled_by_metasrv,
self.opts.init_regions_parallelism,
// Ignore nonexistent regions in recovery mode.
@@ -342,27 +334,22 @@ impl DatanodeBuilder {
async fn initialize_region_server(
&self,
region_server: &RegionServer,
- kv_backend: KvBackendRef,
open_with_writable: bool,
) -> Result<()> {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
- let runtime_switch_manager = RuntimeSwitchManager::new(kv_backend.clone());
+ // TODO(weny): Consider introducing a read-only kv_backend trait.
+ let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
let is_recovery_mode = runtime_switch_manager
.recovery_mode()
.await
.context(GetMetadataSnafu)?;
-
- let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
- let table_values = datanode_table_manager
- .tables(node_id)
- .try_collect::>()
- .await
- .context(GetMetadataSnafu)?;
+ let region_open_requests =
+ build_region_open_requests(node_id, self.kv_backend.clone()).await?;
open_all_regions(
region_server.clone(),
- table_values,
+ region_open_requests,
open_with_writable,
self.opts.init_regions_parallelism,
is_recovery_mode,
@@ -609,73 +596,24 @@ impl DatanodeBuilder {
/// Open all regions belong to this datanode.
async fn open_all_regions(
region_server: RegionServer,
- table_values: Vec,
+ region_open_requests: RegionOpenRequests,
open_with_writable: bool,
init_regions_parallelism: usize,
ignore_nonexistent_region: bool,
) -> Result<()> {
- let mut regions = vec![];
- #[cfg(feature = "enterprise")]
- let mut follower_regions = vec![];
- for table_value in table_values {
- for region_number in table_value.regions {
- // Augments region options with wal options if a wal options is provided.
- let mut region_options = table_value.region_info.region_options.clone();
- prepare_wal_options(
- &mut region_options,
- RegionId::new(table_value.table_id, region_number),
- &table_value.region_info.region_wal_options,
- );
-
- regions.push((
- RegionId::new(table_value.table_id, region_number),
- table_value.region_info.engine.clone(),
- table_value.region_info.region_storage_path.clone(),
- region_options,
- ));
- }
-
+ let RegionOpenRequests {
+ leader_regions,
#[cfg(feature = "enterprise")]
- for region_number in table_value.follower_regions {
- // Augments region options with wal options if a wal options is provided.
- let mut region_options = table_value.region_info.region_options.clone();
- prepare_wal_options(
- &mut region_options,
- RegionId::new(table_value.table_id, region_number),
- &table_value.region_info.region_wal_options,
- );
-
- follower_regions.push((
- RegionId::new(table_value.table_id, region_number),
- table_value.region_info.engine.clone(),
- table_value.region_info.region_storage_path.clone(),
- region_options,
- ));
- }
- }
- let num_regions = regions.len();
- info!("going to open {} region(s)", num_regions);
-
- let mut region_requests = Vec::with_capacity(regions.len());
- for (region_id, engine, store_path, options) in regions {
- let table_dir = table_dir(&store_path, region_id.table_id());
- region_requests.push((
- region_id,
- RegionOpenRequest {
- engine,
- table_dir,
- path_type: PathType::Bare,
- options,
- skip_wal_replay: false,
- },
- ));
- }
+ follower_regions,
+ } = region_open_requests;
+ let leader_region_num = leader_regions.len();
+ info!("going to open {} region(s)", leader_region_num);
let now = Instant::now();
let open_regions = region_server
.handle_batch_open_requests(
init_regions_parallelism,
- region_requests,
+ leader_regions,
ignore_nonexistent_region,
)
.await?;
@@ -686,19 +624,19 @@ async fn open_all_regions(
);
if !ignore_nonexistent_region {
ensure!(
- open_regions.len() == num_regions,
+ open_regions.len() == leader_region_num,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of regions, only {} of regions has opened",
- num_regions,
+ leader_region_num,
open_regions.len()
)
}
);
- } else if open_regions.len() != num_regions {
+ } else if open_regions.len() != leader_region_num {
warn!(
"ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
- num_regions,
+ leader_region_num,
open_regions.len()
);
}
@@ -717,31 +655,14 @@ async fn open_all_regions(
if !follower_regions.is_empty() {
use tokio::time::Instant;
- info!(
- "going to open {} follower region(s)",
- follower_regions.len()
- );
- let mut region_requests = Vec::with_capacity(follower_regions.len());
- let num_regions = follower_regions.len();
- for (region_id, engine, store_path, options) in follower_regions {
- let table_dir = table_dir(&store_path, region_id.table_id());
- region_requests.push((
- region_id,
- RegionOpenRequest {
- engine,
- table_dir,
- path_type: PathType::Bare,
- options,
- skip_wal_replay: true,
- },
- ));
- }
+ let follower_region_num = follower_regions.len();
+ info!("going to open {} follower region(s)", follower_region_num);
let now = Instant::now();
let open_regions = region_server
.handle_batch_open_requests(
init_regions_parallelism,
- region_requests,
+ follower_regions,
ignore_nonexistent_region,
)
.await?;
@@ -753,19 +674,19 @@ async fn open_all_regions(
if !ignore_nonexistent_region {
ensure!(
- open_regions.len() == num_regions,
+ open_regions.len() == follower_region_num,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of follower regions, only {} of regions has opened",
- num_regions,
+ follower_region_num,
open_regions.len()
)
}
);
- } else if open_regions.len() != num_regions {
+ } else if open_regions.len() != follower_region_num {
warn!(
"ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
- num_regions,
+ follower_region_num,
open_regions.len()
);
}
@@ -835,15 +756,13 @@ mod tests {
..Default::default()
},
Plugins::default(),
- kv_backend,
+ kv_backend.clone(),
);
builder.with_cache_registry(layered_cache_registry);
-
- let kv = Arc::new(MemoryKvBackend::default()) as _;
- setup_table_datanode(&kv).await;
+ setup_table_datanode(&(kv_backend as _)).await;
builder
- .initialize_region_server(&mock_region_server, kv.clone(), false)
+ .initialize_region_server(&mock_region_server, false)
.await
.unwrap();
diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs
index 4a6952daf8..871bc5b9c3 100644
--- a/src/datanode/src/heartbeat/handler/open_region.rs
+++ b/src/datanode/src/heartbeat/handler/open_region.rs
@@ -16,7 +16,7 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_options_allocator::prepare_wal_options;
use futures_util::future::BoxFuture;
use store_api::path_utils::table_dir;
-use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
+use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest, ReplayCheckpoint};
use crate::heartbeat::handler::HandlerContext;
@@ -29,17 +29,31 @@ impl HandlerContext {
mut region_options,
region_wal_options,
skip_wal_replay,
+ replay_entry_id,
+ metadata_replay_entry_id,
}: OpenRegion,
) -> BoxFuture<'static, Option> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(®ion_ident);
prepare_wal_options(&mut region_options, region_id, ®ion_wal_options);
+ let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
+ (Some(replay_entry_id), Some(metadata_replay_entry_id)) => Some(ReplayCheckpoint {
+ entry_id: replay_entry_id,
+ metadata_entry_id: Some(metadata_replay_entry_id),
+ }),
+ (Some(replay_entry_id), None) => Some(ReplayCheckpoint {
+ entry_id: replay_entry_id,
+ metadata_entry_id: None,
+ }),
+ _ => None,
+ };
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
table_dir: table_dir(®ion_storage_path, region_id.table_id()),
path_type: PathType::Bare,
options: region_options,
skip_wal_replay,
+ checkpoint,
});
let result = self.region_server.handle_request(region_id, request).await;
let success = result.is_ok();
diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs
index cdb67d836f..bc63d71c8a 100644
--- a/src/datanode/src/lib.rs
+++ b/src/datanode/src/lib.rs
@@ -28,3 +28,4 @@ pub mod service;
pub mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;
+pub mod utils;
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 882d1b2e92..0859fae5ca 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -1410,6 +1410,7 @@ mod tests {
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -1579,6 +1580,7 @@ mod tests {
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
+ checkpoint: None,
},
),
(
@@ -1589,6 +1591,7 @@ mod tests {
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
+ checkpoint: None,
},
),
],
@@ -1610,6 +1613,7 @@ mod tests {
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
+ checkpoint: None,
},
),
(
@@ -1620,6 +1624,7 @@ mod tests {
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
+ checkpoint: None,
},
),
],
diff --git a/src/datanode/src/utils.rs b/src/datanode/src/utils.rs
new file mode 100644
index 0000000000..77b9246c0f
--- /dev/null
+++ b/src/datanode/src/utils.rs
@@ -0,0 +1,188 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use common_meta::key::datanode_table::DatanodeTableManager;
+use common_meta::key::topic_region::{TopicRegionKey, TopicRegionManager, TopicRegionValue};
+use common_meta::kv_backend::KvBackendRef;
+use common_meta::wal_options_allocator::{extract_topic_from_wal_options, prepare_wal_options};
+use common_meta::DatanodeId;
+use futures::TryStreamExt;
+use snafu::ResultExt;
+use store_api::path_utils::table_dir;
+use store_api::region_request::{PathType, RegionOpenRequest, ReplayCheckpoint};
+use store_api::storage::{RegionId, RegionNumber};
+use tracing::info;
+
+use crate::error::{GetMetadataSnafu, Result};
+
+/// The requests to open regions.
+///
+/// Returned by `build_region_open_requests`; every entry pairs a region id with
+/// the request used to open that region.
+pub(crate) struct RegionOpenRequests {
+ /// Requests built from the `regions` of each datanode table value.
+ pub leader_regions: Vec<(RegionId, RegionOpenRequest)>,
+ /// Requests built from the `follower_regions` of each datanode table value.
+ /// Only present when the `enterprise` feature is enabled.
+ #[cfg(feature = "enterprise")]
+ pub follower_regions: Vec<(RegionId, RegionOpenRequest)>,
+}
+
+/// Records `region_id` under the WAL topic it is assigned to.
+///
+/// The topic is resolved with [`extract_topic_from_wal_options`]. When that
+/// returns `None`, `topic_regions` is left untouched.
+fn group_region_by_topic(
+    region_id: RegionId,
+    region_options: &HashMap<RegionNumber, String>,
+    topic_regions: &mut HashMap<String, Vec<RegionId>>,
+) {
+    if let Some(topic) = extract_topic_from_wal_options(region_id, region_options) {
+        topic_regions.entry(topic).or_default().push(region_id);
+    }
+}
+
+/// Looks up the persisted replay checkpoint of `region_id`.
+///
+/// Returns `None` when no topic-region values were fetched, when the region
+/// has no entry among them, or when its entry carries no checkpoint.
+fn get_replay_checkpoint(
+    region_id: RegionId,
+    topic_region_values: &Option<HashMap<RegionId, TopicRegionValue>>,
+) -> Option<ReplayCheckpoint> {
+    let checkpoint = topic_region_values
+        .as_ref()?
+        .get(&region_id)
+        .and_then(|value| value.checkpoint)?;
+    Some(ReplayCheckpoint {
+        entry_id: checkpoint.entry_id,
+        metadata_entry_id: checkpoint.metadata_entry_id,
+    })
+}
+
+/// Builds the open requests for every region assigned to the datanode `node_id`.
+///
+/// Region assignments are read through [`DatanodeTableManager`]. For regions
+/// whose WAL options resolve to a topic, the stored topic-region values are
+/// fetched in a single `batch_get`, and each leader region's open request
+/// carries the checkpoint found there (if any).
+///
+/// Follower regions (only with the `enterprise` feature) are opened with
+/// `skip_wal_replay: true` and no checkpoint.
+///
+/// # Errors
+///
+/// Returns an error built with `GetMetadataSnafu` when listing the datanode
+/// tables or fetching the topic-region values fails.
+pub(crate) async fn build_region_open_requests(
+    node_id: DatanodeId,
+    kv_backend: KvBackendRef,
+) -> Result<RegionOpenRequests> {
+    let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
+    let table_values = datanode_table_manager
+        .tables(node_id)
+        .try_collect::<Vec<_>>()
+        .await
+        .context(GetMetadataSnafu)?;
+
+    let topic_region_manager = TopicRegionManager::new(kv_backend);
+    let mut topic_regions = HashMap::<String, Vec<RegionId>>::new();
+    let mut regions = vec![];
+    #[cfg(feature = "enterprise")]
+    let mut follower_regions = vec![];
+
+    for table_value in table_values {
+        for region_number in table_value.regions {
+            let region_id = RegionId::new(table_value.table_id, region_number);
+            // Augments the region options with the WAL options, if any are provided.
+            let mut region_options = table_value.region_info.region_options.clone();
+            prepare_wal_options(
+                &mut region_options,
+                region_id,
+                &table_value.region_info.region_wal_options,
+            );
+            group_region_by_topic(
+                region_id,
+                &table_value.region_info.region_wal_options,
+                &mut topic_regions,
+            );
+
+            regions.push((
+                region_id,
+                table_value.region_info.engine.clone(),
+                table_value.region_info.region_storage_path.clone(),
+                region_options,
+            ));
+        }
+
+        #[cfg(feature = "enterprise")]
+        for region_number in table_value.follower_regions {
+            let region_id = RegionId::new(table_value.table_id, region_number);
+            // Augments the region options with the WAL options, if any are provided.
+            let mut region_options = table_value.region_info.region_options.clone();
+            prepare_wal_options(
+                &mut region_options,
+                region_id,
+                &table_value.region_info.region_wal_options,
+            );
+            group_region_by_topic(
+                region_id,
+                &table_value.region_info.region_wal_options,
+                &mut topic_regions,
+            );
+
+            follower_regions.push((
+                region_id,
+                table_value.region_info.engine.clone(),
+                table_value.region_info.region_storage_path.clone(),
+                region_options,
+            ));
+        }
+    }
+
+    // Fetch the stored values only when at least one region maps to a topic.
+    let topic_region_values = if !topic_regions.is_empty() {
+        let keys = topic_regions
+            .iter()
+            .flat_map(|(topic, regions)| {
+                regions
+                    .iter()
+                    .map(|region_id| TopicRegionKey::new(*region_id, topic))
+            })
+            .collect::<Vec<_>>();
+        let values = topic_region_manager
+            .batch_get(keys)
+            .await
+            .context(GetMetadataSnafu)?;
+        Some(values)
+    } else {
+        None
+    };
+
+    let mut leader_region_requests = Vec::with_capacity(regions.len());
+    for (region_id, engine, store_path, options) in regions {
+        let table_dir = table_dir(&store_path, region_id.table_id());
+        let checkpoint = get_replay_checkpoint(region_id, &topic_region_values);
+        info!("region_id: {}, checkpoint: {:?}", region_id, checkpoint);
+        leader_region_requests.push((
+            region_id,
+            RegionOpenRequest {
+                engine,
+                table_dir,
+                path_type: PathType::Bare,
+                options,
+                skip_wal_replay: false,
+                checkpoint,
+            },
+        ));
+    }
+
+    #[cfg(feature = "enterprise")]
+    let follower_region_requests = {
+        let mut follower_region_requests = Vec::with_capacity(follower_regions.len());
+        for (region_id, engine, store_path, options) in follower_regions {
+            let table_dir = table_dir(&store_path, region_id.table_id());
+            follower_region_requests.push((
+                region_id,
+                RegionOpenRequest {
+                    engine,
+                    table_dir,
+                    path_type: PathType::Bare,
+                    options,
+                    skip_wal_replay: true,
+                    checkpoint: None,
+                },
+            ));
+        }
+        follower_region_requests
+    };
+
+    Ok(RegionOpenRequests {
+        leader_regions: leader_region_requests,
+        #[cfg(feature = "enterprise")]
+        follower_regions: follower_region_requests,
+    })
+}
diff --git a/src/file-engine/src/region.rs b/src/file-engine/src/region.rs
index 6c495ee031..50c7e69114 100644
--- a/src/file-engine/src/region.rs
+++ b/src/file-engine/src/region.rs
@@ -178,6 +178,7 @@ mod tests {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
};
let region = FileRegion::open(region_id, request, &object_store)
@@ -230,6 +231,7 @@ mod tests {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
};
let err = FileRegion::open(region_id, request, &object_store)
.await
diff --git a/src/log-store/src/kafka/consumer.rs b/src/log-store/src/kafka/consumer.rs
index c018d69997..d8052e609f 100644
--- a/src/log-store/src/kafka/consumer.rs
+++ b/src/log-store/src/kafka/consumer.rs
@@ -28,6 +28,17 @@ use rskafka::record::RecordAndOffset;
use crate::kafka::index::{NextBatchHint, RegionWalIndexIterator};
+/// The outcome of a single fetch request issued through a [`FetchClient`].
+pub struct FetchResult {
+    /// The fetched records, each paired with its offset.
+    pub records: Vec<RecordAndOffset>,
+
+    /// The high watermark of the partition.
+    pub high_watermark: i64,
+
+    /// The size of the response encoded in bytes.
+    pub encoded_response_size: usize,
+}
+
#[async_trait::async_trait]
pub trait FetchClient: std::fmt::Debug + Send + Sync {
/// Fetch records.
@@ -38,7 +49,9 @@ pub trait FetchClient: std::fmt::Debug + Send + Sync {
offset: i64,
bytes: Range,
max_wait_ms: i32,
- ) -> rskafka::client::error::Result<(Vec, i64)>;
+ ) -> rskafka::client::error::Result;
+
+ fn topic(&self) -> &str;
}
#[async_trait::async_trait]
@@ -48,15 +61,25 @@ impl FetchClient for PartitionClient {
offset: i64,
bytes: Range,
max_wait_ms: i32,
- ) -> rskafka::client::error::Result<(Vec, i64)> {
- self.fetch_records(offset, bytes, max_wait_ms).await
+ ) -> rskafka::client::error::Result {
+ self.fetch_records(offset, bytes, max_wait_ms)
+ .await
+ .map(|r| FetchResult {
+ records: r.records,
+ high_watermark: r.high_watermark,
+ encoded_response_size: r.encoded_response_size,
+ })
+ }
+
+ fn topic(&self) -> &str {
+ self.topic()
}
}
-struct FetchResult {
+struct FetchResultInner {
records_and_offsets: Vec,
batch_size: usize,
- fetch_bytes: i32,
+ fetch_bytes: usize,
watermark: i64,
used_offset: i64,
}
@@ -97,7 +120,23 @@ pub struct Consumer {
/// The fetch future.
#[builder(default = "Fuse::terminated()")]
- fetch_fut: Fuse>>,
+ fetch_fut: Fuse>>,
+
+ /// Total fetched bytes.
+ #[builder(default = "0")]
+ total_fetched_bytes: u64,
+}
+
+impl Consumer {
+ /// Returns the total fetched bytes.
+ ///
+ /// The counter grows by the encoded response size of every fetch that
+ /// completes successfully while the consumer is polled.
+ pub fn total_fetched_bytes(&self) -> u64 {
+ self.total_fetched_bytes
+ }
+
+ /// Returns the topic name.
+ ///
+ /// Delegates to the `topic` method of the underlying fetch client.
+ pub fn topic(&self) -> &str {
+ self.client.topic()
+ }
+}
pub(crate) struct RecordsBuffer {
@@ -184,15 +223,20 @@ impl Stream for Consumer {
let fetch_range =
1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32);
*this.fetch_fut = FutureExt::fuse(Box::pin(async move {
- let (records_and_offsets, watermark) = client
+ let FetchResult {
+ records: records_and_offsets,
+ high_watermark: watermark,
+ encoded_response_size,
+ ..
+ } = client
.fetch_records(offset, fetch_range, max_wait_ms)
.await?;
- Ok(FetchResult {
+ Ok(FetchResultInner {
records_and_offsets,
watermark,
used_offset: offset,
- fetch_bytes: bytes as i32,
+ fetch_bytes: encoded_response_size,
batch_size: len,
})
}));
@@ -206,7 +250,7 @@ impl Stream for Consumer {
let data = futures::ready!(this.fetch_fut.poll_unpin(cx));
match data {
- Ok(FetchResult {
+ Ok(FetchResultInner {
mut records_and_offsets,
watermark,
used_offset,
@@ -217,9 +261,10 @@ impl Stream for Consumer {
records_and_offsets.sort_unstable_by_key(|x| x.offset);
*this.last_high_watermark = watermark;
if !records_and_offsets.is_empty() {
- *this.avg_record_size = fetch_bytes as usize / records_and_offsets.len();
+ *this.avg_record_size = fetch_bytes / records_and_offsets.len();
debug!("set avg_record_size: {}", *this.avg_record_size);
}
+ *this.total_fetched_bytes += fetch_bytes as u64;
debug!(
"Fetch result: {:?}, used_offset: {used_offset}, max_batch_size: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}",
@@ -254,7 +299,7 @@ mod tests {
use futures::TryStreamExt;
use rskafka::record::{Record, RecordAndOffset};
- use super::FetchClient;
+ use super::*;
use crate::kafka::consumer::{Consumer, RecordsBuffer};
use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};
@@ -270,7 +315,7 @@ mod tests {
offset: i64,
bytes: Range,
_max_wait_ms: i32,
- ) -> rskafka::client::error::Result<(Vec, i64)> {
+ ) -> rskafka::client::error::Result {
let record_size = self.record.approximate_size();
let num = (bytes.end.unsigned_abs() as usize / record_size).max(1);
@@ -280,8 +325,18 @@ mod tests {
offset: offset + idx as i64,
})
.collect::>();
+
let max_offset = offset + records.len() as i64;
- Ok((records, max_offset))
+ let encoded_response_size = records.iter().map(|r| r.record.approximate_size()).sum();
+ Ok(FetchResult {
+ records,
+ high_watermark: max_offset,
+ encoded_response_size,
+ })
+ }
+
+ fn topic(&self) -> &str {
+ "test"
}
}
@@ -315,6 +370,7 @@ mod tests {
index: Box::new(index),
},
fetch_fut: Fuse::terminated(),
+ total_fetched_bytes: 0,
};
let records = consumer.try_collect::>().await.unwrap();
@@ -347,6 +403,7 @@ mod tests {
index: Box::new(index),
},
fetch_fut: Fuse::terminated(),
+ total_fetched_bytes: 0,
};
let records = consumer.try_collect::>().await.unwrap();
@@ -388,6 +445,7 @@ mod tests {
index: Box::new(iter),
},
fetch_fut: Fuse::terminated(),
+ total_fetched_bytes: 0,
};
let records = consumer.try_collect::>().await.unwrap();
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 54d7d079a2..55f7a147fd 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -14,11 +14,12 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
+use common_base::readable_size::ReadableSize;
use common_meta::datanode::TopicStatsReporter;
use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
-use common_telemetry::{debug, warn};
+use common_telemetry::{debug, info, warn};
use common_time::util::current_time_millis;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
@@ -400,6 +401,7 @@ impl LogStore for KafkaLogStore {
let mut entry_records: HashMap> = HashMap::new();
let provider = provider.clone();
let stream = async_stream::stream!({
+ let now = Instant::now();
while let Some(consume_result) = stream_consumer.next().await {
// Each next on the stream consumer produces a `RecordAndOffset` and a high watermark offset.
// The `RecordAndOffset` contains the record data and its start offset.
@@ -410,9 +412,6 @@ impl LogStore for KafkaLogStore {
})?;
let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset);
- metrics::METRIC_KAFKA_READ_BYTES_TOTAL
- .inc_by(kafka_record.approximate_size() as u64);
-
debug!(
"Read a record at offset {} for topic {}, high watermark: {}",
offset, provider.topic, high_watermark
@@ -446,6 +445,17 @@ impl LogStore for KafkaLogStore {
break;
}
}
+
+ metrics::METRIC_KAFKA_READ_BYTES_TOTAL.inc_by(stream_consumer.total_fetched_bytes());
+
+ info!(
+ "Fetched {} bytes from topic: {}, start_entry_id: {}, end_offset: {}, elapsed: {:?}",
+ ReadableSize(stream_consumer.total_fetched_bytes()),
+ stream_consumer.topic(),
+ entry_id,
+ end_offset,
+ now.elapsed()
+ );
});
Ok(Box::pin(stream))
}
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 7e490ef827..449e862120 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -422,6 +422,7 @@ impl MetasrvBuilder {
mailbox.clone(),
options.grpc.server_addr.clone(),
remote_wal_options.flush_trigger_size,
+ remote_wal_options.checkpoint_trigger_size,
);
region_flush_trigger.try_start()?;
diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs
index dd893eb234..28be968a9f 100644
--- a/src/meta-srv/src/metrics.rs
+++ b/src/meta-srv/src/metrics.rs
@@ -82,5 +82,12 @@ lazy_static! {
.unwrap();
/// The triggered region flush total counter.
pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec =
- register_int_counter_vec!("meta_triggered_region_flushes_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap();
+ register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap();
+
+ /// The triggered region checkpoint total counter.
+ pub static ref METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL: IntCounterVec =
+ register_int_counter_vec!("meta_triggered_region_checkpoint_total", "meta triggered region checkpoint total", &["topic_name"]).unwrap();
+ /// The topic estimated replay size.
+ pub static ref METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE: IntGaugeVec =
+ register_int_gauge_vec!("meta_topic_estimated_replay_size", "meta topic estimated replay size", &["topic_name"]).unwrap();
}
diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs
index ac52bc8280..ec295f98d1 100644
--- a/src/meta-srv/src/procedure/region_migration.rs
+++ b/src/meta-srv/src/procedure/region_migration.rs
@@ -38,6 +38,7 @@ use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
+use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
@@ -534,6 +535,20 @@ impl Context {
Ok(datanode_value.as_ref().unwrap())
}
+ /// Fetches the replay checkpoint of this context's region for the given topic.
+ ///
+ /// Returns `Ok(None)` when no topic-region value is stored for the region, or
+ /// when the stored value carries no checkpoint.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error built with `TableMetadataManagerSnafu` when the lookup fails.
+ pub async fn fetch_replay_checkpoint(&self, topic: &str) -> Result<Option<ReplayCheckpoint>> {
+     let topic_region_key = TopicRegionKey::new(self.region_id(), topic);
+     let value = self
+         .table_metadata_manager
+         .topic_region_manager()
+         .get(topic_region_key)
+         .await
+         .context(error::TableMetadataManagerSnafu)?;
+
+     Ok(value.and_then(|value| value.checkpoint))
+ }
+
/// Returns the [RegionId].
pub fn region_id(&self) -> RegionId {
self.persistent_ctx.region_id
diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
index 7228108cb2..ce0a0af7a6 100644
--- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
@@ -19,6 +19,7 @@ use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
+use common_meta::wal_options_allocator::extract_topic_from_wal_options;
use common_meta::RegionIdent;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
@@ -67,6 +68,7 @@ impl OpenCandidateRegion {
async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result {
let pc = &ctx.persistent_ctx;
let table_id = pc.region_id.table_id();
+ let region_id = pc.region_id;
let region_number = pc.region_id.region_number();
let candidate_id = pc.to_peer.id;
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
@@ -78,18 +80,31 @@ impl OpenCandidateRegion {
engine,
} = datanode_table_value.region_info.clone();
- let open_instruction = Instruction::OpenRegion(OpenRegion::new(
- RegionIdent {
- datanode_id: candidate_id,
- table_id,
- region_number,
- engine,
- },
- ®ion_storage_path,
- region_options,
- region_wal_options,
- true,
- ));
+ let checkpoint =
+ if let Some(topic) = extract_topic_from_wal_options(region_id, ®ion_wal_options) {
+ ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
+ } else {
+ None
+ };
+
+ let open_instruction = Instruction::OpenRegion(
+ OpenRegion::new(
+ RegionIdent {
+ datanode_id: candidate_id,
+ table_id,
+ region_number,
+ engine,
+ },
+ ®ion_storage_path,
+ region_options,
+ region_wal_options,
+ true,
+ )
+ .with_replay_entry_id(checkpoint.map(|checkpoint| checkpoint.entry_id))
+ .with_metadata_replay_entry_id(
+ checkpoint.and_then(|checkpoint| checkpoint.metadata_entry_id),
+ ),
+ );
Ok(open_instruction)
}
@@ -226,6 +241,8 @@ mod tests {
region_options: Default::default(),
region_wal_options: Default::default(),
skip_wal_replay: true,
+ replay_entry_id: None,
+ metadata_replay_entry_id: None,
})
}
diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs
index ca68fd0610..1d4c34c7cd 100644
--- a/src/meta-srv/src/procedure/wal_prune.rs
+++ b/src/meta-srv/src/procedure/wal_prune.rs
@@ -200,7 +200,7 @@ mod tests {
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
- use rskafka::client::partition::UnknownTopicHandling;
+ use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
use rskafka::record::Record;
use super::*;
@@ -289,8 +289,8 @@ mod tests {
.await;
if expect_success {
assert!(res.is_ok());
- let (record, _high_watermark) = res.unwrap();
- assert!(!record.is_empty());
+ let FetchResult { records, .. } = res.unwrap();
+ assert!(!records.is_empty());
} else {
let err = res.unwrap_err();
// The error is in a private module so we check it through `to_string()`.
diff --git a/src/meta-srv/src/procedure/wal_prune/utils.rs b/src/meta-srv/src/procedure/wal_prune/utils.rs
index d71f2867dd..045af46a4a 100644
--- a/src/meta-srv/src/procedure/wal_prune/utils.rs
+++ b/src/meta-srv/src/procedure/wal_prune/utils.rs
@@ -62,7 +62,9 @@ pub(crate) async fn find_pruneable_entry_id_for_topic(
.topic_region_manager()
.regions(topic)
.await
- .context(TableMetadataManagerSnafu)?;
+ .context(TableMetadataManagerSnafu)?
+ .into_keys()
+ .collect::>();
if region_ids.is_empty() {
return Ok(None);
}
diff --git a/src/meta-srv/src/region/flush_trigger.rs b/src/meta-srv/src/region/flush_trigger.rs
index 4e05b718bc..a1135f943f 100644
--- a/src/meta-srv/src/region/flush_trigger.rs
+++ b/src/meta-srv/src/region/flush_trigger.rs
@@ -19,12 +19,16 @@ use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_base::readable_size::ReadableSize;
use common_meta::instruction::{FlushRegions, Instruction};
+use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
-use common_meta::region_registry::LeaderRegionRegistryRef;
+use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
use common_meta::stats::topic::TopicStatsRegistryRef;
-use common_telemetry::{debug, error, info};
+use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
+use common_wal::config::kafka::common::{
+ DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
+};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -77,6 +81,8 @@ pub struct RegionFlushTrigger {
server_addr: String,
/// The flush trigger size.
flush_trigger_size: ReadableSize,
+ /// The checkpoint trigger size.
+ checkpoint_trigger_size: ReadableSize,
/// The receiver of events.
receiver: Receiver,
}
@@ -89,8 +95,23 @@ impl RegionFlushTrigger {
topic_stats_registry: TopicStatsRegistryRef,
mailbox: MailboxRef,
server_addr: String,
- flush_trigger_size: ReadableSize,
+ mut flush_trigger_size: ReadableSize,
+ mut checkpoint_trigger_size: ReadableSize,
) -> (Self, RegionFlushTicker) {
+ if flush_trigger_size.as_bytes() == 0 {
+ flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
+ warn!(
+ "flush_trigger_size is not set, using default value: {}",
+ flush_trigger_size
+ );
+ }
+ if checkpoint_trigger_size.as_bytes() == 0 {
+ checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
+ warn!(
+ "checkpoint_trigger_size is not set, using default value: {}",
+ checkpoint_trigger_size
+ );
+ }
let (tx, rx) = Self::channel();
let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
let region_flush_trigger = Self {
@@ -100,6 +121,7 @@ impl RegionFlushTrigger {
mailbox,
server_addr,
flush_trigger_size,
+ checkpoint_trigger_size,
receiver: rx,
};
(region_flush_trigger, region_flush_ticker)
@@ -197,6 +219,52 @@ impl RegionFlushTrigger {
Some((latest_entry_id, stat.avg_record_size))
}
+ /// Persists the replay checkpoints of the given regions of `topic`.
+ ///
+ /// For every id in `region_ids` that is present in `leader_regions`, the
+ /// replay entry ids reported by its manifest are stored as the region's
+ /// [`ReplayCheckpoint`]. Ids missing from `leader_regions` are skipped.
+ /// Writes are split into batches of at most the kv backend's `max_txn_ops`.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error built with `TableMetadataManagerSnafu` when a batch put
+ /// fails; batches written before the failure are not rolled back here.
+ async fn persist_region_checkpoints(
+     &self,
+     topic: &str,
+     region_ids: &[RegionId],
+     leader_regions: &HashMap<RegionId, LeaderRegion>,
+ ) -> Result<()> {
+     let regions = region_ids
+         .iter()
+         .filter_map(|region_id| {
+             let leader_region = leader_regions.get(region_id)?;
+             let entry_id = leader_region.manifest.replay_entry_id();
+             let metadata_entry_id = leader_region.manifest.metadata_replay_entry_id();
+
+             Some((
+                 TopicRegionKey::new(*region_id, topic),
+                 Some(TopicRegionValue::new(Some(ReplayCheckpoint::new(
+                     entry_id,
+                     metadata_entry_id,
+                 )))),
+             ))
+         })
+         .collect::<Vec<_>>();
+
+     // `chunks` panics on a chunk size of zero, which is what `batch_size`
+     // would be if none of the requested regions had a leader entry (this
+     // also covers an empty `region_ids`).
+     if regions.is_empty() {
+         return Ok(());
+     }
+
+     let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
+     let batch_size = max_txn_ops.min(regions.len());
+     for batch in regions.chunks(batch_size) {
+         self.table_metadata_manager
+             .topic_region_manager()
+             .batch_put(batch)
+             .await
+             .context(error::TableMetadataManagerSnafu)?;
+     }
+
+     metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
+         .with_label_values(&[topic])
+         .inc_by(regions.len() as u64);
+
+     Ok(())
+ }
+
async fn flush_regions_in_topic(
&self,
topic: &str,
@@ -209,14 +277,34 @@ impl RegionFlushTrigger {
.regions(topic)
.await
.context(error::TableMetadataManagerSnafu)?;
+
if region_ids.is_empty() {
debug!("No regions found for topic: {}", topic);
return Ok(());
}
- let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = self
+ // Filters the regions whose checkpoints need to be persisted.
+ let regions_to_persist = filter_regions_by_replay_size(
+ topic,
+ region_ids
+ .iter()
+ .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
+ avg_record_size as u64,
+ latest_entry_id,
+ self.checkpoint_trigger_size,
+ );
+ let region_manifests = self
.leader_region_registry
- .batch_get(region_ids.iter().cloned())
+ .batch_get(region_ids.keys().cloned());
+
+ if let Err(err) = self
+ .persist_region_checkpoints(topic, ®ions_to_persist, ®ion_manifests)
+ .await
+ {
+ error!(err; "Failed to persist region checkpoints for topic: {}", topic);
+ }
+
+ let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = region_manifests
.into_iter()
.partition_map(|(region_id, region)| {
if !region.manifest.is_inactive() {
@@ -226,8 +314,24 @@ impl RegionFlushTrigger {
}
});
+ let min_entry_id = inactive_regions
+ .iter()
+ .min_by_key(|(_, entry_id)| *entry_id);
+ let min_entry_id = active_regions
+ .iter()
+ .min_by_key(|(_, entry_id)| *entry_id)
+ .or(min_entry_id);
+
+ if let Some((_, min_entry_id)) = min_entry_id {
+ let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
+ .saturating_mul(avg_record_size as u64);
+ metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
+ .with_label_values(&[topic])
+ .set(replay_size as i64);
+ }
+
// Selects regions to flush from the set of active regions.
- let mut regions_to_flush = select_regions_to_flush(
+ let mut regions_to_flush = filter_regions_by_replay_size(
topic,
active_regions.into_iter(),
avg_record_size as u64,
@@ -239,7 +343,7 @@ impl RegionFlushTrigger {
// Selects regions to flush from the set of inactive regions.
// For inactive regions, we use a lower flush trigger size (half of the normal size)
// to encourage more aggressive flushing to update the region's topic latest entry id.
- let inactive_regions_to_flush = select_regions_to_flush(
+ let inactive_regions_to_flush = filter_regions_by_replay_size(
topic,
inactive_regions.into_iter(),
avg_record_size as u64,
@@ -304,26 +408,26 @@ impl RegionFlushTrigger {
}
}
-/// Select regions to flush based on the estimated replay size.
+/// Filter regions based on the estimated replay size.
///
-/// The regions are selected if the estimated replay size exceeds the flush trigger size.
+/// Returns the regions if its estimated replay size exceeds the given threshold.
/// The estimated replay size is calculated as:
/// `(latest_entry_id - prunable_entry_id) * avg_record_size`
-fn select_regions_to_flush>(
+fn filter_regions_by_replay_size>(
topic: &str,
regions: I,
avg_record_size: u64,
latest_entry_id: u64,
- flush_trigger_size: ReadableSize,
+ threshold: ReadableSize,
) -> Vec {
let mut regions_to_flush = Vec::new();
- for (region_id, prunable_entry_id) in regions {
- if prunable_entry_id < latest_entry_id {
- let replay_size = (latest_entry_id - prunable_entry_id).saturating_mul(avg_record_size);
- if replay_size > flush_trigger_size.as_bytes() {
+ for (region_id, entry_id) in regions {
+ if entry_id < latest_entry_id {
+ let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
+ if replay_size > threshold.as_bytes() {
debug!(
- "Region {}: estimated replay size {} exceeds flush trigger size {}, prunable entry id: {}, topic latest entry id: {}, topic: '{}'",
- region_id, ReadableSize(replay_size), flush_trigger_size, prunable_entry_id, latest_entry_id, topic
+ "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
+ region_id, ReadableSize(replay_size), threshold, entry_id, latest_entry_id, topic
);
regions_to_flush.push(region_id);
}
@@ -421,7 +525,7 @@ mod tests {
(region_id(1, 3), 95), // replay_size = 50
];
- let result = select_regions_to_flush(
+ let result = filter_regions_by_replay_size(
topic,
regions.into_iter(),
avg_record_size,
@@ -445,7 +549,7 @@ mod tests {
(region_id(1, 3), 90), // replay_size = 100
];
- let result = select_regions_to_flush(
+ let result = filter_regions_by_replay_size(
topic,
regions.into_iter(),
avg_record_size,
@@ -465,7 +569,7 @@ mod tests {
let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];
// replay_size will always be 0, so none should be flushed
- let result = select_regions_to_flush(
+ let result = filter_regions_by_replay_size(
topic,
regions.into_iter(),
avg_record_size,
@@ -487,7 +591,7 @@ mod tests {
(region_id(1, 2), 99), // replay_size = 10
];
- let result = select_regions_to_flush(
+ let result = filter_regions_by_replay_size(
topic,
regions.into_iter(),
avg_record_size,
@@ -512,7 +616,7 @@ mod tests {
(region_id(1, 4), 200), // replay_size = 0
];
- let result = select_regions_to_flush(
+ let result = filter_regions_by_replay_size(
topic,
regions.into_iter(),
avg_record_size,
diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs
index a69ef1d13c..19a989d0a3 100644
--- a/src/metric-engine/src/engine.rs
+++ b/src/metric-engine/src/engine.rs
@@ -519,6 +519,7 @@ mod test {
path_type: PathType::Bare, // Use Bare path type for engine regions
options: physical_region_option,
skip_wal_replay: false,
+ checkpoint: None,
};
engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
@@ -542,6 +543,7 @@ mod test {
path_type: PathType::Bare, // Use Bare path type for engine regions
options: HashMap::new(),
skip_wal_replay: false,
+ checkpoint: None,
};
engine
.handle_request(
@@ -620,6 +622,7 @@ mod test {
path_type: PathType::Bare,
options: physical_region_option,
skip_wal_replay: false,
+ checkpoint: None,
};
// Opening an already opened region should succeed.
// Since the region is already open, no metadata recovery operations will be performed.
@@ -647,6 +650,7 @@ mod test {
path_type: PathType::Bare,
options: physical_region_option,
skip_wal_replay: false,
+ checkpoint: None,
};
let err = metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs
index 6cb06c7dfc..e30a446cff 100644
--- a/src/metric-engine/src/engine/open.rs
+++ b/src/metric-engine/src/engine/open.rs
@@ -22,7 +22,7 @@ use datafusion::common::HashMap;
use mito2::engine::MITO_ENGINE_NAME;
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::{BatchResponses, RegionEngine};
-use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, RegionRequest};
+use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, ReplayCheckpoint};
use store_api::storage::RegionId;
use crate::engine::create::region_options_for_metadata_region;
@@ -204,12 +204,18 @@ impl MetricEngineInner {
request: RegionOpenRequest,
) -> (RegionOpenRequest, RegionOpenRequest) {
let metadata_region_options = region_options_for_metadata_region(&request.options);
+ let checkpoint = request.checkpoint;
+
let open_metadata_region_request = RegionOpenRequest {
table_dir: request.table_dir.clone(),
path_type: PathType::Metadata,
options: metadata_region_options,
engine: MITO_ENGINE_NAME.to_string(),
skip_wal_replay: request.skip_wal_replay,
+ checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
+ entry_id: checkpoint.metadata_entry_id.unwrap_or_default(),
+ metadata_entry_id: None,
+ }),
};
let mut data_region_options = request.options;
@@ -223,6 +229,10 @@ impl MetricEngineInner {
options: data_region_options,
engine: MITO_ENGINE_NAME.to_string(),
skip_wal_replay: request.skip_wal_replay,
+ checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
+ entry_id: checkpoint.entry_id,
+ metadata_entry_id: None,
+ }),
};
(open_metadata_region_request, open_data_region_request)
@@ -238,25 +248,17 @@ impl MetricEngineInner {
let data_region_id = utils::to_data_region_id(region_id);
let (open_metadata_region_request, open_data_region_request) =
self.transform_open_physical_region_request(request);
-
- self.mito
- .handle_request(
- metadata_region_id,
- RegionRequest::Open(open_metadata_region_request),
+ let _ = self
+ .mito
+ .handle_batch_open_requests(
+ 2,
+ vec![
+ (metadata_region_id, open_metadata_region_request),
+ (data_region_id, open_data_region_request),
+ ],
)
.await
- .with_context(|_| OpenMitoRegionSnafu {
- region_type: "metadata",
- })?;
- self.mito
- .handle_request(
- data_region_id,
- RegionRequest::Open(open_data_region_request),
- )
- .await
- .with_context(|_| OpenMitoRegionSnafu {
- region_type: "data",
- })?;
+ .context(BatchOpenMitoRegionSnafu {})?;
info!("Opened physical metric region {region_id}");
PHYSICAL_REGION_COUNT.inc();
diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs
index be039e26ea..11d2191cac 100644
--- a/src/metric-engine/src/test_util.rs
+++ b/src/metric-engine/src/test_util.rs
@@ -115,6 +115,7 @@ impl TestEnv {
path_type: PathType::Bare, // Use Bare path type for engine regions
options: physical_region_option,
skip_wal_replay: true,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs
index e6ae024f6e..7ff4c94d82 100644
--- a/src/mito2/src/engine/alter_test.rs
+++ b/src/mito2/src/engine/alter_test.rs
@@ -207,6 +207,7 @@ async fn test_alter_region() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -293,6 +294,7 @@ async fn test_put_after_alter() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -611,6 +613,7 @@ async fn test_alter_column_fulltext_options() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -722,6 +725,7 @@ async fn test_alter_column_set_inverted_index() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index 4f52a4c2dd..349fd10b17 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -158,6 +158,7 @@ async fn test_region_replay(factory: Option) {
path_type: store_api::region_request::PathType::Bare,
options,
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/engine/batch_open_test.rs b/src/mito2/src/engine/batch_open_test.rs
index cef7f46fc6..9fae7164a0 100644
--- a/src/mito2/src/engine/batch_open_test.rs
+++ b/src/mito2/src/engine/batch_open_test.rs
@@ -125,6 +125,7 @@ async fn test_batch_open(factory: Option) {
options: options.clone(),
skip_wal_replay: false,
path_type: PathType::Bare,
+ checkpoint: None,
},
)
})
@@ -137,6 +138,7 @@ async fn test_batch_open(factory: Option) {
options: options.clone(),
skip_wal_replay: false,
path_type: PathType::Bare,
+ checkpoint: None,
},
));
@@ -190,6 +192,7 @@ async fn test_batch_open_err(factory: Option) {
options: options.clone(),
skip_wal_replay: false,
path_type: PathType::Bare,
+ checkpoint: None,
},
)
})
diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs
index ad3c7f4b15..d346fecebd 100644
--- a/src/mito2/src/engine/catchup_test.rs
+++ b/src/mito2/src/engine/catchup_test.rs
@@ -95,6 +95,7 @@ async fn test_catchup_with_last_entry_id(factory: Option) {
path_type: store_api::region_request::PathType::Bare,
options,
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -216,6 +217,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option) {
path_type: store_api::region_request::PathType::Bare,
options,
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -425,6 +428,7 @@ async fn test_catchup_with_manifest_update(factory: Option) {
path_type: store_api::region_request::PathType::Bare,
options,
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -532,6 +536,7 @@ async fn open_region(
options: HashMap::new(),
skip_wal_replay,
path_type: PathType::Bare,
+ checkpoint: None,
}),
)
.await
@@ -626,6 +631,7 @@ async fn test_local_catchup(factory: Option) {
options: HashMap::new(),
skip_wal_replay: true,
path_type: PathType::Bare,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index 9e1aa7d3de..e7a23a23c4 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -792,6 +792,7 @@ async fn test_change_region_compaction_window() {
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -875,6 +876,7 @@ async fn test_open_overwrite_compaction_window() {
path_type: PathType::Bare,
options,
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs
index 3f9e834f6a..f10404f429 100644
--- a/src/mito2/src/engine/open_test.rs
+++ b/src/mito2/src/engine/open_test.rs
@@ -50,6 +50,7 @@ async fn test_engine_open_empty() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -85,6 +86,7 @@ async fn test_engine_open_existing() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -180,6 +182,7 @@ async fn test_engine_region_open_with_options() {
path_type: PathType::Bare,
options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -226,6 +229,7 @@ async fn test_engine_region_open_with_custom_store() {
path_type: PathType::Bare,
options: HashMap::from([("storage".to_string(), "Gcs".to_string())]),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -298,6 +302,7 @@ async fn test_open_region_skip_wal_replay() {
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: true,
+ checkpoint: None,
}),
)
.await
@@ -328,6 +333,7 @@ async fn test_open_region_skip_wal_replay() {
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -370,6 +376,7 @@ async fn test_open_region_wait_for_opening_region_ok() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -410,6 +417,7 @@ async fn test_open_region_wait_for_opening_region_err() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs
index b5e8718d44..f423b4b41a 100644
--- a/src/mito2/src/engine/parallel_test.rs
+++ b/src/mito2/src/engine/parallel_test.rs
@@ -51,6 +51,7 @@ async fn scan_in_parallel(
options: HashMap::default(),
skip_wal_replay: false,
path_type: PathType::Bare,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs
index c6a98e63d0..a4d02dbb02 100644
--- a/src/mito2/src/engine/sync_test.rs
+++ b/src/mito2/src/engine/sync_test.rs
@@ -111,6 +111,7 @@ async fn test_sync_after_flush_region() {
options: Default::default(),
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
+ checkpoint: None,
}),
)
.await
@@ -207,6 +208,7 @@ async fn test_sync_after_alter_region() {
options: Default::default(),
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs
index 4da3978ba6..e724626af7 100644
--- a/src/mito2/src/engine/truncate_test.rs
+++ b/src/mito2/src/engine/truncate_test.rs
@@ -276,6 +276,7 @@ async fn test_engine_truncate_reopen() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
@@ -384,6 +385,7 @@ async fn test_engine_truncate_during_flush() {
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 9b4c790c06..48a303921a 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -85,6 +85,7 @@ pub(crate) struct RegionOpener {
time_provider: TimeProviderRef,
stats: ManifestStats,
wal_entry_reader: Option<Box<dyn WalEntryReader>>,
+ replay_checkpoint: Option<u64>,
}
impl RegionOpener {
@@ -118,6 +119,7 @@ impl RegionOpener {
time_provider,
stats: Default::default(),
wal_entry_reader: None,
+ replay_checkpoint: None,
}
}
@@ -149,6 +151,12 @@ impl RegionOpener {
self.options(RegionOptions::try_from(&options)?)
}
+ /// Sets the replay checkpoint for the region.
+ pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
+ self.replay_checkpoint = replay_checkpoint;
+ self
+ }
+
/// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of
/// constructing a new one from scratch.
pub(crate) fn wal_entry_reader(
@@ -432,17 +440,22 @@ impl RegionOpener {
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
if !self.skip_wal_replay {
+ let replay_from_entry_id = self
+ .replay_checkpoint
+ .unwrap_or_default()
+ .max(flushed_entry_id);
info!(
- "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
- flushed_entry_id + 1,
+ "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}",
+ replay_from_entry_id,
region_id,
- manifest.manifest_version
+ manifest.manifest_version,
+ flushed_entry_id
);
replay_memtable(
&provider,
wal_entry_reader,
region_id,
- flushed_entry_id,
+ replay_from_entry_id,
&version_control,
config.allow_stale_entries,
on_region_opened,
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 65340eb76a..fe70f5cc36 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -1100,6 +1100,7 @@ pub async fn reopen_region(
options,
skip_wal_replay: false,
path_type: PathType::Bare,
+ checkpoint: None,
}),
)
.await
diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs
index 86963f6598..23148c36b1 100644
--- a/src/mito2/src/worker/handle_open.rs
+++ b/src/mito2/src/worker/handle_open.rs
@@ -107,6 +107,7 @@ impl RegionWorkerLoop {
.skip_wal_replay(request.skip_wal_replay)
.cache(Some(self.cache_manager.clone()))
.wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
+ .replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id))
.parse_options(request.options)
{
Ok(opener) => opener,
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index fad0680048..64fd74b6d8 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -291,6 +291,7 @@ fn make_region_open(open: OpenRequest) -> Result>
path_type: PathType::Bare,
options: open.options,
skip_wal_replay: false,
+ checkpoint: None,
}),
)])
}
@@ -503,6 +504,14 @@ pub struct RegionOpenRequest {
pub options: HashMap<String, String>,
/// To skip replaying the WAL.
pub skip_wal_replay: bool,
+ /// Replay checkpoint.
+ pub checkpoint: Option<ReplayCheckpoint>,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ReplayCheckpoint {
+ pub entry_id: u64,
+ pub metadata_entry_id: Option<u64>,
}
impl RegionOpenRequest {