mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: add replay checkpoint to reduce overhead for remote WAL (#6816)
* feat: introduce `TopicRegionValue`
* feat: persist region replay checkpoint
* feat: introduce checkpoint
* chore: update config.md
* refactor: minor refactor
* feat: send open region instructions with replay checkpoint
* chore: use usize
* fix: fix unit tests
* fix: fix unit tests
* feat: add topic name pattern
* feat: enable wal prune by default
* chore: apply suggestions
* chore: apply suggestions

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Cargo.lock (generated, 2 changes)
@@ -10777,7 +10777,7 @@ dependencies = [
 [[package]]
 name = "rskafka"
 version = "0.6.0"
-source = "git+https://github.com/influxdata/rskafka.git?rev=a62120b6c74d68953464b256f858dc1c41a903b4#a62120b6c74d68953464b256f858dc1c41a903b4"
+source = "git+https://github.com/WenyXu/rskafka.git?rev=7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76#7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76"
 dependencies = [
  "bytes",
  "chrono",
@@ -191,7 +191,7 @@ reqwest = { version = "0.12", default-features = false, features = [
   "stream",
   "multipart",
 ] }
-rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "a62120b6c74d68953464b256f858dc1c41a903b4", features = [
+rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76", features = [
   "transport-tls",
 ] }
 rstest = "0.25"
@@ -379,8 +379,9 @@
 | `wal.provider` | String | `raft_engine` | -- |
 | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster.<br/><br/>**It's only used when the provider is `kafka`**. |
 | `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)`<br/>**It's only used when the provider is `kafka`**. |
-| `wal.auto_prune_interval` | String | `10m` | Interval of automatically WAL pruning.<br/>Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.auto_prune_interval` | String | `30m` | Interval of automatically WAL pruning.<br/>Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.flush_trigger_size` | String | `512MB` | Estimated size threshold to trigger a flush when using Kafka remote WAL.<br/>Since multiple regions may share a Kafka topic, the estimated size is calculated as:<br/> (latest_entry_id - flushed_entry_id) * avg_record_size<br/>MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`.<br/>- `latest_entry_id`: The latest entry ID in the topic.<br/>- `flushed_entry_id`: The last flushed entry ID for the region.<br/>Set to "0" to let the system decide the flush trigger size.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.checkpoint_trigger_size` | String | `128MB` | Estimated size threshold to trigger a checkpoint when using Kafka remote WAL.<br/>The estimated size is calculated as:<br/> (latest_entry_id - last_checkpoint_entry_id) * avg_record_size<br/>MetaSrv triggers a checkpoint for a region when this estimated size exceeds `checkpoint_trigger_size`.<br/>Set to "0" to disable checkpoint trigger.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatically WAL pruning.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.num_topics` | Integer | `64` | Number of topics used for remote WAL.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
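The flush and checkpoint triggers above share one estimation formula; a minimal, self-contained sketch of that arithmetic (the values are hypothetical, this is not MetaSrv code):

/// Estimated replay backlog in bytes for one region sharing a Kafka topic,
/// per the documented formula: (latest_entry_id - base_entry_id) * avg_record_size.
/// `base_entry_id` is `flushed_entry_id` for the flush trigger and
/// `last_checkpoint_entry_id` for the checkpoint trigger.
fn estimated_size(latest_entry_id: u64, base_entry_id: u64, avg_record_size: u64) -> u64 {
    latest_entry_id.saturating_sub(base_entry_id) * avg_record_size
}

fn main() {
    let flush_trigger_size = 512 * 1024 * 1024u64; // 512MB default
    let checkpoint_trigger_size = 128 * 1024 * 1024u64; // 128MB default

    // Hypothetical topic state: 300k entries behind, averaging 1 KiB each.
    let estimated = estimated_size(1_300_000, 1_000_000, 1024);
    assert!(estimated > checkpoint_trigger_size); // would trigger a checkpoint
    assert!(estimated < flush_trigger_size); // but not yet a flush
    println!("estimated backlog: {estimated} bytes");
}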
@@ -190,7 +190,7 @@ auto_create_topics = true

 ## Interval of automatically WAL pruning.
 ## Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.
 ## **It's only used when the provider is `kafka`**.
-auto_prune_interval = "10m"
+auto_prune_interval = "30m"

 ## Estimated size threshold to trigger a flush when using Kafka remote WAL.
@@ -203,6 +203,14 @@ auto_prune_interval = "10m"
 ## **It's only used when the provider is `kafka`**.
 flush_trigger_size = "512MB"

+## Estimated size threshold to trigger a checkpoint when using Kafka remote WAL.
+## The estimated size is calculated as:
+##   (latest_entry_id - last_checkpoint_entry_id) * avg_record_size
+## MetaSrv triggers a checkpoint for a region when this estimated size exceeds `checkpoint_trigger_size`.
+## Set to "0" to disable checkpoint trigger.
+## **It's only used when the provider is `kafka`**.
+checkpoint_trigger_size = "128MB"
+
 ## Concurrent task limit for automatically WAL pruning.
 ## **It's only used when the provider is `kafka`**.
 auto_prune_parallelism = 10
@@ -108,6 +108,10 @@ pub struct OpenRegion {
     pub region_wal_options: HashMap<RegionNumber, String>,
     #[serde(default)]
     pub skip_wal_replay: bool,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub replay_entry_id: Option<u64>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub metadata_replay_entry_id: Option<u64>,
 }

 impl OpenRegion {
@@ -124,8 +128,22 @@ impl OpenRegion {
             region_options,
             region_wal_options,
             skip_wal_replay,
+            replay_entry_id: None,
+            metadata_replay_entry_id: None,
         }
     }
+
+    /// Sets the replay entry id.
+    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
+        self.replay_entry_id = replay_entry_id;
+        self
+    }
+
+    /// Sets the metadata replay entry id.
+    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
+        self.metadata_replay_entry_id = metadata_replay_entry_id;
+        self
+    }
 }

 /// The instruction of downgrading leader region.
@@ -352,6 +370,8 @@ mod tests {
             region_options,
             region_wal_options: HashMap::new(),
             skip_wal_replay: false,
+            replay_entry_id: None,
+            metadata_replay_entry_id: None,
         };
         assert_eq!(expected, deserialized);
     }
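A short sketch of how the new builder methods compose on `OpenRegion` (the ident and path values are placeholders, and the exact argument types are assumed from the call sites in this diff):

use std::collections::HashMap;

use common_meta::instruction::OpenRegion;
use common_meta::RegionIdent;

// Builds an open-region instruction payload that carries a replay checkpoint.
fn build_open_region(ident: RegionIdent) -> OpenRegion {
    OpenRegion::new(
        ident,
        "data/placeholder", // region storage path (placeholder)
        HashMap::new(),     // region options
        HashMap::new(),     // region wal options
        true,               // skip_wal_replay
    )
    // Both setters take Options, so "no checkpoint known" is simply None.
    .with_replay_entry_id(Some(1024))
    .with_metadata_replay_entry_id(None)
}

Because both fields are `#[serde(skip_serializing_if = "Option::is_none")]`, older datanodes that deserialize the instruction without these fields keep working.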
@@ -155,6 +155,7 @@ use crate::error::{self, Result, SerdeJsonSnafu};
 use crate::key::flow::flow_state::FlowStateValue;
 use crate::key::node_address::NodeAddressValue;
 use crate::key::table_route::TableRouteKey;
+use crate::key::topic_region::TopicRegionValue;
 use crate::key::txn_helper::TxnOpGetResponseSet;
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
@@ -164,6 +165,7 @@ use crate::state_store::PoisonValue;
 use crate::DatanodeId;

 pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
+pub const TOPIC_NAME_PATTERN: &str = r"[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*";
 pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
 pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
 pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
@@ -271,6 +273,10 @@ lazy_static! {
     pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(NAME_PATTERN).unwrap();
 }

+lazy_static! {
+    pub static ref TOPIC_NAME_PATTERN_REGEX: Regex = Regex::new(TOPIC_NAME_PATTERN).unwrap();
+}
+
 lazy_static! {
     static ref TABLE_INFO_KEY_PATTERN: Regex =
         Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
@@ -326,7 +332,7 @@ lazy_static! {

 lazy_static! {
     pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
-        "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
+        "^{TOPIC_REGION_PREFIX}/({TOPIC_NAME_PATTERN})/([0-9]+)$"
     ))
     .unwrap();
 }
@@ -1434,7 +1440,8 @@ impl_metadata_value! {
     NodeAddressValue,
     SchemaNameValue,
     FlowStateValue,
-    PoisonValue
+    PoisonValue,
+    TopicRegionValue
 }

 impl_optional_metadata_value! {
@@ -1676,9 +1683,11 @@ mod tests {
             .topic_region_manager
             .regions(&topic)
             .await
-            .unwrap();
+            .unwrap()
+            .into_keys()
+            .collect::<Vec<_>>();
         assert_eq!(regions.len(), 8);
-        assert_eq!(regions[0], region_id);
+        assert!(regions.contains(&region_id));
     }
 }
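The practical difference between the two patterns is only the first character class: `TOPIC_NAME_PATTERN` additionally accepts a leading digit, which a legal Kafka topic name (like the UUID-prefixed name exercised in the tests further down) can have. A quick check, assuming only the `regex` crate:

use regex::Regex;

fn main() {
    // Copied from the constants above; anchored so we match the whole string.
    let name = Regex::new(r"^[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*$").unwrap();
    let topic = Regex::new(r"^[a-zA-Z0-9_:-][a-zA-Z0-9_:\-\.@#]*$").unwrap();

    let kafka_topic = "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"; // starts with a digit
    assert!(!name.is_match(kafka_topic)); // rejected by NAME_PATTERN
    assert!(topic.is_match(kafka_topic)); // accepted by TOPIC_NAME_PATTERN
}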
@@ -12,20 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
 use std::collections::HashMap;
 use std::fmt::{self, Display};
@@ -37,10 +23,12 @@ use table::metadata::TableId;

 use crate::ddl::utils::parse_region_wal_options;
 use crate::error::{Error, InvalidMetadataSnafu, Result};
-use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
+use crate::key::{MetadataKey, MetadataValue, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
-use crate::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
+use crate::rpc::store::{
+    BatchDeleteRequest, BatchGetRequest, BatchPutRequest, PutRequest, RangeRequest,
+};
 use crate::rpc::KeyValue;

 // The TopicRegionKey is a key for the topic-region mapping in the kvbackend.
@@ -51,8 +39,20 @@ pub struct TopicRegionKey<'a> {
     pub topic: &'a str,
 }

-#[derive(Debug, Serialize, Deserialize)]
-pub struct TopicRegionValue;
+/// Represents additional information for a region when using a shared WAL.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
+pub struct TopicRegionValue {
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub checkpoint: Option<ReplayCheckpoint>,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
+pub struct ReplayCheckpoint {
+    #[serde(default)]
+    pub entry_id: u64,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub metadata_entry_id: Option<u64>,
+}

 impl<'a> TopicRegionKey<'a> {
     pub fn new(region_id: RegionId, topic: &'a str) -> Self {
@@ -118,9 +118,47 @@ impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
     }
 }

-fn topic_region_decoder(value: &KeyValue) -> Result<TopicRegionKey<'_>> {
+impl ReplayCheckpoint {
+    /// Creates a new [`ReplayCheckpoint`] with the given entry id and metadata entry id.
+    pub fn new(entry_id: u64, metadata_entry_id: Option<u64>) -> Self {
+        Self {
+            entry_id,
+            metadata_entry_id,
+        }
+    }
+}
+
+impl TopicRegionValue {
+    /// Creates a new [`TopicRegionValue`] with the given checkpoint.
+    pub fn new(checkpoint: Option<ReplayCheckpoint>) -> Self {
+        Self { checkpoint }
+    }
+
+    /// Returns the minimum entry id of the region.
+    ///
+    /// If the metadata entry id is not set, it returns the entry id.
+    pub fn min_entry_id(&self) -> Option<u64> {
+        match self.checkpoint {
+            Some(ReplayCheckpoint {
+                entry_id,
+                metadata_entry_id,
+            }) => match metadata_entry_id {
+                Some(metadata_entry_id) => Some(entry_id.min(metadata_entry_id)),
+                None => Some(entry_id),
+            },
+            None => None,
+        }
+    }
+}
+
+fn topic_region_decoder(value: &KeyValue) -> Result<(TopicRegionKey<'_>, TopicRegionValue)> {
     let key = TopicRegionKey::from_bytes(&value.key)?;
-    Ok(key)
+    let value = if value.value.is_empty() {
+        TopicRegionValue::default()
+    } else {
+        TopicRegionValue::try_from_raw_value(&value.value)?
+    };
+    Ok((key, value))
 }

 /// Manages map of topics and regions in kvbackend.
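`min_entry_id` is what WAL pruning cares about: nothing at or beyond the smaller of the two entry ids may be deleted, otherwise a metric region could lose metadata it still needs to replay. A small behavioral sketch of the same logic over plain tuples:

// (entry_id, metadata_entry_id) mirrors ReplayCheckpoint's fields.
fn min_entry_id(checkpoint: Option<(u64, Option<u64>)>) -> Option<u64> {
    checkpoint.map(|(entry_id, metadata_entry_id)| match metadata_entry_id {
        Some(meta) => entry_id.min(meta),
        None => entry_id,
    })
}

fn main() {
    assert_eq!(min_entry_id(None), None); // no checkpoint persisted yet
    assert_eq!(min_entry_id(Some((100, None))), Some(100)); // data region only
    assert_eq!(min_entry_id(Some((100, Some(40)))), Some(40)); // metadata lags behind
}

Note also that the decoder treats an empty value as `TopicRegionValue::default()`, so legacy entries written before this change (which stored an empty byte string) decode through the same path.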
@@ -143,21 +181,59 @@ impl TopicRegionManager {
         Ok(())
     }

-    pub async fn batch_put(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
+    pub async fn batch_get(
+        &self,
+        keys: Vec<TopicRegionKey<'_>>,
+    ) -> Result<HashMap<RegionId, TopicRegionValue>> {
+        let raw_keys = keys.iter().map(|key| key.to_bytes()).collect::<Vec<_>>();
+        let req = BatchGetRequest { keys: raw_keys };
+        let resp = self.kv_backend.batch_get(req).await?;
+
+        let v = resp
+            .kvs
+            .into_iter()
+            .map(|kv| topic_region_decoder(&kv).map(|(key, value)| (key.region_id, value)))
+            .collect::<Result<HashMap<_, _>>>()?;
+
+        Ok(v)
+    }
+
+    pub async fn get(&self, key: TopicRegionKey<'_>) -> Result<Option<TopicRegionValue>> {
+        let key_bytes = key.to_bytes();
+        let resp = self.kv_backend.get(&key_bytes).await?;
+        let value = resp
+            .map(|kv| topic_region_decoder(&kv).map(|(_, value)| value))
+            .transpose()?;
+
+        Ok(value)
+    }
+
+    pub async fn batch_put(
+        &self,
+        keys: &[(TopicRegionKey<'_>, Option<TopicRegionValue>)],
+    ) -> Result<()> {
         let req = BatchPutRequest {
             kvs: keys
-                .into_iter()
-                .map(|key| KeyValue {
-                    key: key.to_bytes(),
-                    value: vec![],
+                .iter()
+                .map(|(key, value)| {
+                    let value = value
+                        .map(|v| v.try_as_raw_value())
+                        .transpose()?
+                        .unwrap_or_default();
+
+                    Ok(KeyValue {
+                        key: key.to_bytes(),
+                        value,
+                    })
                 })
-                .collect(),
+                .collect::<Result<Vec<_>>>()?,
             prev_kv: false,
         };
         self.kv_backend.batch_put(req).await?;
         Ok(())
     }

     /// Build a create topic region mapping transaction. It only executes while the primary keys comparing successes.
     pub fn build_create_txn(
         &self,
         table_id: TableId,
@@ -176,8 +252,8 @@ impl TopicRegionManager {
         Ok(Txn::new().and_then(operations))
     }

-    /// Returns the list of region ids using specified topic.
-    pub async fn regions(&self, topic: &str) -> Result<Vec<RegionId>> {
+    /// Returns the map of [`RegionId`] to their corresponding topic [`TopicRegionValue`].
+    pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
         let prefix = TopicRegionKey::range_topic_key(topic);
         let req = RangeRequest::new().with_prefix(prefix.as_bytes());
         let resp = self.kv_backend.range(req).await?;
@@ -186,7 +262,10 @@ impl TopicRegionManager {
             .iter()
             .map(topic_region_decoder)
             .collect::<Result<Vec<_>>>()?;
-        Ok(region_ids.iter().map(|key| key.region_id).collect())
+        Ok(region_ids
+            .into_iter()
+            .map(|(key, value)| (key.region_id, value))
+            .collect())
     }

     pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
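Taken together, the manager now round-trips checkpoints through the kv backend. A hedged usage sketch (assumes a `TopicRegionManager` built over some `KvBackendRef` and the `common_meta::error::Result` alias):

use common_meta::key::topic_region::{
    ReplayCheckpoint, TopicRegionKey, TopicRegionManager, TopicRegionValue,
};
use store_api::storage::RegionId;

async fn persist_and_read(manager: &TopicRegionManager) -> common_meta::error::Result<()> {
    let key = TopicRegionKey::new(RegionId::from_u64(1), "topic_0");

    // Legacy writers stored an empty value; new writers can attach a checkpoint.
    let value = TopicRegionValue::new(Some(ReplayCheckpoint::new(100, None)));
    manager.batch_put(&[(key, Some(value))]).await?;

    // Readers see a default (checkpoint-less) value for legacy empty entries,
    // so old and new records decode through the same path.
    let read_back = manager
        .get(TopicRegionKey::new(RegionId::from_u64(1), "topic_0"))
        .await?;
    assert_eq!(read_back.and_then(|v| v.min_entry_id()), Some(100));
    Ok(())
}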
@@ -248,15 +327,24 @@ mod tests {

         let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
         let keys = (0..64)
-            .map(|i| TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]))
+            .map(|i| {
+                (
+                    TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]),
+                    None,
+                )
+            })
             .collect::<Vec<_>>();

-        manager.batch_put(keys.clone()).await.unwrap();
-
-        let mut key_values = manager.regions(&topics[0]).await.unwrap();
+        manager.batch_put(&keys).await.unwrap();
+        let mut key_values = manager
+            .regions(&topics[0])
+            .await
+            .unwrap()
+            .into_keys()
+            .collect::<Vec<_>>();
         let expected = keys
             .iter()
-            .filter_map(|key| {
+            .filter_map(|(key, _)| {
                 if key.topic == topics[0] {
                     Some(key.region_id)
                 } else {
@@ -269,10 +357,15 @@ mod tests {

         let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
         manager.delete(key.clone()).await.unwrap();
-        let mut key_values = manager.regions(&topics[0]).await.unwrap();
+        let mut key_values = manager
+            .regions(&topics[0])
+            .await
+            .unwrap()
+            .into_keys()
+            .collect::<Vec<_>>();
         let expected = keys
             .iter()
-            .filter_map(|key| {
+            .filter_map(|(key, _)| {
                 if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) {
                     Some(key.region_id)
                 } else {
@@ -324,4 +417,18 @@ mod tests {
         expected.sort_by_key(|(region_id, _)| region_id.as_u64());
         assert_eq!(topic_region_map, expected);
     }
+
+    #[test]
+    fn test_topic_region_key_is_match() {
+        let key = "__topic_region/6f153a64-7fac-4cf6-8b0b-a7967dd73879_2/4410931412992";
+        let topic_region_key = TopicRegionKey::try_from(key).unwrap();
+        assert_eq!(
+            topic_region_key.topic,
+            "6f153a64-7fac-4cf6-8b0b-a7967dd73879_2"
+        );
+        assert_eq!(
+            topic_region_key.region_id,
+            RegionId::from_u64(4410931412992)
+        );
+    }
 }
@@ -133,6 +133,34 @@ impl LeaderRegionManifestInfo {
         }
     }

+    /// Returns the replay entry id of the data region.
+    pub fn replay_entry_id(&self) -> u64 {
+        match self {
+            LeaderRegionManifestInfo::Mito {
+                flushed_entry_id,
+                topic_latest_entry_id,
+                ..
+            } => (*flushed_entry_id).max(*topic_latest_entry_id),
+            LeaderRegionManifestInfo::Metric {
+                data_flushed_entry_id,
+                data_topic_latest_entry_id,
+                ..
+            } => (*data_flushed_entry_id).max(*data_topic_latest_entry_id),
+        }
+    }
+
+    /// Returns the replay entry id of the metadata region.
+    pub fn metadata_replay_entry_id(&self) -> Option<u64> {
+        match self {
+            LeaderRegionManifestInfo::Metric {
+                metadata_flushed_entry_id,
+                metadata_topic_latest_entry_id,
+                ..
+            } => Some((*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id)),
+            _ => None,
+        }
+    }
+
     /// A region is considered inactive if the flushed entry id is less than the topic's latest entry id.
     ///
     /// The `topic_latest_entry_id` of a region is updated only when its memtable is empty during a flush.
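Why `max` rather than the flushed id alone: `topic_latest_entry_id` only advances when a flush observes an empty memtable, so whichever of the two ids is larger marks the point up to which the region provably needs no replay. A tiny numeric check of that selection (plain Rust, not the enum above):

// For a data region: replay may safely start after the larger of the two ids.
fn replay_entry_id(flushed_entry_id: u64, topic_latest_entry_id: u64) -> u64 {
    flushed_entry_id.max(topic_latest_entry_id)
}

fn main() {
    // Region flushed up to 90, but an empty-memtable flush later recorded the
    // topic at 120: everything through 120 is already durable, start there.
    assert_eq!(replay_entry_id(90, 120), 120);
    // Conversely, a fresher flush at 150 dominates an older topic mark at 120.
    assert_eq!(replay_entry_id(150, 120), 150);
}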
@@ -27,7 +27,7 @@ use snafu::{ensure, ResultExt};
 use store_api::storage::{RegionId, RegionNumber};

 use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
-use crate::key::NAME_PATTERN_REGEX;
+use crate::key::TOPIC_NAME_PATTERN_REGEX;
 use crate::kv_backend::KvBackendRef;
 use crate::leadership_notifier::LeadershipChangeListener;
 pub use crate::wal_options_allocator::topic_creator::{
@@ -109,7 +109,7 @@ pub async fn build_wal_options_allocator(
         MetasrvWalConfig::Kafka(kafka_config) => {
             let prefix = &kafka_config.kafka_topic.topic_name_prefix;
             ensure!(
-                NAME_PATTERN_REGEX.is_match(prefix),
+                TOPIC_NAME_PATTERN_REGEX.is_match(prefix),
                 InvalidTopicNamePrefixSnafu { prefix }
             );
             let topic_creator =
@@ -149,6 +149,26 @@ pub fn prepare_wal_options(
     }
 }

+/// Extracts the topic from the wal options.
+pub fn extract_topic_from_wal_options(
+    region_id: RegionId,
+    region_options: &HashMap<RegionNumber, String>,
+) -> Option<String> {
+    region_options
+        .get(&region_id.region_number())
+        .and_then(|wal_options| {
+            serde_json::from_str::<WalOptions>(wal_options)
+                .ok()
+                .and_then(|wal_options| {
+                    if let WalOptions::Kafka(kafka_wal_option) = wal_options {
+                        Some(kafka_wal_option.topic)
+                    } else {
+                        None
+                    }
+                })
+        })
+}
+
 #[cfg(test)]
 mod tests {
     use std::assert_matches::assert_matches;
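A hedged sketch of `extract_topic_from_wal_options` at work. The JSON shape shown is an assumption about how `WalOptions`' Kafka variant serializes (a `wal.provider` tag plus a `topic` field), inferred from the per-region wal options used elsewhere in this diff:

use std::collections::HashMap;

use common_meta::wal_options_allocator::extract_topic_from_wal_options;
use store_api::storage::RegionId;

fn main() {
    let region_id = RegionId::new(1024, 1);
    let mut region_wal_options = HashMap::new();
    // Assumed serialized form of WalOptions::Kafka for region number 1.
    region_wal_options.insert(
        1u32,
        r#"{"wal.provider":"kafka","topic":"greptimedb_wal_topic_3"}"#.to_string(),
    );

    assert_eq!(
        extract_topic_from_wal_options(region_id, &region_wal_options),
        Some("greptimedb_wal_topic_3".to_string())
    );
    // Missing, unparseable, or raft-engine options simply yield None.
    let other_region = RegionId::new(1024, 2);
    assert_eq!(extract_topic_from_wal_options(other_region, &region_wal_options), None);
}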
@@ -20,7 +20,8 @@ use std::time::Duration;

 use serde::{Deserialize, Serialize};

 use crate::config::kafka::common::{
-    DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE,
+    DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE,
+    DEFAULT_FLUSH_TRIGGER_SIZE,
 };
 use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
 use crate::config::raft_engine::RaftEngineConfig;
@@ -64,6 +65,8 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
                 auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
                 // This field won't be used in standalone mode
                 flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
+                // This field won't be used in standalone mode
+                checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
             }),
         }
     }
@@ -205,9 +208,10 @@ mod tests {
                 create_topic_timeout: Duration::from_secs(30),
             },
             auto_create_topics: true,
-            auto_prune_interval: Duration::from_secs(0),
+            auto_prune_interval: Duration::from_mins(30),
             auto_prune_parallelism: 10,
             flush_trigger_size: ReadableSize::mb(512),
+            checkpoint_trigger_size: ReadableSize::mb(128),
         };
         assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
@@ -37,11 +37,13 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
 };

 /// Default interval for auto WAL pruning.
-pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO;
+pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30);
 /// Default limit for concurrent auto pruning tasks.
 pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
 /// Default size of WAL to trigger flush.
 pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
+/// Default checkpoint trigger size.
+pub const DEFAULT_CHECKPOINT_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(128);

 use crate::error::{self, Result};
 use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};

 use crate::config::kafka::common::{
     KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL,
-    DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE,
+    DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
 };

 /// Kafka wal configurations for metasrv.
@@ -41,6 +41,8 @@ pub struct MetasrvKafkaConfig {
     pub auto_prune_parallelism: usize,
     // The size of WAL to trigger flush.
     pub flush_trigger_size: ReadableSize,
+    // The checkpoint trigger size.
+    pub checkpoint_trigger_size: ReadableSize,
 }

 impl Default for MetasrvKafkaConfig {
@@ -52,6 +54,7 @@ impl Default for MetasrvKafkaConfig {
             auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
             auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
             flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
+            checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
         }
     }
 }
@@ -13,6 +13,7 @@
 // limitations under the License.

 #![feature(assert_matches)]
+#![feature(duration_constructors_lite)]

 use std::net::SocketAddr;
@@ -23,18 +23,15 @@ use common_error::ext::BoxedError;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
 use common_meta::datanode::TopicStatsReporter;
-use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
 use common_meta::key::runtime_switch::RuntimeSwitchManager;
 use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
-use common_meta::wal_options_allocator::prepare_wal_options;
 pub use common_procedure::options::ProcedureConfig;
 use common_telemetry::{error, info, warn};
 use common_wal::config::kafka::DatanodeKafkaConfig;
 use common_wal::config::raft_engine::RaftEngineConfig;
 use common_wal::config::DatanodeWalConfig;
 use file_engine::engine::FileRegionEngine;
-use futures_util::TryStreamExt;
 use log_store::kafka::log_store::KafkaLogStore;
 use log_store::kafka::{default_index_file, GlobalIndexCollector};
 use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -49,10 +46,8 @@ use query::QueryEngineFactory;
 use servers::export_metrics::ExportMetricsTask;
 use servers::server::ServerHandlers;
 use snafu::{ensure, OptionExt, ResultExt};
-use store_api::path_utils::{table_dir, WAL_DIR};
+use store_api::path_utils::WAL_DIR;
 use store_api::region_engine::{RegionEngineRef, RegionRole};
-use store_api::region_request::{PathType, RegionOpenRequest};
-use store_api::storage::RegionId;
 use tokio::fs;
 use tokio::sync::Notify;
@@ -70,6 +65,7 @@ use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
 use crate::heartbeat::HeartbeatTask;
 use crate::region_server::{DummyTableProviderFactory, RegionServer};
 use crate::store::{self, new_object_store_without_cache};
+use crate::utils::{build_region_open_requests, RegionOpenRequests};

 /// Datanode service.
 pub struct Datanode {
@@ -252,16 +248,12 @@ impl DatanodeBuilder {
             .recovery_mode()
             .await
             .context(GetMetadataSnafu)?;
-        let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone());
-        let table_values = datanode_table_manager
-            .tables(node_id)
-            .try_collect::<Vec<_>>()
-            .await
-            .context(GetMetadataSnafu)?;
-
+        let region_open_requests =
+            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
         let open_all_regions = open_all_regions(
             region_server.clone(),
-            table_values,
+            region_open_requests,
             !controlled_by_metasrv,
             self.opts.init_regions_parallelism,
             // Ignore nonexistent regions in recovery mode.
@@ -342,27 +334,22 @@ impl DatanodeBuilder {
     async fn initialize_region_server(
         &self,
         region_server: &RegionServer,
-        kv_backend: KvBackendRef,
         open_with_writable: bool,
     ) -> Result<()> {
         let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

-        let runtime_switch_manager = RuntimeSwitchManager::new(kv_backend.clone());
+        // TODO(weny): Considering introducing a readonly kv_backend trait.
+        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
         let is_recovery_mode = runtime_switch_manager
             .recovery_mode()
             .await
             .context(GetMetadataSnafu)?;
-
-        let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
-        let table_values = datanode_table_manager
-            .tables(node_id)
-            .try_collect::<Vec<_>>()
-            .await
-            .context(GetMetadataSnafu)?;
+        let region_open_requests =
+            build_region_open_requests(node_id, self.kv_backend.clone()).await?;

         open_all_regions(
             region_server.clone(),
-            table_values,
+            region_open_requests,
             open_with_writable,
             self.opts.init_regions_parallelism,
             is_recovery_mode,
@@ -609,73 +596,24 @@ impl DatanodeBuilder {
 /// Open all regions belong to this datanode.
 async fn open_all_regions(
     region_server: RegionServer,
-    table_values: Vec<DatanodeTableValue>,
+    region_open_requests: RegionOpenRequests,
     open_with_writable: bool,
     init_regions_parallelism: usize,
     ignore_nonexistent_region: bool,
 ) -> Result<()> {
-    let mut regions = vec![];
-    #[cfg(feature = "enterprise")]
-    let mut follower_regions = vec![];
-    for table_value in table_values {
-        for region_number in table_value.regions {
-            // Augments region options with wal options if a wal options is provided.
-            let mut region_options = table_value.region_info.region_options.clone();
-            prepare_wal_options(
-                &mut region_options,
-                RegionId::new(table_value.table_id, region_number),
-                &table_value.region_info.region_wal_options,
-            );
-
-            regions.push((
-                RegionId::new(table_value.table_id, region_number),
-                table_value.region_info.engine.clone(),
-                table_value.region_info.region_storage_path.clone(),
-                region_options,
-            ));
-        }
-
-        #[cfg(feature = "enterprise")]
-        for region_number in table_value.follower_regions {
-            // Augments region options with wal options if a wal options is provided.
-            let mut region_options = table_value.region_info.region_options.clone();
-            prepare_wal_options(
-                &mut region_options,
-                RegionId::new(table_value.table_id, region_number),
-                &table_value.region_info.region_wal_options,
-            );
-
-            follower_regions.push((
-                RegionId::new(table_value.table_id, region_number),
-                table_value.region_info.engine.clone(),
-                table_value.region_info.region_storage_path.clone(),
-                region_options,
-            ));
-        }
-    }
-    let num_regions = regions.len();
-    info!("going to open {} region(s)", num_regions);
-
-    let mut region_requests = Vec::with_capacity(regions.len());
-    for (region_id, engine, store_path, options) in regions {
-        let table_dir = table_dir(&store_path, region_id.table_id());
-        region_requests.push((
-            region_id,
-            RegionOpenRequest {
-                engine,
-                table_dir,
-                path_type: PathType::Bare,
-                options,
-                skip_wal_replay: false,
-            },
-        ));
-    }
+    let RegionOpenRequests {
+        leader_regions,
+        #[cfg(feature = "enterprise")]
+        follower_regions,
+    } = region_open_requests;
+
+    let leader_region_num = leader_regions.len();
+    info!("going to open {} region(s)", leader_region_num);
     let now = Instant::now();
     let open_regions = region_server
         .handle_batch_open_requests(
             init_regions_parallelism,
-            region_requests,
+            leader_regions,
             ignore_nonexistent_region,
         )
         .await?;
@@ -686,19 +624,19 @@ async fn open_all_regions(
     );
     if !ignore_nonexistent_region {
         ensure!(
-            open_regions.len() == num_regions,
+            open_regions.len() == leader_region_num,
             error::UnexpectedSnafu {
                 violated: format!(
                     "Expected to open {} of regions, only {} of regions has opened",
-                    num_regions,
+                    leader_region_num,
                     open_regions.len()
                 )
             }
        );
-    } else if open_regions.len() != num_regions {
+    } else if open_regions.len() != leader_region_num {
        warn!(
            "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
-            num_regions,
+            leader_region_num,
            open_regions.len()
        );
    }
@@ -717,31 +655,14 @@ async fn open_all_regions(
     if !follower_regions.is_empty() {
         use tokio::time::Instant;

-        info!(
-            "going to open {} follower region(s)",
-            follower_regions.len()
-        );
-        let mut region_requests = Vec::with_capacity(follower_regions.len());
-        let num_regions = follower_regions.len();
-        for (region_id, engine, store_path, options) in follower_regions {
-            let table_dir = table_dir(&store_path, region_id.table_id());
-            region_requests.push((
-                region_id,
-                RegionOpenRequest {
-                    engine,
-                    table_dir,
-                    path_type: PathType::Bare,
-                    options,
-                    skip_wal_replay: true,
-                },
-            ));
-        }
+        let follower_region_num = follower_regions.len();
+        info!("going to open {} follower region(s)", follower_region_num);

         let now = Instant::now();
         let open_regions = region_server
             .handle_batch_open_requests(
                 init_regions_parallelism,
-                region_requests,
+                follower_regions,
                 ignore_nonexistent_region,
             )
             .await?;
@@ -753,19 +674,19 @@ async fn open_all_regions(

         if !ignore_nonexistent_region {
             ensure!(
-                open_regions.len() == num_regions,
+                open_regions.len() == follower_region_num,
                 error::UnexpectedSnafu {
                     violated: format!(
                         "Expected to open {} of follower regions, only {} of regions has opened",
-                        num_regions,
+                        follower_region_num,
                         open_regions.len()
                     )
                 }
             );
-        } else if open_regions.len() != num_regions {
+        } else if open_regions.len() != follower_region_num {
             warn!(
                 "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
-                num_regions,
+                follower_region_num,
                 open_regions.len()
             );
         }
@@ -835,15 +756,13 @@ mod tests {
                 ..Default::default()
             },
             Plugins::default(),
-            kv_backend,
+            kv_backend.clone(),
         );
         builder.with_cache_registry(layered_cache_registry);

-        let kv = Arc::new(MemoryKvBackend::default()) as _;
-        setup_table_datanode(&kv).await;
+        setup_table_datanode(&(kv_backend as _)).await;

         builder
-            .initialize_region_server(&mock_region_server, kv.clone(), false)
+            .initialize_region_server(&mock_region_server, false)
             .await
             .unwrap();
@@ -16,7 +16,7 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
 use common_meta::wal_options_allocator::prepare_wal_options;
 use futures_util::future::BoxFuture;
 use store_api::path_utils::table_dir;
-use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
+use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest, ReplayCheckpoint};

 use crate::heartbeat::handler::HandlerContext;

@@ -29,17 +29,31 @@ impl HandlerContext {
             mut region_options,
             region_wal_options,
             skip_wal_replay,
+            replay_entry_id,
+            metadata_replay_entry_id,
         }: OpenRegion,
     ) -> BoxFuture<'static, Option<InstructionReply>> {
         Box::pin(async move {
             let region_id = Self::region_ident_to_region_id(&region_ident);
             prepare_wal_options(&mut region_options, region_id, &region_wal_options);
+            let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
+                (Some(replay_entry_id), Some(metadata_replay_entry_id)) => Some(ReplayCheckpoint {
+                    entry_id: replay_entry_id,
+                    metadata_entry_id: Some(metadata_replay_entry_id),
+                }),
+                (Some(replay_entry_id), None) => Some(ReplayCheckpoint {
+                    entry_id: replay_entry_id,
+                    metadata_entry_id: None,
+                }),
+                _ => None,
+            };
             let request = RegionRequest::Open(RegionOpenRequest {
                 engine: region_ident.engine,
                 table_dir: table_dir(&region_storage_path, region_id.table_id()),
                 path_type: PathType::Bare,
                 options: region_options,
                 skip_wal_replay,
+                checkpoint,
             });
             let result = self.region_server.handle_request(region_id, request).await;
             let success = result.is_ok();
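The two `Some` arms of that match differ only in `metadata_entry_id`, so the same mapping can be written as a single `map`; an equivalent sketch:

// Equivalent to the match above: a checkpoint exists iff replay_entry_id does,
// and metadata_replay_entry_id rides along unchanged (it is None exactly when
// the second arm would have set None, and ignored when replay_entry_id is None).
let checkpoint = replay_entry_id.map(|entry_id| ReplayCheckpoint {
    entry_id,
    metadata_entry_id: metadata_replay_entry_id,
});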
@@ -28,3 +28,4 @@ pub mod service;
 pub mod store;
 #[cfg(any(test, feature = "testing"))]
 pub mod tests;
+pub mod utils;
@@ -1410,6 +1410,7 @@ mod tests {
                 path_type: PathType::Bare,
                 options: Default::default(),
                 skip_wal_replay: false,
+                checkpoint: None,
             }),
         )
         .await
@@ -1579,6 +1580,7 @@ mod tests {
                     path_type: PathType::Bare,
                     options: Default::default(),
                     skip_wal_replay: false,
+                    checkpoint: None,
                 },
             ),
             (
@@ -1589,6 +1591,7 @@ mod tests {
                     path_type: PathType::Bare,
                     options: Default::default(),
                     skip_wal_replay: false,
+                    checkpoint: None,
                 },
             ),
         ],
@@ -1610,6 +1613,7 @@ mod tests {
                     path_type: PathType::Bare,
                     options: Default::default(),
                     skip_wal_replay: false,
+                    checkpoint: None,
                 },
             ),
             (
@@ -1620,6 +1624,7 @@ mod tests {
                     path_type: PathType::Bare,
                     options: Default::default(),
                     skip_wal_replay: false,
+                    checkpoint: None,
                 },
             ),
         ],
src/datanode/src/utils.rs (new file, 188 lines)
@@ -0,0 +1,188 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use common_meta::key::datanode_table::DatanodeTableManager;
+use common_meta::key::topic_region::{TopicRegionKey, TopicRegionManager, TopicRegionValue};
+use common_meta::kv_backend::KvBackendRef;
+use common_meta::wal_options_allocator::{extract_topic_from_wal_options, prepare_wal_options};
+use common_meta::DatanodeId;
+use futures::TryStreamExt;
+use snafu::ResultExt;
+use store_api::path_utils::table_dir;
+use store_api::region_request::{PathType, RegionOpenRequest, ReplayCheckpoint};
+use store_api::storage::{RegionId, RegionNumber};
+use tracing::info;
+
+use crate::error::{GetMetadataSnafu, Result};
+
+/// The requests to open regions.
+pub(crate) struct RegionOpenRequests {
+    pub leader_regions: Vec<(RegionId, RegionOpenRequest)>,
+    #[cfg(feature = "enterprise")]
+    pub follower_regions: Vec<(RegionId, RegionOpenRequest)>,
+}
+
+fn group_region_by_topic(
+    region_id: RegionId,
+    region_options: &HashMap<RegionNumber, String>,
+    topic_regions: &mut HashMap<String, Vec<RegionId>>,
+) {
+    if let Some(topic) = extract_topic_from_wal_options(region_id, region_options) {
+        topic_regions.entry(topic).or_default().push(region_id);
+    }
+}
+
+fn get_replay_checkpoint(
+    region_id: RegionId,
+    topic_region_values: &Option<HashMap<RegionId, TopicRegionValue>>,
+) -> Option<ReplayCheckpoint> {
+    let topic_region_values = topic_region_values.as_ref()?;
+    let topic_region_value = topic_region_values.get(&region_id);
+    let replay_checkpoint = topic_region_value.and_then(|value| value.checkpoint);
+    replay_checkpoint.map(|checkpoint| ReplayCheckpoint {
+        entry_id: checkpoint.entry_id,
+        metadata_entry_id: checkpoint.metadata_entry_id,
+    })
+}
+
+pub(crate) async fn build_region_open_requests(
+    node_id: DatanodeId,
+    kv_backend: KvBackendRef,
+) -> Result<RegionOpenRequests> {
+    let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
+    let table_values = datanode_table_manager
+        .tables(node_id)
+        .try_collect::<Vec<_>>()
+        .await
+        .context(GetMetadataSnafu)?;
+
+    let topic_region_manager = TopicRegionManager::new(kv_backend);
+    let mut topic_regions = HashMap::<String, Vec<RegionId>>::new();
+    let mut regions = vec![];
+    #[cfg(feature = "enterprise")]
+    let mut follower_regions = vec![];
+
+    for table_value in table_values {
+        for region_number in table_value.regions {
+            let region_id = RegionId::new(table_value.table_id, region_number);
+            // Augments region options with wal options if a wal options is provided.
+            let mut region_options = table_value.region_info.region_options.clone();
+            prepare_wal_options(
+                &mut region_options,
+                region_id,
+                &table_value.region_info.region_wal_options,
+            );
+            group_region_by_topic(
+                region_id,
+                &table_value.region_info.region_wal_options,
+                &mut topic_regions,
+            );
+
+            regions.push((
+                region_id,
+                table_value.region_info.engine.clone(),
+                table_value.region_info.region_storage_path.clone(),
+                region_options,
+            ));
+        }
+
+        #[cfg(feature = "enterprise")]
+        for region_number in table_value.follower_regions {
+            let region_id = RegionId::new(table_value.table_id, region_number);
+            // Augments region options with wal options if a wal options is provided.
+            let mut region_options = table_value.region_info.region_options.clone();
+            prepare_wal_options(
+                &mut region_options,
+                RegionId::new(table_value.table_id, region_number),
+                &table_value.region_info.region_wal_options,
+            );
+            group_region_by_topic(
+                region_id,
+                &table_value.region_info.region_wal_options,
+                &mut topic_regions,
+            );
+
+            follower_regions.push((
+                RegionId::new(table_value.table_id, region_number),
+                table_value.region_info.engine.clone(),
+                table_value.region_info.region_storage_path.clone(),
+                region_options,
+            ));
+        }
+    }
+
+    let topic_region_values = if !topic_regions.is_empty() {
+        let keys = topic_regions
+            .iter()
+            .flat_map(|(topic, regions)| {
+                regions
+                    .iter()
+                    .map(|region_id| TopicRegionKey::new(*region_id, topic))
+            })
+            .collect::<Vec<_>>();
+        let topic_region_manager = topic_region_manager
+            .batch_get(keys)
+            .await
+            .context(GetMetadataSnafu)?;
+        Some(topic_region_manager)
+    } else {
+        None
+    };
+
+    let mut leader_region_requests = Vec::with_capacity(regions.len());
+    for (region_id, engine, store_path, options) in regions {
+        let table_dir = table_dir(&store_path, region_id.table_id());
+        let checkpoint = get_replay_checkpoint(region_id, &topic_region_values);
+        info!("region_id: {}, checkpoint: {:?}", region_id, checkpoint);
+        leader_region_requests.push((
+            region_id,
+            RegionOpenRequest {
+                engine,
+                table_dir,
+                path_type: PathType::Bare,
+                options,
+                skip_wal_replay: false,
+                checkpoint,
+            },
+        ));
+    }
+
+    #[cfg(feature = "enterprise")]
+    let follower_region_requests = {
+        let mut follower_region_requests = Vec::with_capacity(follower_regions.len());
+        for (region_id, engine, store_path, options) in follower_regions {
+            let table_dir = table_dir(&store_path, region_id.table_id());
+            follower_region_requests.push((
+                region_id,
+                RegionOpenRequest {
+                    engine,
+                    table_dir,
+                    path_type: PathType::Bare,
+                    options,
+                    skip_wal_replay: true,
+                    checkpoint: None,
+                },
+            ));
+        }
+        follower_region_requests
+    };
+
+    Ok(RegionOpenRequests {
+        leader_regions: leader_region_requests,
+        #[cfg(feature = "enterprise")]
+        follower_regions: follower_region_requests,
+    })
+}
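Side note on `get_replay_checkpoint`: since both layers are `Option`s, the lookup collapses to `?`-chaining; an equivalent sketch (the field-by-field copy is kept because the `common_meta` and `store_api` `ReplayCheckpoint` types are distinct):

fn get_replay_checkpoint(
    region_id: RegionId,
    topic_region_values: &Option<HashMap<RegionId, TopicRegionValue>>,
) -> Option<ReplayCheckpoint> {
    // Any missing layer (no map, no entry, no checkpoint) short-circuits to None.
    let checkpoint = topic_region_values.as_ref()?.get(&region_id)?.checkpoint?;
    Some(ReplayCheckpoint {
        entry_id: checkpoint.entry_id,
        metadata_entry_id: checkpoint.metadata_entry_id,
    })
}

Follower regions deliberately get `checkpoint: None` above: they open with `skip_wal_replay: true`, so a replay starting point is irrelevant for them.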
@@ -178,6 +178,7 @@ mod tests {
             path_type: PathType::Bare,
             options: HashMap::default(),
             skip_wal_replay: false,
+            checkpoint: None,
         };

         let region = FileRegion::open(region_id, request, &object_store)
@@ -230,6 +231,7 @@ mod tests {
             path_type: PathType::Bare,
             options: HashMap::default(),
             skip_wal_replay: false,
+            checkpoint: None,
         };
         let err = FileRegion::open(region_id, request, &object_store)
             .await
@@ -28,6 +28,17 @@ use rskafka::record::RecordAndOffset;

 use crate::kafka::index::{NextBatchHint, RegionWalIndexIterator};

+pub struct FetchResult {
+    /// The offsets of the fetched records.
+    pub records: Vec<RecordAndOffset>,
+
+    /// The high watermark of the partition.
+    pub high_watermark: i64,
+
+    /// The size of the response encoded in bytes.
+    pub encoded_response_size: usize,
+}
+
 #[async_trait::async_trait]
 pub trait FetchClient: std::fmt::Debug + Send + Sync {
     /// Fetch records.
@@ -38,7 +49,9 @@ pub trait FetchClient: std::fmt::Debug + Send + Sync {
         offset: i64,
         bytes: Range<i32>,
         max_wait_ms: i32,
-    ) -> rskafka::client::error::Result<(Vec<RecordAndOffset>, i64)>;
+    ) -> rskafka::client::error::Result<FetchResult>;
+
+    fn topic(&self) -> &str;
 }

 #[async_trait::async_trait]
@@ -48,15 +61,25 @@ impl FetchClient for PartitionClient {
         offset: i64,
         bytes: Range<i32>,
         max_wait_ms: i32,
-    ) -> rskafka::client::error::Result<(Vec<RecordAndOffset>, i64)> {
-        self.fetch_records(offset, bytes, max_wait_ms).await
+    ) -> rskafka::client::error::Result<FetchResult> {
+        self.fetch_records(offset, bytes, max_wait_ms)
+            .await
+            .map(|r| FetchResult {
+                records: r.records,
+                high_watermark: r.high_watermark,
+                encoded_response_size: r.encoded_response_size,
+            })
+    }
+
+    fn topic(&self) -> &str {
+        self.topic()
     }
 }

-struct FetchResult {
+struct FetchResultInner {
     records_and_offsets: Vec<RecordAndOffset>,
     batch_size: usize,
-    fetch_bytes: i32,
+    fetch_bytes: usize,
     watermark: i64,
     used_offset: i64,
 }
@@ -97,7 +120,23 @@ pub struct Consumer {

     /// The fetch future.
     #[builder(default = "Fuse::terminated()")]
-    fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResult>>>,
+    fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResultInner>>>,
+
+    /// Total fetched bytes.
+    #[builder(default = "0")]
+    total_fetched_bytes: u64,
+}
+
+impl Consumer {
+    /// Returns the total fetched bytes.
+    pub fn total_fetched_bytes(&self) -> u64 {
+        self.total_fetched_bytes
+    }
+
+    /// Returns the topic name.
+    pub fn topic(&self) -> &str {
+        self.client.topic()
+    }
 }

 pub(crate) struct RecordsBuffer {
@@ -184,15 +223,20 @@ impl Stream for Consumer {
                     let fetch_range =
                         1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32);
                     *this.fetch_fut = FutureExt::fuse(Box::pin(async move {
-                        let (records_and_offsets, watermark) = client
+                        let FetchResult {
+                            records: records_and_offsets,
+                            high_watermark: watermark,
+                            encoded_response_size,
+                            ..
+                        } = client
                             .fetch_records(offset, fetch_range, max_wait_ms)
                             .await?;

-                        Ok(FetchResult {
+                        Ok(FetchResultInner {
                             records_and_offsets,
                             watermark,
                             used_offset: offset,
-                            fetch_bytes: bytes as i32,
+                            fetch_bytes: encoded_response_size,
                             batch_size: len,
                         })
                     }));
@@ -206,7 +250,7 @@ impl Stream for Consumer {
                 let data = futures::ready!(this.fetch_fut.poll_unpin(cx));

                 match data {
-                    Ok(FetchResult {
+                    Ok(FetchResultInner {
                         mut records_and_offsets,
                         watermark,
                         used_offset,
@@ -217,9 +261,10 @@ impl Stream for Consumer {
                         records_and_offsets.sort_unstable_by_key(|x| x.offset);
                         *this.last_high_watermark = watermark;
                         if !records_and_offsets.is_empty() {
-                            *this.avg_record_size = fetch_bytes as usize / records_and_offsets.len();
+                            *this.avg_record_size = fetch_bytes / records_and_offsets.len();
                             debug!("set avg_record_size: {}", *this.avg_record_size);
                         }
+                        *this.total_fetched_bytes += fetch_bytes as u64;

                         debug!(
                             "Fetch result: {:?}, used_offset: {used_offset}, max_batch_size: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}",
@@ -254,7 +299,7 @@ mod tests {
     use futures::TryStreamExt;
     use rskafka::record::{Record, RecordAndOffset};

-    use super::FetchClient;
+    use super::*;
     use crate::kafka::consumer::{Consumer, RecordsBuffer};
     use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};

@@ -270,7 +315,7 @@ mod tests {
         offset: i64,
         bytes: Range<i32>,
         _max_wait_ms: i32,
-    ) -> rskafka::client::error::Result<(Vec<RecordAndOffset>, i64)> {
+    ) -> rskafka::client::error::Result<FetchResult> {
         let record_size = self.record.approximate_size();
         let num = (bytes.end.unsigned_abs() as usize / record_size).max(1);

@@ -280,8 +325,18 @@ mod tests {
                 offset: offset + idx as i64,
             })
             .collect::<Vec<_>>();

         let max_offset = offset + records.len() as i64;
-        Ok((records, max_offset))
+        let encoded_response_size = records.iter().map(|r| r.record.approximate_size()).sum();
+        Ok(FetchResult {
+            records,
+            high_watermark: max_offset,
+            encoded_response_size,
+        })
+    }
+
+    fn topic(&self) -> &str {
+        "test"
     }
 }

@@ -315,6 +370,7 @@ mod tests {
             index: Box::new(index),
         },
         fetch_fut: Fuse::terminated(),
+        total_fetched_bytes: 0,
     };

     let records = consumer.try_collect::<Vec<_>>().await.unwrap();
@@ -347,6 +403,7 @@ mod tests {
             index: Box::new(index),
         },
         fetch_fut: Fuse::terminated(),
+        total_fetched_bytes: 0,
     };

     let records = consumer.try_collect::<Vec<_>>().await.unwrap();
@@ -388,6 +445,7 @@ mod tests {
             index: Box::new(iter),
         },
         fetch_fut: Fuse::terminated(),
+        total_fetched_bytes: 0,
    };

    let records = consumer.try_collect::<Vec<_>>().await.unwrap();
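The switch from the requested byte range to `encoded_response_size` makes `avg_record_size` reflect what the broker actually returned rather than what was asked for; a numeric sketch of the difference (made-up numbers):

fn main() {
    // Previously: the *requested* fetch range was divided by the record count.
    let requested_bytes = 1_048_576usize; // asked for up to 1 MiB
    // Now: the encoded size of the records actually returned is used.
    let encoded_response_size = 96_000usize; // e.g. 750 small records came back
    let records = 750usize;

    let old_avg = requested_bytes / records; // ~1398 B/record: overestimates
    let new_avg = encoded_response_size / records; // 128 B/record: matches reality
    assert!(old_avg > 10 * new_avg);
    println!("old: {old_avg} B/record, new: {new_avg} B/record");
}

A more accurate `avg_record_size` feeds directly into the estimated-replay-size formula the flush and checkpoint triggers use.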
@@ -14,11 +14,12 @@

 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use common_base::readable_size::ReadableSize;
 use common_meta::datanode::TopicStatsReporter;
 use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
-use common_telemetry::{debug, warn};
+use common_telemetry::{debug, info, warn};
 use common_time::util::current_time_millis;
 use common_wal::config::kafka::DatanodeKafkaConfig;
 use dashmap::DashMap;
@@ -400,6 +401,7 @@ impl LogStore for KafkaLogStore {
         let mut entry_records: HashMap<RegionId, Vec<Record>> = HashMap::new();
         let provider = provider.clone();
         let stream = async_stream::stream!({
+            let now = Instant::now();
             while let Some(consume_result) = stream_consumer.next().await {
                 // Each next on the stream consumer produces a `RecordAndOffset` and a high watermark offset.
                 // The `RecordAndOffset` contains the record data and its start offset.
@@ -410,9 +412,6 @@ impl LogStore for KafkaLogStore {
                 })?;
                 let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset);

-                metrics::METRIC_KAFKA_READ_BYTES_TOTAL
-                    .inc_by(kafka_record.approximate_size() as u64);
-
                 debug!(
                     "Read a record at offset {} for topic {}, high watermark: {}",
                     offset, provider.topic, high_watermark
@@ -446,6 +445,17 @@ impl LogStore for KafkaLogStore {
                     break;
                 }
             }

+            metrics::METRIC_KAFKA_READ_BYTES_TOTAL.inc_by(stream_consumer.total_fetched_bytes());
+
+            info!(
+                "Fetched {} bytes from topic: {}, start_entry_id: {}, end_offset: {}, elapsed: {:?}",
+                ReadableSize(stream_consumer.total_fetched_bytes()),
+                stream_consumer.topic(),
+                entry_id,
+                end_offset,
+                now.elapsed()
+            );
         });
         Ok(Box::pin(stream))
     }
@@ -422,6 +422,7 @@ impl MetasrvBuilder {
                 mailbox.clone(),
                 options.grpc.server_addr.clone(),
                 remote_wal_options.flush_trigger_size,
+                remote_wal_options.checkpoint_trigger_size,
             );
             region_flush_trigger.try_start()?;
@@ -82,5 +82,12 @@ lazy_static! {
         .unwrap();
     /// The triggered region flush total counter.
     pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec =
-        register_int_counter_vec!("meta_triggered_region_flushes_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap();
+        register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap();
+
+    /// The triggered region checkpoint total counter.
+    pub static ref METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL: IntCounterVec =
+        register_int_counter_vec!("meta_triggered_region_checkpoint_total", "meta triggered region checkpoint total", &["topic_name"]).unwrap();
+    /// The topic estimated replay size.
+    pub static ref METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE: IntGaugeVec =
+        register_int_gauge_vec!("meta_topic_estimated_replay_size", "meta topic estimated replay size", &["topic_name"]).unwrap();
 }
@@ -38,6 +38,7 @@ use common_meta::instruction::CacheIdent;
 use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
 use common_meta::key::table_info::TableInfoValue;
 use common_meta::key::table_route::TableRouteValue;
+use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
 use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
 use common_meta::kv_backend::ResettableKvBackendRef;
 use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
@@ -534,6 +535,20 @@ impl Context {
         Ok(datanode_value.as_ref().unwrap())
     }

+    /// Fetches the replay checkpoint for the given topic.
+    pub async fn fetch_replay_checkpoint(&self, topic: &str) -> Result<Option<ReplayCheckpoint>> {
+        let region_id = self.region_id();
+        let topic_region_key = TopicRegionKey::new(region_id, topic);
+        let value = self
+            .table_metadata_manager
+            .topic_region_manager()
+            .get(topic_region_key)
+            .await
+            .context(error::TableMetadataManagerSnafu)?;
+
+        Ok(value.and_then(|value| value.checkpoint))
+    }
+
     /// Returns the [RegionId].
     pub fn region_id(&self) -> RegionId {
         self.persistent_ctx.region_id
@@ -19,6 +19,7 @@ use api::v1::meta::MailboxMessage;
 use common_meta::distributed_time_constants::REGION_LEASE_SECS;
 use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
 use common_meta::key::datanode_table::RegionInfo;
+use common_meta::wal_options_allocator::extract_topic_from_wal_options;
 use common_meta::RegionIdent;
 use common_procedure::{Context as ProcedureContext, Status};
 use common_telemetry::info;
@@ -67,6 +68,7 @@ impl OpenCandidateRegion {
    async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
        let pc = &ctx.persistent_ctx;
        let table_id = pc.region_id.table_id();
+        let region_id = pc.region_id;
        let region_number = pc.region_id.region_number();
        let candidate_id = pc.to_peer.id;
        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
@@ -78,18 +80,31 @@ impl OpenCandidateRegion {
            engine,
        } = datanode_table_value.region_info.clone();

-        let open_instruction = Instruction::OpenRegion(OpenRegion::new(
-            RegionIdent {
-                datanode_id: candidate_id,
-                table_id,
-                region_number,
-                engine,
-            },
-            &region_storage_path,
-            region_options,
-            region_wal_options,
-            true,
-        ));
+        let checkpoint =
+            if let Some(topic) = extract_topic_from_wal_options(region_id, &region_wal_options) {
+                ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
+            } else {
+                None
+            };
+
+        let open_instruction = Instruction::OpenRegion(
+            OpenRegion::new(
+                RegionIdent {
+                    datanode_id: candidate_id,
+                    table_id,
+                    region_number,
+                    engine,
+                },
+                &region_storage_path,
+                region_options,
+                region_wal_options,
+                true,
+            )
+            .with_replay_entry_id(checkpoint.map(|checkpoint| checkpoint.entry_id))
+            .with_metadata_replay_entry_id(
+                checkpoint.and_then(|checkpoint| checkpoint.metadata_entry_id),
+            ),
+        );

        Ok(open_instruction)
    }
@@ -226,6 +241,8 @@ mod tests {
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
+            replay_entry_id: None,
+            metadata_replay_entry_id: None,
        })
    }
@@ -200,7 +200,7 @@ mod tests {

 use common_wal::maybe_skip_kafka_integration_test;
 use common_wal::test_util::get_kafka_endpoints;
-use rskafka::client::partition::UnknownTopicHandling;
+use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
 use rskafka::record::Record;

 use super::*;
@@ -289,8 +289,8 @@ mod tests {
        .await;
    if expect_success {
        assert!(res.is_ok());
-        let (record, _high_watermark) = res.unwrap();
-        assert!(!record.is_empty());
+        let FetchResult { records, .. } = res.unwrap();
+        assert!(!records.is_empty());
    } else {
        let err = res.unwrap_err();
        // The error is in a private module so we check it through `to_string()`.
@@ -62,7 +62,9 @@ pub(crate) async fn find_pruneable_entry_id_for_topic(
        .topic_region_manager()
        .regions(topic)
        .await
-        .context(TableMetadataManagerSnafu)?;
+        .context(TableMetadataManagerSnafu)?
+        .into_keys()
+        .collect::<Vec<_>>();
    if region_ids.is_empty() {
        return Ok(None);
    }
@@ -19,12 +19,16 @@ use std::time::{Duration, Instant};
 use api::v1::meta::MailboxMessage;
 use common_base::readable_size::ReadableSize;
 use common_meta::instruction::{FlushRegions, Instruction};
+use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::peer::Peer;
-use common_meta::region_registry::LeaderRegionRegistryRef;
+use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
 use common_meta::stats::topic::TopicStatsRegistryRef;
-use common_telemetry::{debug, error, info};
+use common_telemetry::{debug, error, info, warn};
 use common_time::util::current_time_millis;
+use common_wal::config::kafka::common::{
+    DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
+};
 use itertools::Itertools;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
@@ -77,6 +81,8 @@ pub struct RegionFlushTrigger {
    server_addr: String,
    /// The flush trigger size.
    flush_trigger_size: ReadableSize,
+    /// The checkpoint trigger size.
+    checkpoint_trigger_size: ReadableSize,
    /// The receiver of events.
    receiver: Receiver<Event>,
}
@@ -89,8 +95,23 @@ impl RegionFlushTrigger {
        topic_stats_registry: TopicStatsRegistryRef,
        mailbox: MailboxRef,
        server_addr: String,
-        flush_trigger_size: ReadableSize,
+        mut flush_trigger_size: ReadableSize,
+        mut checkpoint_trigger_size: ReadableSize,
    ) -> (Self, RegionFlushTicker) {
+        if flush_trigger_size.as_bytes() == 0 {
+            flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
+            warn!(
+                "flush_trigger_size is not set, using default value: {}",
+                flush_trigger_size
+            );
+        }
+        if checkpoint_trigger_size.as_bytes() == 0 {
+            checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
+            warn!(
+                "checkpoint_trigger_size is not set, using default value: {}",
+                checkpoint_trigger_size
+            );
+        }
        let (tx, rx) = Self::channel();
        let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
        let region_flush_trigger = Self {
@@ -100,6 +121,7 @@ impl RegionFlushTrigger {
            mailbox,
            server_addr,
            flush_trigger_size,
+            checkpoint_trigger_size,
            receiver: rx,
        };
        (region_flush_trigger, region_flush_ticker)
@@ -197,6 +219,52 @@ impl RegionFlushTrigger {
        Some((latest_entry_id, stat.avg_record_size))
    }

+    async fn persist_region_checkpoints(
+        &self,
+        topic: &str,
+        region_ids: &[RegionId],
+        leader_regions: &HashMap<RegionId, LeaderRegion>,
+    ) -> Result<()> {
+        if region_ids.is_empty() {
+            return Ok(());
+        }
+
+        let regions = region_ids
+            .iter()
+            .flat_map(|region_id| match leader_regions.get(region_id) {
+                Some(leader_region) => {
+                    let entry_id = leader_region.manifest.replay_entry_id();
+                    let metadata_entry_id = leader_region.manifest.metadata_replay_entry_id();
+
+                    Some((
+                        TopicRegionKey::new(*region_id, topic),
+                        Some(TopicRegionValue::new(Some(ReplayCheckpoint::new(
+                            entry_id,
+                            metadata_entry_id,
+                        )))),
+                    ))
+                }
+                None => None,
+            })
+            .collect::<Vec<_>>();
+
+        let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
+        let batch_size = max_txn_ops.min(regions.len());
+        for batch in regions.chunks(batch_size) {
+            self.table_metadata_manager
+                .topic_region_manager()
+                .batch_put(batch)
+                .await
+                .context(error::TableMetadataManagerSnafu)?;
+        }
+
+        metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
+            .with_label_values(&[topic])
+            .inc_by(regions.len() as u64);
+
+        Ok(())
+    }
+
    async fn flush_regions_in_topic(
        &self,
        topic: &str,
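`persist_region_checkpoints` batches the checkpoint writes so that no single transaction exceeds the KV backend's `max_txn_ops` limit. A minimal sketch of the same chunking pattern, assuming a hypothetical limit and with plain tuples standing in for `TopicRegionKey`/`TopicRegionValue`:

```rust
fn main() {
    // Hypothetical transaction limit; the real code queries the KV backend.
    let max_txn_ops = 2;
    // Plain tuples stand in for (TopicRegionKey, Option<TopicRegionValue>).
    let regions = vec![(1u64, 10u64), (2, 20), (3, 30)];

    // Cap each batch at the transaction limit. `.max(1)` keeps `chunks`
    // from panicking on an empty input; the real function returns early
    // when there is nothing to persist.
    let batch_size = max_txn_ops.min(regions.len()).max(1);
    for batch in regions.chunks(batch_size) {
        // Each chunk maps to one `batch_put` transaction.
        println!("batch_put({:?})", batch);
    }
}
```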
@@ -209,14 +277,34 @@ impl RegionFlushTrigger {
            .regions(topic)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        if region_ids.is_empty() {
            debug!("No regions found for topic: {}", topic);
            return Ok(());
        }

-        let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = self
+        // Filters regions need to persist checkpoints.
+        let regions_to_persist = filter_regions_by_replay_size(
+            topic,
+            region_ids
+                .iter()
+                .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
+            avg_record_size as u64,
+            latest_entry_id,
+            self.checkpoint_trigger_size,
+        );
+        let region_manifests = self
            .leader_region_registry
-            .batch_get(region_ids.iter().cloned())
+            .batch_get(region_ids.keys().cloned());
+
+        if let Err(err) = self
+            .persist_region_checkpoints(topic, &regions_to_persist, &region_manifests)
+            .await
+        {
+            error!(err; "Failed to persist region checkpoints for topic: {}", topic);
+        }
+
+        let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = region_manifests
            .into_iter()
            .partition_map(|(region_id, region)| {
                if !region.manifest.is_inactive() {
@@ -226,8 +314,24 @@ impl RegionFlushTrigger {
                }
            });

+        let min_entry_id = inactive_regions
+            .iter()
+            .min_by_key(|(_, entry_id)| *entry_id);
+        let min_entry_id = active_regions
+            .iter()
+            .min_by_key(|(_, entry_id)| *entry_id)
+            .or(min_entry_id);
+
+        if let Some((_, min_entry_id)) = min_entry_id {
+            let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
+                .saturating_mul(avg_record_size as u64);
+            metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
+                .with_label_values(&[topic])
+                .set(replay_size as i64);
+        }
+
        // Selects regions to flush from the set of active regions.
-        let mut regions_to_flush = select_regions_to_flush(
+        let mut regions_to_flush = filter_regions_by_replay_size(
            topic,
            active_regions.into_iter(),
            avg_record_size as u64,
@@ -239,7 +343,7 @@ impl RegionFlushTrigger {
        // Selects regions to flush from the set of inactive regions.
        // For inactive regions, we use a lower flush trigger size (half of the normal size)
        // to encourage more aggressive flushing to update the region's topic latest entry id.
-        let inactive_regions_to_flush = select_regions_to_flush(
+        let inactive_regions_to_flush = filter_regions_by_replay_size(
            topic,
            inactive_regions.into_iter(),
            avg_record_size as u64,
@@ -304,26 +408,26 @@ impl RegionFlushTrigger {
    }
}

-/// Select regions to flush based on the estimated replay size.
+/// Filter regions based on the estimated replay size.
///
-/// The regions are selected if the estimated replay size exceeds the flush trigger size.
+/// Returns the regions if its estimated replay size exceeds the given threshold.
/// The estimated replay size is calculated as:
/// `(latest_entry_id - prunable_entry_id) * avg_record_size`
-fn select_regions_to_flush<I: Iterator<Item = (RegionId, u64)>>(
+fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
    topic: &str,
    regions: I,
    avg_record_size: u64,
    latest_entry_id: u64,
-    flush_trigger_size: ReadableSize,
+    threshold: ReadableSize,
) -> Vec<RegionId> {
    let mut regions_to_flush = Vec::new();
-    for (region_id, prunable_entry_id) in regions {
-        if prunable_entry_id < latest_entry_id {
-            let replay_size = (latest_entry_id - prunable_entry_id).saturating_mul(avg_record_size);
-            if replay_size > flush_trigger_size.as_bytes() {
+    for (region_id, entry_id) in regions {
+        if entry_id < latest_entry_id {
+            let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
+            if replay_size > threshold.as_bytes() {
                debug!(
-                    "Region {}: estimated replay size {} exceeds flush trigger size {}, prunable entry id: {}, topic latest entry id: {}, topic: '{}'",
-                    region_id, ReadableSize(replay_size), flush_trigger_size, prunable_entry_id, latest_entry_id, topic
+                    "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
+                    region_id, ReadableSize(replay_size), threshold, entry_id, latest_entry_id, topic
                );
                regions_to_flush.push(region_id);
            }
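A worked example of the estimate in the doc comment above, with hypothetical entry ids and record size. The same function serves both triggers; only the threshold differs (and the inactive-region path halves it to flush those regions more eagerly):

```rust
fn main() {
    // Hypothetical values: the topic's latest entry id, a region's
    // smallest un-replayed entry id, and the average record size in bytes.
    let latest_entry_id: u64 = 10_000;
    let entry_id: u64 = 4_000;
    let avg_record_size: u64 = 1_024;

    // Estimated replay size: (latest_entry_id - entry_id) * avg_record_size.
    let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
    assert_eq!(replay_size, 6_144_000); // roughly 5.9 MiB

    // The region is selected only when the estimate exceeds the threshold.
    let threshold: u64 = 4 * 1024 * 1024; // 4 MiB
    assert!(replay_size > threshold);

    // A region already at the latest entry id has nothing to replay.
    assert_eq!((latest_entry_id - latest_entry_id) * avg_record_size, 0);
}
```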
@@ -421,7 +525,7 @@ mod tests {
            (region_id(1, 3), 95), // replay_size = 50
        ];

-        let result = select_regions_to_flush(
+        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
@@ -445,7 +549,7 @@ mod tests {
            (region_id(1, 3), 90), // replay_size = 100
        ];

-        let result = select_regions_to_flush(
+        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
@@ -465,7 +569,7 @@ mod tests {
        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];

        // replay_size will always be 0, so none should be flushed
-        let result = select_regions_to_flush(
+        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
@@ -487,7 +591,7 @@ mod tests {
            (region_id(1, 2), 99), // replay_size = 10
        ];

-        let result = select_regions_to_flush(
+        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
@@ -512,7 +616,7 @@ mod tests {
            (region_id(1, 4), 200), // replay_size = 0
        ];

-        let result = select_regions_to_flush(
+        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
@@ -519,6 +519,7 @@ mod test {
            path_type: PathType::Bare, // Use Bare path type for engine regions
            options: physical_region_option,
            skip_wal_replay: false,
+            checkpoint: None,
        };
        engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
@@ -542,6 +543,7 @@ mod test {
            path_type: PathType::Bare, // Use Bare path type for engine regions
            options: HashMap::new(),
            skip_wal_replay: false,
+            checkpoint: None,
        };
        engine
            .handle_request(
@@ -620,6 +622,7 @@ mod test {
            path_type: PathType::Bare,
            options: physical_region_option,
            skip_wal_replay: false,
+            checkpoint: None,
        };
        // Opening an already opened region should succeed.
        // Since the region is already open, no metadata recovery operations will be performed.
@@ -647,6 +650,7 @@ mod test {
            path_type: PathType::Bare,
            options: physical_region_option,
            skip_wal_replay: false,
+            checkpoint: None,
        };
        let err = metric_engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
@@ -22,7 +22,7 @@ use datafusion::common::HashMap;
 use mito2::engine::MITO_ENGINE_NAME;
 use snafu::{OptionExt, ResultExt};
 use store_api::region_engine::{BatchResponses, RegionEngine};
-use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, RegionRequest};
+use store_api::region_request::{AffectedRows, PathType, RegionOpenRequest, ReplayCheckpoint};
 use store_api::storage::RegionId;

 use crate::engine::create::region_options_for_metadata_region;
@@ -204,12 +204,18 @@ impl MetricEngineInner {
        request: RegionOpenRequest,
    ) -> (RegionOpenRequest, RegionOpenRequest) {
        let metadata_region_options = region_options_for_metadata_region(&request.options);
+        let checkpoint = request.checkpoint;

        let open_metadata_region_request = RegionOpenRequest {
            table_dir: request.table_dir.clone(),
            path_type: PathType::Metadata,
            options: metadata_region_options,
            engine: MITO_ENGINE_NAME.to_string(),
            skip_wal_replay: request.skip_wal_replay,
+            checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
+                entry_id: checkpoint.metadata_entry_id.unwrap_or_default(),
+                metadata_entry_id: None,
+            }),
        };

        let mut data_region_options = request.options;
@@ -223,6 +229,10 @@ impl MetricEngineInner {
            options: data_region_options,
            engine: MITO_ENGINE_NAME.to_string(),
            skip_wal_replay: request.skip_wal_replay,
+            checkpoint: checkpoint.map(|checkpoint| ReplayCheckpoint {
+                entry_id: checkpoint.entry_id,
+                metadata_entry_id: None,
+            }),
        };

        (open_metadata_region_request, open_data_region_request)
@@ -238,25 +248,17 @@ impl MetricEngineInner {
        let data_region_id = utils::to_data_region_id(region_id);
        let (open_metadata_region_request, open_data_region_request) =
            self.transform_open_physical_region_request(request);

-        self.mito
-            .handle_request(
-                metadata_region_id,
-                RegionRequest::Open(open_metadata_region_request),
+        let _ = self
+            .mito
+            .handle_batch_open_requests(
+                2,
+                vec![
+                    (metadata_region_id, open_metadata_region_request),
+                    (data_region_id, open_data_region_request),
+                ],
            )
            .await
-            .with_context(|_| OpenMitoRegionSnafu {
-                region_type: "metadata",
-            })?;
-        self.mito
-            .handle_request(
-                data_region_id,
-                RegionRequest::Open(open_data_region_request),
-            )
-            .await
-            .with_context(|_| OpenMitoRegionSnafu {
-                region_type: "data",
-            })?;
+            .context(BatchOpenMitoRegionSnafu {})?;

        info!("Opened physical metric region {region_id}");
        PHYSICAL_REGION_COUNT.inc();
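To make the transformation above concrete: the physical region's checkpoint is split so that the metadata region replays from `metadata_entry_id` while the data region replays from `entry_id`. A self-contained sketch, using a local stand-in for the `ReplayCheckpoint` type defined later in this diff:

```rust
// Local stand-in for the `ReplayCheckpoint` defined in store-api below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ReplayCheckpoint {
    entry_id: u64,
    metadata_entry_id: Option<u64>,
}

// Mirrors how transform_open_physical_region_request derives the two
// per-region checkpoints from the physical region's checkpoint.
fn split(physical: ReplayCheckpoint) -> (ReplayCheckpoint, ReplayCheckpoint) {
    let metadata = ReplayCheckpoint {
        entry_id: physical.metadata_entry_id.unwrap_or_default(),
        metadata_entry_id: None,
    };
    let data = ReplayCheckpoint {
        entry_id: physical.entry_id,
        metadata_entry_id: None,
    };
    (metadata, data)
}

fn main() {
    let (metadata, data) = split(ReplayCheckpoint {
        entry_id: 42,
        metadata_entry_id: Some(7),
    });
    assert_eq!(metadata.entry_id, 7); // metadata region replay start
    assert_eq!(data.entry_id, 42); // data region replay start
}
```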
@@ -115,6 +115,7 @@ impl TestEnv {
            path_type: PathType::Bare, // Use Bare path type for engine regions
            options: physical_region_option,
            skip_wal_replay: true,
+            checkpoint: None,
        }),
    )
    .await

@@ -207,6 +207,7 @@ async fn test_alter_region() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -293,6 +294,7 @@ async fn test_put_after_alter() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -611,6 +613,7 @@ async fn test_alter_column_fulltext_options() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -722,6 +725,7 @@ async fn test_alter_column_set_inverted_index() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await

@@ -158,6 +158,7 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
            path_type: store_api::region_request::PathType::Bare,
            options,
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await

@@ -125,6 +125,7 @@ async fn test_batch_open(factory: Option<LogStoreFactory>) {
            options: options.clone(),
            skip_wal_replay: false,
            path_type: PathType::Bare,
+            checkpoint: None,
        },
    )
})
@@ -137,6 +138,7 @@ async fn test_batch_open(factory: Option<LogStoreFactory>) {
            options: options.clone(),
            skip_wal_replay: false,
            path_type: PathType::Bare,
+            checkpoint: None,
        },
));
@@ -190,6 +192,7 @@ async fn test_batch_open_err(factory: Option<LogStoreFactory>) {
            options: options.clone(),
            skip_wal_replay: false,
            path_type: PathType::Bare,
+            checkpoint: None,
        },
    )
})

@@ -95,6 +95,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
            path_type: store_api::region_request::PathType::Bare,
            options,
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -216,6 +217,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
            path_type: store_api::region_request::PathType::Bare,
            options,
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -320,6 +322,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
            path_type: store_api::region_request::PathType::Bare,
            options,
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -425,6 +428,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
            path_type: store_api::region_request::PathType::Bare,
            options,
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -532,6 +536,7 @@ async fn open_region(
            options: HashMap::new(),
            skip_wal_replay,
            path_type: PathType::Bare,
+            checkpoint: None,
        }),
    )
    .await
@@ -626,6 +631,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
            options: HashMap::new(),
            skip_wal_replay: true,
            path_type: PathType::Bare,
+            checkpoint: None,
        }),
    )
    .await
@@ -792,6 +792,7 @@ async fn test_change_region_compaction_window() {
            path_type: PathType::Bare,
            options: Default::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -875,6 +876,7 @@ async fn test_open_overwrite_compaction_window() {
            path_type: PathType::Bare,
            options,
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -50,6 +50,7 @@ async fn test_engine_open_empty() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -85,6 +86,7 @@ async fn test_engine_open_existing() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -180,6 +182,7 @@ async fn test_engine_region_open_with_options() {
            path_type: PathType::Bare,
            options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -226,6 +229,7 @@ async fn test_engine_region_open_with_custom_store() {
            path_type: PathType::Bare,
            options: HashMap::from([("storage".to_string(), "Gcs".to_string())]),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -298,6 +302,7 @@ async fn test_open_region_skip_wal_replay() {
            path_type: PathType::Bare,
            options: Default::default(),
            skip_wal_replay: true,
+            checkpoint: None,
        }),
    )
    .await
@@ -328,6 +333,7 @@ async fn test_open_region_skip_wal_replay() {
            path_type: PathType::Bare,
            options: Default::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -370,6 +376,7 @@ async fn test_open_region_wait_for_opening_region_ok() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -410,6 +417,7 @@ async fn test_open_region_wait_for_opening_region_err() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -51,6 +51,7 @@ async fn scan_in_parallel(
            options: HashMap::default(),
            skip_wal_replay: false,
            path_type: PathType::Bare,
+            checkpoint: None,
        }),
    )
    .await
@@ -111,6 +111,7 @@ async fn test_sync_after_flush_region() {
            options: Default::default(),
            // Ensure the region is not replayed from the WAL.
            skip_wal_replay: true,
+            checkpoint: None,
        }),
    )
    .await
@@ -207,6 +208,7 @@ async fn test_sync_after_alter_region() {
            options: Default::default(),
            // Ensure the region is not replayed from the WAL.
            skip_wal_replay: true,
+            checkpoint: None,
        }),
    )
    .await
@@ -276,6 +276,7 @@ async fn test_engine_truncate_reopen() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -384,6 +385,7 @@ async fn test_engine_truncate_during_flush() {
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )
    .await
@@ -85,6 +85,7 @@ pub(crate) struct RegionOpener {
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
+    replay_checkpoint: Option<u64>,
}

impl RegionOpener {
@@ -118,6 +119,7 @@ impl RegionOpener {
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
+            replay_checkpoint: None,
        }
    }
@@ -149,6 +151,12 @@ impl RegionOpener {
        self.options(RegionOptions::try_from(&options)?)
    }

+    /// Sets the replay checkpoint for the region.
+    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
+        self.replay_checkpoint = replay_checkpoint;
+        self
+    }
+
    /// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of
    /// constructing a new one from scratch.
    pub(crate) fn wal_entry_reader(
@@ -432,17 +440,22 @@ impl RegionOpener {
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        if !self.skip_wal_replay {
+            let replay_from_entry_id = self
+                .replay_checkpoint
+                .unwrap_or_default()
+                .max(flushed_entry_id);
            info!(
-                "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
-                flushed_entry_id + 1,
+                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}",
+                replay_from_entry_id,
                region_id,
-                manifest.manifest_version
+                manifest.manifest_version,
+                flushed_entry_id
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
-                flushed_entry_id,
+                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
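The effect of the hunk above is that replay starts at the later of the persisted checkpoint and the region's flushed entry id, so a stale checkpoint can never move replay backwards. A minimal sketch of that rule, with hypothetical entry ids:

```rust
// Replay begins at the later of the persisted checkpoint and the region's
// flushed entry id (mirrors the `.unwrap_or_default().max(...)` above).
fn replay_from(replay_checkpoint: Option<u64>, flushed_entry_id: u64) -> u64 {
    replay_checkpoint.unwrap_or_default().max(flushed_entry_id)
}

fn main() {
    // Checkpoint ahead of the local flush: skip already-checkpointed entries.
    assert_eq!(replay_from(Some(120), 100), 120);
    // Stale checkpoint behind the local flush: the flushed id still wins.
    assert_eq!(replay_from(Some(80), 100), 100);
    // No checkpoint persisted yet: behave as before.
    assert_eq!(replay_from(None, 100), 100);
}
```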
@@ -1100,6 +1100,7 @@ pub async fn reopen_region(
            options,
            skip_wal_replay: false,
            path_type: PathType::Bare,
+            checkpoint: None,
        }),
    )
    .await
@@ -107,6 +107,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
            .skip_wal_replay(request.skip_wal_replay)
            .cache(Some(self.cache_manager.clone()))
            .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
+            .replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id))
            .parse_options(request.options)
        {
            Ok(opener) => opener,
@@ -291,6 +291,7 @@ fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>>
            path_type: PathType::Bare,
            options: open.options,
            skip_wal_replay: false,
+            checkpoint: None,
        }),
    )])
}
@@ -503,6 +504,14 @@ pub struct RegionOpenRequest {
    pub options: HashMap<String, String>,
    /// To skip replaying the WAL.
    pub skip_wal_replay: bool,
+    /// Replay checkpoint.
+    pub checkpoint: Option<ReplayCheckpoint>,
}

+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ReplayCheckpoint {
+    pub entry_id: u64,
+    pub metadata_entry_id: Option<u64>,
+}
+
impl RegionOpenRequest {