refactor: simplify WAL pruning procedure and introduce region flush trigger (#6741)

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: update wal config for metasrv

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: introduce region flush trigger

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: debug assert

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: log level

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: simplify wal prune procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: upgrade rskafka

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: always flush inactive regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: refactor flush trigger

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: remove unused code

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: typo

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: update unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add metrics

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: rename

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-14 22:15:30 +08:00
committed by GitHub
parent 2a3e4c7a82
commit 021ad09c21
28 changed files with 940 additions and 394 deletions

2
Cargo.lock generated
View File

@@ -10870,7 +10870,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
source = "git+https://github.com/WenyXu/rskafka.git?rev=bc582e98918def613a882581a1b9331d186d9b2d#bc582e98918def613a882581a1b9331d186d9b2d"
source = "git+https://github.com/influxdata/rskafka.git?rev=a62120b6c74d68953464b256f858dc1c41a903b4#a62120b6c74d68953464b256f858dc1c41a903b4"
dependencies = [
"bytes",
"chrono",

View File

@@ -188,7 +188,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "bc582e98918def613a882581a1b9331d186d9b2d", features = [
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "a62120b6c74d68953464b256f858dc1c41a903b4", features = [
"transport-tls",
] }
rstest = "0.25"

View File

@@ -375,16 +375,16 @@
| `datanode.client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
| `wal` | -- | -- | -- |
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.auto_prune_interval` | String | `0s` | Interval of automatically WAL pruning.<br/>Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically. |
| `wal.trigger_flush_threshold` | Integer | `0` | The threshold to trigger a flush operation of a region in automatically WAL pruning.<br/>Metasrv will send a flush request to flush the region when:<br/>`trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`<br/>where:<br/>- `prunable_entry_id` is the maximum entry id that can be pruned of the region.<br/>- `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.<br/>Set to `0` to disable the flush operation. |
| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatically WAL pruning. |
| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster.<br/><br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)`<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_prune_interval` | String | `10m` | Interval of automatic WAL pruning.<br/>Set to `0s` to disable automatic WAL pruning, which periodically deletes unused remote WAL entries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.flush_trigger_size` | String | `512MB` | Estimated size threshold to trigger a flush when using Kafka remote WAL.<br/>Since multiple regions may share a Kafka topic, the estimated size is calculated as:<br/> (latest_entry_id - flushed_entry_id) * avg_record_size<br/>MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`.<br/>- `latest_entry_id`: The latest entry ID in the topic.<br/>- `flushed_entry_id`: The last flushed entry ID for the region.<br/>Set to "0" to let the system decide the flush trigger size.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatic WAL pruning.<br/>**It's only used when the provider is `kafka`**. |
| `wal.num_topics` | Integer | `64` | Number of topics used for remote WAL.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*<br/>e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.<br/>**It's only used when the provider is `kafka`**. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_topic_timeout` | String | `30s` | The timeout for creating a Kafka topic.<br/>**It's only used when the provider is `kafka`**. |
| `event_recorder` | -- | -- | Configuration options for the event recorder. |
| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
| `logging` | -- | -- | The logging options. |

View File

@@ -176,50 +176,61 @@ tcp_nodelay = true
# - `kafka`: metasrv **has to be** configured with kafka wal config when using kafka wal provider in datanode.
provider = "raft_engine"
# Kafka wal config.
## The broker endpoints of the Kafka cluster.
##
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
## **It's only used when the provider is `kafka`**.
auto_create_topics = true
## Interval of automatic WAL pruning.
## Set to `0s` to disable automatic WAL pruning, which periodically deletes unused remote WAL entries.
auto_prune_interval = "0s"
## **It's only used when the provider is `kafka`**.
auto_prune_interval = "10m"
## The threshold to trigger a flush operation of a region in automatically WAL pruning.
## Metasrv will send a flush request to flush the region when:
## `trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`
## where:
## - `prunable_entry_id` is the maximum entry id that can be pruned of the region.
## - `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.
## Set to `0` to disable the flush operation.
trigger_flush_threshold = 0
## Estimated size threshold to trigger a flush when using Kafka remote WAL.
## Since multiple regions may share a Kafka topic, the estimated size is calculated as:
## (latest_entry_id - flushed_entry_id) * avg_record_size
## MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`.
## - `latest_entry_id`: The latest entry ID in the topic.
## - `flushed_entry_id`: The last flushed entry ID for the region.
## Set to "0" to let the system decide the flush trigger size.
## **It's only used when the provider is `kafka`**.
flush_trigger_size = "512MB"
## Concurrent task limit for automatic WAL pruning.
## **It's only used when the provider is `kafka`**.
auto_prune_parallelism = 10
## Number of topics.
## Number of topics used for remote WAL.
## **It's only used when the provider is `kafka`**.
num_topics = 64
## Topic selector type.
## Available selector types:
## - `round_robin` (default)
## **It's only used when the provider is `kafka`**.
selector_type = "round_robin"
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## Only accepts strings that match the following regular expression pattern:
## [a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
## e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
## **It's only used when the provider is `kafka`**.
topic_name_prefix = "greptimedb_wal_topic"
## Expected number of replicas of each partition.
## **It's only used when the provider is `kafka`**.
replication_factor = 1
## Above which a topic creation operation will be cancelled.
## The timeout for creating a Kafka topic.
## **It's only used when the provider is `kafka`**.
create_topic_timeout = "30s"
# The Kafka SASL configuration.

View File

@@ -132,6 +132,32 @@ impl LeaderRegionManifestInfo {
}
}
}
/// Reports whether this region is considered inactive.
///
/// A region is inactive when its flushed entry id is strictly less than the
/// topic's latest entry id recorded for it.
///
/// The `topic_latest_entry_id` of a region is updated only when its memtable is
/// empty during a flush, so no data was written to the memtable within the range
/// `[flushed_entry_id, topic_latest_entry_id]`.
///
/// For metric regions, the region is inactive as soon as either its data part or
/// its metadata part satisfies that condition.
pub fn is_inactive(&self) -> bool {
    match self {
        LeaderRegionManifestInfo::Metric {
            data_flushed_entry_id,
            data_topic_latest_entry_id,
            metadata_flushed_entry_id,
            metadata_topic_latest_entry_id,
            ..
        } => {
            // Either part lagging behind the topic is enough.
            let data_behind = data_flushed_entry_id < data_topic_latest_entry_id;
            let metadata_behind = metadata_flushed_entry_id < metadata_topic_latest_entry_id;
            data_behind || metadata_behind
        }
        LeaderRegionManifestInfo::Mito {
            flushed_entry_id,
            topic_latest_entry_id,
            ..
        } => flushed_entry_id < topic_latest_entry_id,
    }
}
}
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
@@ -206,3 +232,97 @@ impl LeaderRegionRegistry {
inner.clear();
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a Mito manifest info with the given entry ids and a manifest version of 1.
    fn mito_manifest_info(
        flushed_entry_id: u64,
        topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Mito {
            manifest_version: 1,
            flushed_entry_id,
            topic_latest_entry_id,
        }
    }

    /// Builds a Metric manifest info with the given entry ids and manifest versions of 1.
    fn metric_manifest_info(
        data_flushed_entry_id: u64,
        data_topic_latest_entry_id: u64,
        metadata_flushed_entry_id: u64,
        metadata_topic_latest_entry_id: u64,
    ) -> LeaderRegionManifestInfo {
        LeaderRegionManifestInfo::Metric {
            data_manifest_version: 1,
            metadata_manifest_version: 1,
            data_flushed_entry_id,
            data_topic_latest_entry_id,
            metadata_flushed_entry_id,
            metadata_topic_latest_entry_id,
        }
    }

    #[test]
    fn test_is_inactive_mito() {
        // flushed_entry_id < topic_latest_entry_id => inactive
        assert!(mito_manifest_info(10, 20).is_inactive());

        // flushed_entry_id == topic_latest_entry_id => active
        assert!(!mito_manifest_info(20, 20).is_inactive());

        // flushed_entry_id > topic_latest_entry_id => active
        assert!(!mito_manifest_info(30, 20).is_inactive());
    }

    #[test]
    fn test_is_inactive_metric() {
        // Only the data part lags behind => inactive
        assert!(metric_manifest_info(5, 10, 20, 20).is_inactive());

        // Only the metadata part lags behind => inactive
        assert!(metric_manifest_info(10, 10, 15, 20).is_inactive());

        // Both parts lag behind => inactive
        assert!(metric_manifest_info(1, 2, 3, 4).is_inactive());

        // Both parts are level with the topic => active
        assert!(!metric_manifest_info(10, 10, 20, 20).is_inactive());

        // Both parts are ahead of the topic => active
        assert!(!metric_manifest_info(30, 20, 40, 20).is_inactive());
    }

    #[test]
    fn test_prunable_entry_id_mito() {
        // max(100, 120) = 120
        assert_eq!(mito_manifest_info(100, 120).prunable_entry_id(), 120);

        // max(150, 120) = 150
        assert_eq!(mito_manifest_info(150, 120).prunable_entry_id(), 150);

        // All zeros stay zero.
        assert_eq!(mito_manifest_info(0, 0).prunable_entry_id(), 0);
    }

    #[test]
    fn test_prunable_entry_id_metric() {
        // data: max(100, 120) = 120; metadata: max(90, 110) = 110; min(120, 110) = 110
        assert_eq!(
            metric_manifest_info(100, 120, 90, 110).prunable_entry_id(),
            110
        );

        // data: max(200, 150) = 200; metadata: max(180, 220) = 220; min(200, 220) = 200
        assert_eq!(
            metric_manifest_info(200, 150, 180, 220).prunable_entry_id(),
            200
        );

        // All zeros stay zero.
        assert_eq!(metric_manifest_info(0, 0, 0, 0).prunable_entry_id(), 0);
    }
}

View File

@@ -291,7 +291,8 @@ impl TopicStatsStore {
// Safety: The current topic stats is initialized in the previous step.
let active_bucket = self.active_bucket.as_mut().unwrap();
debug_assert!(active_bucket.add_stat(datanode_id, stat, millis_ts));
let added = active_bucket.add_stat(datanode_id, stat, millis_ts);
debug_assert!(added);
}
/// Gets the calculated topic stat for a given topic.

View File

@@ -19,6 +19,9 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{
DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE,
};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
@@ -55,9 +58,12 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
connection: config.connection,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
auto_prune_interval: config.auto_prune_interval,
trigger_flush_threshold: config.trigger_flush_threshold,
auto_prune_parallelism: config.auto_prune_parallelism,
// This field won't be used in standalone mode
auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
// This field won't be used in standalone mode
auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
// This field won't be used in standalone mode
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
}),
}
}
@@ -200,8 +206,8 @@ mod tests {
},
auto_create_topics: true,
auto_prune_interval: Duration::from_secs(0),
trigger_flush_threshold: 0,
auto_prune_parallelism: 10,
flush_trigger_size: ReadableSize::mb(512),
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

View File

@@ -16,6 +16,7 @@ use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::{Credentials, SaslConfig};
use rskafka::BackoffConfig;
use rustls::{ClientConfig, RootCertStore};
@@ -39,8 +40,8 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO;
/// Default limit for concurrent auto pruning tasks.
pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
/// Default interval for sending flush request to regions when pruning remote WAL.
pub const DEFAULT_TRIGGER_FLUSH_THRESHOLD: u64 = 0;
/// Default size of WAL to trigger flush.
pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
use crate::error::{self, Result};
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};

View File

@@ -17,10 +17,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL,
DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
};
use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
/// Kafka wal configurations for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -47,14 +44,6 @@ pub struct DatanodeKafkaConfig {
pub dump_index_interval: Duration,
/// Ignore missing entries during read WAL.
pub overwrite_entry_start_id: bool,
// Interval of WAL pruning.
#[serde(with = "humantime_serde")]
pub auto_prune_interval: Duration,
// Threshold for sending flush request when pruning remote WAL.
// `None` stands for never sending flush request.
pub trigger_flush_threshold: u64,
// Limit of concurrent active pruning procedures.
pub auto_prune_parallelism: usize,
}
impl Default for DatanodeKafkaConfig {
@@ -69,9 +58,6 @@ impl Default for DatanodeKafkaConfig {
create_index: true,
dump_index_interval: Duration::from_secs(60),
overwrite_entry_start_id: false,
auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
}
}
}

View File

@@ -14,11 +14,12 @@
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL,
DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE,
};
/// Kafka wal configurations for metasrv.
@@ -36,11 +37,10 @@ pub struct MetasrvKafkaConfig {
// Interval of WAL pruning.
#[serde(with = "humantime_serde")]
pub auto_prune_interval: Duration,
// Threshold for sending flush request when pruning remote WAL.
// `None` stands for never sending flush request.
pub trigger_flush_threshold: u64,
// Limit of concurrent active pruning procedures.
pub auto_prune_parallelism: usize,
// The size of WAL to trigger flush.
pub flush_trigger_size: ReadableSize,
}
impl Default for MetasrvKafkaConfig {
@@ -50,8 +50,8 @@ impl Default for MetasrvKafkaConfig {
kafka_topic: Default::default(),
auto_create_topics: true,
auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
}
}
}

View File

@@ -16,7 +16,7 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use common_base::Plugins;
use common_error::ext::BoxedError;
@@ -670,6 +670,7 @@ async fn open_all_regions(
));
}
let now = Instant::now();
let open_regions = region_server
.handle_batch_open_requests(
init_regions_parallelism,
@@ -677,6 +678,11 @@ async fn open_all_regions(
ignore_nonexistent_region,
)
.await?;
info!(
"Opened {} regions in {:?}",
open_regions.len(),
now.elapsed()
);
if !ignore_nonexistent_region {
ensure!(
open_regions.len() == num_regions,
@@ -708,6 +714,8 @@ async fn open_all_regions(
#[cfg(feature = "enterprise")]
if !follower_regions.is_empty() {
use tokio::time::Instant;
info!(
"going to open {} follower region(s)",
follower_regions.len()
@@ -728,6 +736,7 @@ async fn open_all_regions(
));
}
let now = Instant::now();
let open_regions = region_server
.handle_batch_open_requests(
init_regions_parallelism,
@@ -735,6 +744,11 @@ async fn open_all_regions(
ignore_nonexistent_region,
)
.await?;
info!(
"Opened {} follower regions in {:?}",
open_regions.len(),
now.elapsed()
);
if !ignore_nonexistent_region {
ensure!(

View File

@@ -119,6 +119,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
| Some((_, Instruction::FlushRegions { .. }))
)
}

View File

@@ -15,7 +15,7 @@
use std::time::Instant;
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
use common_telemetry::{info, warn};
use common_telemetry::{debug, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
@@ -34,30 +34,22 @@ impl HandlerContext {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
let now = Instant::now();
let result = self.region_server.handle_request(*region_id, request).await;
let elapsed = now.elapsed();
info!("Flush region: {}, elapsed: {:?}", region_id, elapsed);
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
warn!(
"Received a flush region instruction from meta, but target region: {} is not found., elapsed: {:?}",
region_id,
elapsed
"Received a flush region instruction from meta, but target region: {} is not found.",
region_id
);
}
Err(err) => {
warn!(
"Failed to flush region: {}, error: {}, elapsed: {:?}",
region_id, err, elapsed
);
warn!("Failed to flush region: {}, error: {}", region_id, err);
}
}
}
let elapsed = start_time.elapsed();
info!(
debug!(
"Flush regions: {:?}, elapsed: {:?}",
flush_regions.region_ids, elapsed
);

View File

@@ -17,7 +17,6 @@ use std::sync::Arc;
use common_telemetry::warn;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient};
use rskafka::client::producer::ProduceResult;
use rskafka::record::Record;
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
@@ -112,6 +111,11 @@ impl OrderedBatchProducer {
}
}
/// Result of producing a batch of records.
///
/// This is a crate-local stand-in for the Kafka client's own produce result, so
/// that callers do not depend on the `rskafka` type directly. It derives the
/// common traits so it can be logged, cloned and compared (e.g. in tests and
/// mock producer clients).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProduceResult {
    /// Offsets reported by the client for the produced records.
    pub offsets: Vec<i64>,
    /// Size of the encoded produce request as reported by the client
    /// (presumably in bytes — confirm against the client documentation).
    pub encoded_request_size: usize,
}
#[async_trait::async_trait]
pub trait ProducerClient: std::fmt::Debug + Send + Sync {
async fn produce(
@@ -144,7 +148,10 @@ impl ProducerClient for PartitionClient {
.with_label_values(&[self.topic(), &partition])
.inc();
Ok(result)
Ok(ProduceResult {
offsets: result.offsets,
encoded_request_size: result.encoded_request_size,
})
}
async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result<i64> {

View File

@@ -927,6 +927,15 @@ pub enum Error {
offset: u64,
},
#[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
GetOffset {
topic: String,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
UpdateTopicNameValue {
topic: String,
@@ -981,6 +990,7 @@ impl ErrorExt for Error {
| Error::BuildKafkaClient { .. } => StatusCode::Internal,
Error::DeleteRecords { .. }
| Error::GetOffset { .. }
| Error::PeerUnavailable { .. }
| Error::PusherNotFound { .. } => StatusCode::Unexpected,
Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,

View File

@@ -40,6 +40,7 @@ pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;
pub mod utils;
pub use crate::error::Result;

View File

@@ -72,6 +72,7 @@ use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::flush_trigger::RegionFlushTickerRef;
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
@@ -462,6 +463,7 @@ pub struct Metasrv {
leader_region_registry: LeaderRegionRegistryRef,
topic_stats_registry: TopicStatsRegistryRef,
wal_prune_ticker: Option<WalPruneTickerRef>,
region_flush_ticker: Option<RegionFlushTickerRef>,
table_id_sequence: SequenceRef,
reconciliation_manager: ReconciliationManagerRef,
@@ -521,6 +523,9 @@ impl Metasrv {
if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
}
if let Some(region_flush_trigger) = &self.region_flush_ticker {
leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
}
if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
customizer.customize(&mut leadership_change_notifier);
}

View File

@@ -66,6 +66,7 @@ use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::flush_trigger::RegionFlushTrigger;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL,
@@ -398,6 +399,23 @@ impl MetasrvBuilder {
};
let ddl_manager = Arc::new(ddl_manager);
let region_flush_ticker = if is_remote_wal {
let remote_wal_options = options.wal.remote_wal_options().unwrap();
let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
table_metadata_manager.clone(),
leader_region_registry.clone(),
topic_stats_registry.clone(),
mailbox.clone(),
options.grpc.server_addr.clone(),
remote_wal_options.flush_trigger_size,
);
region_flush_trigger.try_start()?;
Some(Arc::new(region_flush_ticker))
} else {
None
};
// remote WAL prune ticker and manager
let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
let (tx, rx) = WalPruneManager::channel();
@@ -410,8 +428,6 @@ impl MetasrvBuilder {
client: Arc::new(kafka_client),
table_metadata_manager: table_metadata_manager.clone(),
leader_region_registry: leader_region_registry.clone(),
server_addr: options.grpc.server_addr.clone(),
mailbox: mailbox.clone(),
};
let wal_prune_manager = WalPruneManager::new(
table_metadata_manager.clone(),
@@ -419,7 +435,6 @@ impl MetasrvBuilder {
rx,
procedure_manager.clone(),
wal_prune_context,
remote_wal_options.trigger_flush_threshold,
);
// Start manager in background. Ticker will be started in the main thread to send ticks.
wal_prune_manager.try_start().await?;
@@ -507,6 +522,7 @@ impl MetasrvBuilder {
cache_invalidator,
leader_region_registry,
wal_prune_ticker,
region_flush_ticker,
table_id_sequence,
reconciliation_manager,
topic_stats_registry,

View File

@@ -80,4 +80,7 @@ lazy_static! {
exponential_buckets(0.01, 10.0, 7).unwrap(),
)
.unwrap();
/// The triggered region flush total counter.
pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec =
register_int_counter_vec!("meta_triggered_region_flushes_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap();
}

View File

@@ -192,9 +192,8 @@ pub async fn new_wal_prune_metadata(
n_region: u32,
n_table: u32,
offsets: &[i64],
threshold: u64,
topic: String,
) -> (EntryId, Vec<RegionId>) {
) -> EntryId {
let datanode_id = 1;
let from_peer = Peer::empty(datanode_id);
let mut min_prunable_entry_id = u64::MAX;
@@ -251,17 +250,7 @@ pub async fn new_wal_prune_metadata(
.unwrap();
}
let regions_to_flush = region_entry_ids
.iter()
.filter_map(|(region_id, prunable_entry_id)| {
if max_prunable_entry_id - prunable_entry_id > threshold {
Some(*region_id)
} else {
None
}
})
.collect::<Vec<_>>();
(min_prunable_entry_id, regions_to_flush)
min_prunable_entry_id
}
pub async fn update_in_memory_region_flushed_entry_id(

View File

@@ -16,16 +16,13 @@ pub(crate) mod manager;
#[cfg(test)]
mod test_util;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::instruction::{FlushRegions, Instruction};
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::peer::Peer;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
@@ -36,7 +33,7 @@ use common_telemetry::{info, warn};
use itertools::{Itertools, MinMaxResult};
use log_store::kafka::DEFAULT_PARTITION;
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::partition::{OffsetAt, UnknownTopicHandling};
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -47,7 +44,6 @@ use crate::error::{
self, BuildPartitionClientSnafu, DeleteRecordsSnafu, TableMetadataManagerSnafu,
UpdateTopicNameValueSnafu,
};
use crate::service::mailbox::{Channel, MailboxRef};
use crate::Result;
pub type KafkaClientRef = Arc<Client>;
@@ -58,7 +54,6 @@ const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);
#[derive(Debug, Serialize, Deserialize)]
pub enum WalPruneState {
Prepare,
FlushRegion,
Prune,
}
@@ -70,10 +65,6 @@ pub struct Context {
pub table_metadata_manager: TableMetadataManagerRef,
/// The leader region registry.
pub leader_region_registry: LeaderRegionRegistryRef,
/// Server address of metasrv.
pub server_addr: String,
/// The mailbox to send messages.
pub mailbox: MailboxRef,
}
/// The data of WAL pruning.
@@ -83,10 +74,6 @@ pub struct WalPruneData {
pub topic: String,
/// The minimum flush entry id for topic, which is used to prune the WAL.
pub prunable_entry_id: EntryId,
pub regions_to_flush: Vec<RegionId>,
/// If `prunable_entry_id` + `trigger_flush_threshold` < `max_prunable_entry_id`, send a flush request to the region.
/// If `None`, never send flush requests.
pub trigger_flush_threshold: u64,
/// The state.
pub state: WalPruneState,
}
@@ -101,18 +88,11 @@ pub struct WalPruneProcedure {
impl WalPruneProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
pub fn new(
topic: String,
context: Context,
trigger_flush_threshold: u64,
guard: Option<WalPruneProcedureGuard>,
) -> Self {
pub fn new(topic: String, context: Context, guard: Option<WalPruneProcedureGuard>) -> Self {
Self {
data: WalPruneData {
topic,
prunable_entry_id: 0,
trigger_flush_threshold,
regions_to_flush: vec![],
state: WalPruneState::Prepare,
},
context,
@@ -134,60 +114,6 @@ impl WalPruneProcedure {
})
}
async fn build_peer_to_region_ids_map(
&self,
ctx: &Context,
region_ids: &[RegionId],
) -> Result<HashMap<Peer, Vec<RegionId>>> {
let table_ids = region_ids
.iter()
.map(|region_id| region_id.table_id())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let table_ids_table_routes_map = ctx
.table_metadata_manager
.table_route_manager()
.batch_get_physical_table_routes(&table_ids)
.await
.context(TableMetadataManagerSnafu)?;
let mut peer_region_ids_map = HashMap::new();
for region_id in region_ids {
let table_id = region_id.table_id();
let table_route = match table_ids_table_routes_map.get(&table_id) {
Some(route) => route,
None => return error::TableRouteNotFoundSnafu { table_id }.fail(),
};
for region_route in &table_route.region_routes {
if region_route.region.id != *region_id {
continue;
}
if let Some(peer) = &region_route.leader_peer {
peer_region_ids_map
.entry(peer.clone())
.or_insert_with(Vec::new)
.push(*region_id);
}
}
}
Ok(peer_region_ids_map)
}
fn build_flush_region_instruction(
&self,
peer_region_ids_map: HashMap<Peer, Vec<RegionId>>,
) -> Result<Vec<(Peer, Instruction)>> {
let peer_and_instructions = peer_region_ids_map
.into_iter()
.map(|(peer, region_ids)| {
let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
(peer.clone(), flush_instruction)
})
.collect();
Ok(peer_and_instructions)
}
/// Prepare the entry id to prune.
///
/// Retry:
@@ -211,10 +137,6 @@ impl WalPruneProcedure {
.into_iter()
.map(|(region_id, region)| {
let prunable_entry_id = region.manifest.prunable_entry_id();
info!(
"Region {}, topic: {}, prunable_entry_id: {}",
region_id, self.data.topic, prunable_entry_id
);
(region_id, prunable_entry_id)
})
.collect();
@@ -226,73 +148,24 @@ impl WalPruneProcedure {
// The heartbeat collected region ids do not contain all region ids in the topic-region map.
// In this case, we should not prune the WAL.
warn!("The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure.
topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
return Ok(Status::done());
}
let min_max_result = prunable_entry_ids_map.values().minmax();
let max_prunable_entry_id = match min_max_result {
match min_max_result {
MinMaxResult::NoElements => {
return Ok(Status::done());
}
MinMaxResult::OneElement(prunable_entry_id) => {
self.data.prunable_entry_id = *prunable_entry_id;
*prunable_entry_id
}
MinMaxResult::MinMax(min_prunable_entry_id, max_prunable_entry_id) => {
MinMaxResult::MinMax(min_prunable_entry_id, _) => {
self.data.prunable_entry_id = *min_prunable_entry_id;
*max_prunable_entry_id
}
};
if self.data.trigger_flush_threshold != 0 {
for (region_id, prunable_entry_id) in prunable_entry_ids_map {
if prunable_entry_id + self.data.trigger_flush_threshold < max_prunable_entry_id {
self.data.regions_to_flush.push(region_id);
}
}
info!(
"Flush regions: {:?}, trigger_flush_threshold: {}, prunable_entry_id: {}, max_prunable_entry_id: {}",
self.data.regions_to_flush,
self.data.trigger_flush_threshold,
self.data.prunable_entry_id,
max_prunable_entry_id
);
self.data.state = WalPruneState::FlushRegion;
} else {
self.data.state = WalPruneState::Prune;
}
Ok(Status::executing(true))
}
/// Send the flush request to regions with low flush entry id.
///
/// Retry:
/// - Failed to build peer to region ids map. It means failure in retrieving metadata.
pub async fn on_sending_flush_request(&mut self) -> Result<Status> {
let peer_to_region_ids_map = self
.build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush)
.await
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to build peer to region ids map",
})?;
let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?;
for (peer, flush_instruction) in flush_instructions.into_iter() {
let msg = MailboxMessage::json_message(
&format!("Flush regions: {}", flush_instruction),
&format!("Metasrv@{}", self.context.server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(peer.id);
self.context.mailbox.send_oneway(&ch, msg).await?;
}
self.data.state = WalPruneState::Prune;
Ok(Status::executing(true))
Ok(Status::executing(false))
}
/// Prune the WAL and persist the minimum prunable entry id.
@@ -301,7 +174,6 @@ impl WalPruneProcedure {
/// - Failed to update the minimum prunable entry id in kvbackend.
/// - Failed to delete records.
pub async fn on_prune(&mut self) -> Result<Status> {
// Safety: `prunable_entry_id`` are loaded in on_prepare.
let partition_client = self
.context
.client
@@ -315,6 +187,25 @@ impl WalPruneProcedure {
topic: self.data.topic.clone(),
partition: DEFAULT_PARTITION,
})?;
let earliest_offset = partition_client
.get_offset(OffsetAt::Earliest)
.await
.context(error::GetOffsetSnafu {
topic: self.data.topic.clone(),
})?;
let latest_offset = partition_client
.get_offset(OffsetAt::Latest)
.await
.context(error::GetOffsetSnafu {
topic: self.data.topic.clone(),
})?;
if self.data.prunable_entry_id <= earliest_offset as u64 {
warn!(
"The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}",
self.data.topic, self.data.prunable_entry_id, earliest_offset as u64
);
return Ok(Status::done());
}
// Should update the min prunable entry id in the kv backend before deleting records.
// Otherwise, when a datanode restarts, it will not be able to find the wal entries.
@@ -359,13 +250,14 @@ impl WalPruneProcedure {
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to delete records for topic: {}, partition: {}, offset: {}",
self.data.topic, DEFAULT_PARTITION, self.data.prunable_entry_id
"Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
self.data.topic, self.data.prunable_entry_id, latest_offset as u64
),
})?;
info!(
"Successfully pruned WAL for topic: {}, entry id: {}",
self.data.topic, self.data.prunable_entry_id
"Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
self.data.topic, self.data.prunable_entry_id, latest_offset as u64
);
Ok(Status::done())
}
@@ -386,7 +278,6 @@ impl Procedure for WalPruneProcedure {
match state {
WalPruneState::Prepare => self.on_prepare().await,
WalPruneState::FlushRegion => self.on_sending_flush_request().await,
WalPruneState::Prune => self.on_prune().await,
}
.map_err(|e| {
@@ -430,14 +321,11 @@ fn check_heartbeat_collected_region_ids(
mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::HeartbeatResponse;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use rskafka::record::Record;
use tokio::sync::mpsc::Receiver;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::test_util::new_wal_prune_metadata;
// Fix this import to correctly point to the test_util module
use crate::procedure::wal_prune::test_util::TestEnv;
@@ -446,7 +334,7 @@ mod tests {
/// Including:
/// 1. Prepare some data in the table metadata manager and in-memory kv backend.
/// 2. Return the procedure, the minimum last entry id to prune and the regions to flush.
async fn mock_test_data(procedure: &WalPruneProcedure) -> (u64, Vec<RegionId>) {
async fn mock_test_data(procedure: &WalPruneProcedure) -> u64 {
let n_region = 10;
let n_table = 5;
// 5 entries per region.
@@ -456,17 +344,16 @@ mod tests {
(n_region * n_table * 5) as usize,
)
.await;
let (prunable_entry_id, regions_to_flush) = new_wal_prune_metadata(
let prunable_entry_id = new_wal_prune_metadata(
procedure.context.table_metadata_manager.clone(),
procedure.context.leader_region_registry.clone(),
n_region,
n_table,
&offsets,
procedure.data.trigger_flush_threshold,
procedure.data.topic.clone(),
)
.await;
(prunable_entry_id, regions_to_flush)
prunable_entry_id
}
fn record(i: usize) -> Record {
@@ -541,26 +428,6 @@ mod tests {
.unwrap();
}
async fn check_flush_request(
rx: &mut Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>,
region_ids: &[RegionId],
) {
let resp = rx.recv().await.unwrap().unwrap();
let msg = resp.mailbox_message.unwrap();
let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
let mut flush_requested_region_ids = match flush_instruction {
Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids,
_ => unreachable!(),
};
let sorted_region_ids = region_ids
.iter()
.cloned()
.sorted_by_key(|a| a.as_u64())
.collect::<Vec<_>>();
flush_requested_region_ids.sort_by_key(|a| a.as_u64());
assert_eq!(flush_requested_region_ids, sorted_region_ids);
}
#[tokio::test]
async fn test_procedure_execution() {
maybe_skip_kafka_integration_test!();
@@ -570,53 +437,26 @@ mod tests {
let mut topic_name = uuid::Uuid::new_v4().to_string();
// Topic should start with a letter.
topic_name = format!("test_procedure_execution-{}", topic_name);
let mut env = TestEnv::new();
let env = TestEnv::new();
let context = env.build_wal_prune_context(broker_endpoints).await;
TestEnv::prepare_topic(&context.client, &topic_name).await;
let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, None);
// Before any data in kvbackend is mocked, should return a retryable error.
let result = procedure.on_prune().await;
assert_matches!(result, Err(e) if e.is_retryable());
let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;
let prunable_entry_id = mock_test_data(&procedure).await;
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
assert_eq!(
procedure.data.regions_to_flush.len(),
regions_to_flush.len()
);
for region_id in &regions_to_flush {
assert!(procedure.data.regions_to_flush.contains(region_id));
}
// Step 2: Test `on_sending_flush_request`.
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
env.mailbox
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
.await;
let status = procedure.on_sending_flush_request().await.unwrap();
check_flush_request(&mut rx, &regions_to_flush).await;
assert_matches!(
status,
Status::Executing {
persist: true,
persist: false,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
// Step 3: Test `on_prune`.
// Step 2: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids after(include) `prunable_entry_id` still exist.
@@ -645,18 +485,12 @@ mod tests {
.unwrap();
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Step 3: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
procedure.context.leader_region_registry.reset();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Step 5: Test `on_prepare`, don't flush regions.
procedure.data.trigger_flush_threshold = 0;
procedure.on_prepare().await.unwrap();
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
}

View File

@@ -15,20 +15,17 @@
use std::collections::hash_set::Entry;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use std::sync::{Arc, RwLock};
use common_meta::key::TableMetadataManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::join_all;
use snafu::{OptionExt, ResultExt};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Semaphore;
use tokio::time::{interval_at, Instant, MissedTickBehavior};
use crate::define_ticker;
use crate::error::{self, Result};
use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
@@ -97,81 +94,13 @@ impl Debug for Event {
}
}
/// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager].
/// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
pub(crate) struct WalPruneTicker {
/// Handle of ticker thread.
pub(crate) tick_handle: Mutex<Option<JoinHandle<()>>>,
/// The interval of tick.
pub(crate) tick_interval: Duration,
/// Sends [Event]s.
pub(crate) sender: Sender<Event>,
}
#[async_trait::async_trait]
impl LeadershipChangeListener for WalPruneTicker {
fn name(&self) -> &'static str {
"WalPruneTicker"
}
async fn on_leader_start(&self) -> common_meta::error::Result<()> {
self.start();
Ok(())
}
async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
self.stop();
Ok(())
}
}
/// TODO(CookiePie): Similar to [RegionSupervisorTicker], maybe can refactor to a unified framework.
impl WalPruneTicker {
pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
Self {
tick_handle: Mutex::new(None),
tick_interval,
sender,
}
}
/// Starts the ticker.
pub fn start(&self) {
let mut handle = self.tick_handle.lock().unwrap();
if handle.is_none() {
let sender = self.sender.clone();
let tick_interval = self.tick_interval;
let ticker_loop = tokio::spawn(async move {
let mut interval = interval_at(Instant::now() + tick_interval, tick_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if sender.send(Event::Tick).await.is_err() {
info!("EventReceiver is dropped, tick loop is stopped");
break;
}
}
});
*handle = Some(ticker_loop);
}
info!("WalPruneTicker started.");
}
/// Stops the ticker.
pub fn stop(&self) {
let mut handle = self.tick_handle.lock().unwrap();
if let Some(handle) = handle.take() {
handle.abort();
}
info!("WalPruneTicker stopped.");
}
}
impl Drop for WalPruneTicker {
fn drop(&mut self) {
self.stop();
}
}
define_ticker!(
/// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager].
/// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
WalPruneTicker,
event_type = Event,
event_value = Event::Tick
);
/// [WalPruneManager] manages all remote WAL related tasks in metasrv.
///
@@ -193,9 +122,6 @@ pub(crate) struct WalPruneManager {
/// Context for [WalPruneProcedure].
wal_prune_context: WalPruneContext,
/// Trigger flush threshold for [WalPruneProcedure].
/// If `None`, never send flush requests.
trigger_flush_threshold: u64,
}
impl WalPruneManager {
@@ -206,7 +132,6 @@ impl WalPruneManager {
receiver: Receiver<Event>,
procedure_manager: ProcedureManagerRef,
wal_prune_context: WalPruneContext,
trigger_flush_threshold: u64,
) -> Self {
Self {
table_metadata_manager,
@@ -217,7 +142,6 @@ impl WalPruneManager {
running_procedures: Arc::new(RwLock::new(HashSet::new())),
},
semaphore: Arc::new(Semaphore::new(limit)),
trigger_flush_threshold,
}
}
@@ -271,7 +195,6 @@ impl WalPruneManager {
let procedure = WalPruneProcedure::new(
topic_name.to_string(),
self.wal_prune_context.clone(),
self.trigger_flush_threshold,
Some(guard),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -332,8 +255,10 @@ impl WalPruneManager {
#[cfg(test)]
mod test {
use std::assert_matches::assert_matches;
use std::time::Duration;
use common_meta::key::topic_name::TopicNameKey;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use tokio::time::{sleep, timeout};
@@ -343,6 +268,7 @@ mod test {
#[tokio::test]
async fn test_wal_prune_ticker() {
common_telemetry::init_default_ut_logging();
let (tx, mut rx) = WalPruneManager::channel();
let interval = Duration::from_millis(50);
let ticker = WalPruneTicker::new(interval, tx);
@@ -394,7 +320,6 @@ mod test {
rx,
test_env.procedure_manager.clone(),
wal_prune_context,
0,
),
)
}

View File

@@ -17,7 +17,6 @@ use std::sync::Arc;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::{LeaderRegionRegistry, LeaderRegionRegistryRef};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::build_kafka_client;
use common_procedure::local::{LocalManager, ManagerConfig};
@@ -27,15 +26,12 @@ use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::Client;
use crate::procedure::test_util::MailboxContext;
use crate::procedure::wal_prune::Context as WalPruneContext;
pub struct TestEnv {
pub table_metadata_manager: TableMetadataManagerRef,
pub leader_region_registry: LeaderRegionRegistryRef,
pub procedure_manager: ProcedureManagerRef,
pub mailbox: MailboxContext,
pub server_addr: String,
}
impl TestEnv {
@@ -43,8 +39,6 @@ impl TestEnv {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
@@ -56,14 +50,10 @@ impl TestEnv {
None,
));
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
Self {
table_metadata_manager,
leader_region_registry,
procedure_manager,
mailbox: mailbox_ctx,
server_addr: "localhost".to_string(),
}
}
@@ -89,8 +79,6 @@ impl TestEnv {
client,
table_metadata_manager: self.table_metadata_manager.clone(),
leader_region_registry: self.leader_region_registry.clone(),
server_addr: self.server_addr.to_string(),
mailbox: self.mailbox.mailbox().clone(),
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub mod failure_detector;
pub mod flush_trigger;
pub mod lease_keeper;
pub mod supervisor;

View File

@@ -0,0 +1,525 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_base::readable_size::ReadableSize;
use common_meta::instruction::{FlushRegions, Instruction};
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_telemetry::{debug, error, info};
use common_time::util::current_time_millis;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::error::{self, Result};
use crate::service::mailbox::{Channel, MailboxRef};
use crate::{define_ticker, metrics};
/// The interval of the region flush ticker.
const TICKER_INTERVAL: Duration = Duration::from_secs(60);
/// The duration of the recent period.
const RECENT_DURATION: Duration = Duration::from_secs(300);
/// [`Event`] represents the events that can be processed by the [`RegionFlushTrigger`] loop.
///
/// Variants:
/// - `Tick`: sent periodically by the [`RegionFlushTicker`]; each tick makes the
///   [`RegionFlushTrigger`] run one flush check.
pub(crate) enum Event {
Tick,
}
/// Shared, reference-counted handle to a [`RegionFlushTicker`].
pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;
define_ticker!(
/// [RegionFlushTicker] periodically sends [`Event::Tick`] to the [`RegionFlushTrigger`],
/// which runs one flush check for every tick it receives.
RegionFlushTicker,
event_type = Event,
event_value = Event::Tick
);
/// [`RegionFlushTrigger`] is used to ensure that the estimated WAL replay size
/// stays below a certain threshold by triggering a region flush when the estimated
/// WAL replay size exceeds that threshold. This helps improve datanode startup
/// speed and reduce the overall startup time.
///
/// The estimated WAL replay size is calculated as:
/// `(latest_entry_id - prunable_entry_id) * avg_record_size`
pub struct RegionFlushTrigger {
/// The metadata manager, used to list topics, the regions of each topic and
/// the table routes of the regions to flush.
table_metadata_manager: TableMetadataManagerRef,
/// The leader region registry, used to read each region's manifest info
/// (prunable entry id and inactive flag).
leader_region_registry: LeaderRegionRegistryRef,
/// The topic stats registry, used to read the latest entry id and the
/// calculated stat (average record size) of a topic.
topic_stats_registry: TopicStatsRegistryRef,
/// The mailbox used to send flush instructions to datanodes.
mailbox: MailboxRef,
/// The server address, used as the sender (`Metasrv@{server_addr}`) of mailbox messages.
server_addr: String,
/// The flush trigger size: the estimated replay size threshold used when
/// selecting regions to flush.
flush_trigger_size: ReadableSize,
/// The receiver of events sent by the [`RegionFlushTicker`].
receiver: Receiver<Event>,
}
impl RegionFlushTrigger {
/// Builds a [`RegionFlushTrigger`] together with the [`RegionFlushTicker`] that drives it.
///
/// The two halves share one event channel: the ticker owns the sender and fires
/// every [`TICKER_INTERVAL`], the trigger owns the receiver.
pub(crate) fn new(
    table_metadata_manager: TableMetadataManagerRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    mailbox: MailboxRef,
    server_addr: String,
    flush_trigger_size: ReadableSize,
) -> (Self, RegionFlushTicker) {
    let (sender, receiver) = Self::channel();
    let trigger = Self {
        table_metadata_manager,
        leader_region_registry,
        topic_stats_registry,
        mailbox,
        server_addr,
        flush_trigger_size,
        receiver,
    };
    (trigger, RegionFlushTicker::new(TICKER_INTERVAL, sender))
}
/// Creates the bounded event channel connecting the ticker to the trigger.
fn channel() -> (Sender<Event>, Receiver<Event>) {
    // Capacity of the event queue between ticker and trigger.
    const EVENT_CHANNEL_CAPACITY: usize = 8;
    tokio::sync::mpsc::channel(EVENT_CHANNEL_CAPACITY)
}
/// Consumes the trigger and spawns its event loop via `common_runtime::spawn_global`.
///
/// The spawned task is detached; this function currently always returns `Ok(())`.
pub fn try_start(mut self) -> Result<()> {
    let event_loop = async move { self.run().await };
    common_runtime::spawn_global(event_loop);
    info!("Region flush trigger started");
    Ok(())
}
/// Event loop: handles events until every sender of the channel has been dropped.
async fn run(&mut self) {
    loop {
        // Matching on the whole `Option<Event>` keeps the dispatch exhaustive over
        // both "channel closed" and every event kind.
        match self.receiver.recv().await {
            Some(Event::Tick) => self.handle_tick().await,
            None => break,
        }
    }
}
async fn handle_tick(&self) {
if let Err(e) = self.trigger_flush().await {
error!(e; "Failed to trigger flush");
}
}
/// Walks over all topics and triggers region flushes for each topic that has
/// recent statistics.
///
/// A failure for one topic is logged and does not stop the remaining topics;
/// only a failure to list the topics is returned as an error.
async fn trigger_flush(&self) -> Result<()> {
    let started_at = Instant::now();
    let topics = self
        .table_metadata_manager
        .topic_name_manager()
        .range()
        .await
        .context(error::TableMetadataManagerSnafu)?;

    for topic in topics.iter() {
        // Topics without usable stats are skipped for this round.
        if let Some((latest_entry_id, avg_record_size)) = self.retrieve_topic_stat(topic) {
            let outcome = self
                .flush_regions_in_topic(topic, latest_entry_id, avg_record_size)
                .await;
            if let Err(e) = outcome {
                error!(e; "Failed to flush regions in topic: {}", topic);
            }
        }
    }

    debug!(
        "Triggered flush for {} topics in {:?}",
        topics.len(),
        started_at.elapsed()
    );
    Ok(())
}
/// Retrieves the latest entry id and average record size of a topic.
///
/// Returns `None` when:
/// - no latest entry id is recorded for the topic,
/// - no calculated stat is available for the topic, or
/// - either of them is older than [`RECENT_DURATION`].
fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
    let Some((latest_entry_id, timestamp)) =
        self.topic_stats_registry.get_latest_entry_id(topic)
    else {
        debug!("No latest entry id found for topic: {}", topic);
        return None;
    };
    let Some(stat) = self
        .topic_stats_registry
        .get_calculated_topic_stat(topic, TICKER_INTERVAL)
    else {
        debug!("No topic stat found for topic: {}", topic);
        return None;
    };

    let now = current_time_millis();
    if !is_recent(timestamp, now, RECENT_DURATION) {
        // Argument order must follow the placeholders: `now` first, then the timestamp.
        debug!(
            "Latest entry id of topic '{}': is not recent (now: {}, stat timestamp: {})",
            topic, now, timestamp
        );
        return None;
    }
    if !is_recent(stat.end_ts, now, RECENT_DURATION) {
        debug!(
            "Calculated stat of topic '{}': is not recent (now: {}, stat timestamp: {})",
            topic, now, stat.end_ts
        );
        return None;
    }

    Some((latest_entry_id, stat.avg_record_size))
}
/// Selects the regions of `topic` whose estimated WAL replay size is too large
/// and sends flush instructions for them to their leader datanodes.
///
/// Regions are split by `manifest.is_inactive()`: active regions are checked
/// against `flush_trigger_size`, inactive regions against half of it.
///
/// # Errors
///
/// Returns an error when the regions of the topic cannot be loaded, or when
/// `send_flush_instructions` fails. The flush metrics are only updated when no
/// error occurred.
async fn flush_regions_in_topic(
    &self,
    topic: &str,
    latest_entry_id: u64,
    avg_record_size: usize,
) -> Result<()> {
    let region_ids = self
        .table_metadata_manager
        .topic_region_manager()
        .regions(topic)
        .await
        .context(error::TableMetadataManagerSnafu)?;
    if region_ids.is_empty() {
        debug!("No regions found for topic: {}", topic);
        return Ok(());
    }

    // `partition_map` collects `Left` items into the first collection and `Right`
    // items into the second one, so the binding order must match the arms below.
    let (active_regions, inactive_regions): (Vec<_>, Vec<_>) = self
        .leader_region_registry
        .batch_get(region_ids.iter().cloned())
        .into_iter()
        .partition_map(|(region_id, region)| {
            let entry = (region_id, region.manifest.prunable_entry_id());
            if region.manifest.is_inactive() {
                itertools::Either::Right(entry)
            } else {
                itertools::Either::Left(entry)
            }
        });

    // Selects regions to flush from the set of active regions.
    let mut regions_to_flush = select_regions_to_flush(
        topic,
        active_regions.into_iter(),
        avg_record_size as u64,
        latest_entry_id,
        self.flush_trigger_size,
    );
    let active_regions_num = regions_to_flush.len();

    // Selects regions to flush from the set of inactive regions.
    // For inactive regions, we use a lower flush trigger size (half of the normal size)
    // to encourage more aggressive flushing to update the region's topic latest entry id.
    let inactive_regions_to_flush = select_regions_to_flush(
        topic,
        inactive_regions.into_iter(),
        avg_record_size as u64,
        latest_entry_id,
        self.flush_trigger_size / 2,
    );
    let inactive_regions_num = inactive_regions_to_flush.len();
    regions_to_flush.extend(inactive_regions_to_flush);

    // Sends flush instructions to datanodes.
    if !regions_to_flush.is_empty() {
        self.send_flush_instructions(&regions_to_flush).await?;
        debug!(
            "Sent {} flush instructions to datanodes for topic: '{}' ({} inactive regions)",
            regions_to_flush.len(),
            topic,
            inactive_regions_num,
        );
    }

    metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
        .with_label_values(&[topic, "active"])
        .inc_by(active_regions_num as u64);
    metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
        .with_label_values(&[topic, "inactive"])
        .inc_by(inactive_regions_num as u64);

    Ok(())
}
/// Groups `regions_to_flush` by leader datanode and sends one `FlushRegions`
/// instruction to each of them.
///
/// Delivery is best effort: a failed send is logged and the remaining datanodes
/// are still notified. Grouping and serialization failures are returned as errors.
async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
    let leader_to_region_ids =
        group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;

    for (peer, region_ids) in leader_to_region_ids {
        let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });

        let subject = format!("Flush regions: {}", flush_instruction);
        let from = format!("Metasrv@{}", self.server_addr);
        let to = format!("Datanode-{}@{}", peer.id, peer.addr);
        let msg = MailboxMessage::json_message(
            &subject,
            &from,
            &to,
            common_time::util::current_time_millis(),
            &flush_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: flush_instruction.to_string(),
        })?;

        let channel = Channel::Datanode(peer.id);
        if let Err(e) = self.mailbox.send_oneway(&channel, msg).await {
            error!(e; "Failed to send flush instruction to datanode {}", peer);
        }
    }

    Ok(())
}
}
/// Picks the regions whose estimated WAL replay size is above `flush_trigger_size`.
///
/// For each `(region_id, prunable_entry_id)` pair the estimate is
/// `(latest_entry_id - prunable_entry_id) * avg_record_size` (saturating on overflow).
/// Regions whose prunable entry id is not behind `latest_entry_id` are never selected.
/// The result keeps the order of the input iterator.
fn select_regions_to_flush<I: Iterator<Item = (RegionId, u64)>>(
    topic: &str,
    regions: I,
    avg_record_size: u64,
    latest_entry_id: u64,
    flush_trigger_size: ReadableSize,
) -> Vec<RegionId> {
    let threshold_bytes = flush_trigger_size.as_bytes();
    regions
        .filter(|(_, prunable_entry_id)| *prunable_entry_id < latest_entry_id)
        .filter_map(|(region_id, prunable_entry_id)| {
            let lag = latest_entry_id - prunable_entry_id;
            let replay_size = lag.saturating_mul(avg_record_size);
            if replay_size <= threshold_bytes {
                return None;
            }
            debug!(
                "Region {}: estimated replay size {} exceeds flush trigger size {}, prunable entry id: {}, topic latest entry id: {}, topic: '{}'",
                region_id, ReadableSize(replay_size), flush_trigger_size, prunable_entry_id, latest_entry_id, topic
            );
            Some(region_id)
        })
        .collect()
}
/// Buckets `regions_to_flush` by the leader peer found in each region's physical table route.
///
/// Regions that have no matching region route, or whose route has no leader peer,
/// are left out of the result.
///
/// # Errors
///
/// Fails when the table routes cannot be loaded or when the route of a region's
/// table is missing.
async fn group_regions_by_leader(
    table_metadata_manager: &TableMetadataManagerRef,
    regions_to_flush: &[RegionId],
) -> Result<HashMap<Peer, Vec<RegionId>>> {
    // Deduplicate the table ids so that every route is requested only once.
    let unique_table_ids: HashSet<_> = regions_to_flush
        .iter()
        .map(|region_id| region_id.table_id())
        .collect();
    let table_ids: Vec<_> = unique_table_ids.into_iter().collect();

    let routes_by_table = table_metadata_manager
        .table_route_manager()
        .batch_get_physical_table_routes(&table_ids)
        .await
        .context(error::TableMetadataManagerSnafu)?;

    let mut grouped: HashMap<Peer, Vec<RegionId>> = HashMap::new();
    for region_id in regions_to_flush {
        let table_id = region_id.table_id();
        let table_route = routes_by_table
            .get(&table_id)
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        let leader = table_route
            .region_routes
            .iter()
            .find(|route| route.region.id == *region_id)
            .and_then(|route| route.leader_peer.as_ref());
        let Some(leader) = leader else {
            continue;
        };

        // Only clone the peer the first time it is seen.
        if let Some(region_ids) = grouped.get_mut(leader) {
            region_ids.push(*region_id);
        } else {
            grouped.insert(leader.clone(), vec![*region_id]);
        }
    }

    Ok(grouped)
}
/// Checks whether `timestamp` is recent relative to `now`.
///
/// `timestamp` and `now` are compared in milliseconds. The timestamp is recent
/// when `now - timestamp` is strictly less than `duration`; a timestamp later
/// than `now` is therefore always recent.
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    // `as_millis` returns a `u128`. Clamp instead of truncating with `as`, so that a
    // very large window (e.g. `Duration::MAX`) does not wrap into a negative value.
    let window_millis = i64::try_from(duration.as_millis()).unwrap_or(i64::MAX);
    now.saturating_sub(timestamp) < window_millis
}
#[cfg(test)]
mod tests {
use common_base::readable_size::ReadableSize;
use store_api::storage::RegionId;
use super::*;
/// A timestamp just inside the window is recent, one just outside is not.
#[test]
fn test_is_recent() {
let now = current_time_millis();
assert!(is_recent(now - 999, now, Duration::from_secs(1)));
assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
}
/// Shorthand for building a [`RegionId`] from a table number and a region number.
fn region_id(table: u32, region: u32) -> RegionId {
RegionId::new(table, region)
}
/// No region is selected when every estimated replay size stays below the threshold.
#[test]
fn test_no_regions_to_flush_when_none_exceed_threshold() {
let topic = "test_topic";
let avg_record_size = 10;
let latest_entry_id = 100;
let flush_trigger_size = ReadableSize(1000); // 1000 bytes
// All regions have prunable_entry_id close to latest_entry_id, so replay_size is small
let regions = vec![
(region_id(1, 1), 99), // replay_size = (100-99)*10 = 10
(region_id(1, 2), 98), // replay_size = 20
(region_id(1, 3), 95), // replay_size = 50
];
let result = select_regions_to_flush(
topic,
regions.into_iter(),
avg_record_size,
latest_entry_id,
flush_trigger_size,
);
assert!(result.is_empty());
}
/// Only the regions whose estimated replay size exceeds the threshold are selected.
#[test]
fn test_regions_to_flush_when_some_exceed_threshold() {
let topic = "test_topic";
let avg_record_size = 10;
let latest_entry_id = 100;
let flush_trigger_size = ReadableSize(50); // 50 bytes
// Only region (1, 3) exceeds the threshold: (100-90)*10 = 100 > 50
let regions = vec![
(region_id(1, 1), 99), // replay_size = 10
(region_id(1, 2), 98), // replay_size = 20
(region_id(1, 3), 90), // replay_size = 100
];
let result = select_regions_to_flush(
topic,
regions.into_iter(),
avg_record_size,
latest_entry_id,
flush_trigger_size,
);
assert_eq!(result, vec![region_id(1, 3)]);
}
/// With an average record size of zero every estimate is zero, so nothing is selected.
#[test]
fn test_regions_to_flush_with_zero_avg_record_size() {
let topic = "test_topic";
let avg_record_size = 0;
let latest_entry_id = 100;
let flush_trigger_size = ReadableSize(1);
let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];
// replay_size will always be 0, so none should be flushed
let result = select_regions_to_flush(
topic,
regions.into_iter(),
avg_record_size,
latest_entry_id,
flush_trigger_size,
);
assert!(result.is_empty());
}
/// A region that is already at the latest entry id is skipped, and a replay size
/// equal to the threshold does not trigger a flush (the comparison is strict).
#[test]
fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
let topic = "test_topic";
let avg_record_size = 10;
let latest_entry_id = 100;
let flush_trigger_size = ReadableSize(10);
let regions = vec![
(region_id(1, 1), 100), // prunable_entry_id == latest_entry_id, should not be flushed
(region_id(1, 2), 99), // replay_size = 10
];
let result = select_regions_to_flush(
topic,
regions.into_iter(),
avg_record_size,
latest_entry_id,
flush_trigger_size,
);
// Region (1, 2) has replay_size == 10, which does not exceed the threshold of 10,
// so nothing is flushed.
assert!(result.is_empty());
}
/// Several regions can be selected at once; the result keeps the input order.
#[test]
fn test_multiple_regions_to_flush() {
let topic = "test_topic";
let avg_record_size = 5;
let latest_entry_id = 200;
let flush_trigger_size = ReadableSize(20);
let regions = vec![
(region_id(1, 1), 190), // replay_size = 50
(region_id(1, 2), 180), // replay_size = 100
(region_id(1, 3), 199), // replay_size = 5
(region_id(1, 4), 200), // replay_size = 0
];
let result = select_regions_to_flush(
topic,
regions.into_iter(),
avg_record_size,
latest_entry_id,
flush_trigger_size,
);
// Only regions (1, 1) and (1, 2) should be flushed
assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
}
}

98
src/meta-srv/src/utils.rs Normal file
View File

@@ -0,0 +1,98 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Defines a ticker type that periodically pushes a fixed event into a channel.
///
/// The generated struct:
/// - holds a `tokio::sync::mpsc::Sender` of the given event type and a tick interval;
/// - while started, sends the given event value through the sender once per interval;
/// - implements `common_meta::leadership_notifier::LeadershipChangeListener`, calling
///   `start` on `on_leader_start` and `stop` on `on_leader_stop`;
/// - aborts its background task when dropped.
///
/// Attributes (including doc comments) written before the name are forwarded to the struct.
#[macro_export]
macro_rules! define_ticker {
    (
        $(#[$meta:meta])*
        $name:ident,
        event_type = $event_ty:ty,
        event_value = $event_val:expr
    ) => {
        $(#[$meta])*
        pub(crate) struct $name {
            /// Handle of the spawned tick loop; `None` while the ticker is not running.
            pub(crate) tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
            /// Period between two consecutive events.
            pub(crate) tick_interval: std::time::Duration,
            /// Channel the tick events are sent to.
            pub(crate) sender: tokio::sync::mpsc::Sender<$event_ty>,
        }

        #[async_trait::async_trait]
        impl common_meta::leadership_notifier::LeadershipChangeListener for $name {
            fn name(&self) -> &'static str {
                stringify!($name)
            }

            async fn on_leader_start(&self) -> common_meta::error::Result<()> {
                self.start();
                Ok(())
            }

            async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
                self.stop();
                Ok(())
            }
        }

        impl $name {
            /// Creates a ticker that is not running yet; call `start` to begin ticking.
            pub(crate) fn new(
                tick_interval: std::time::Duration,
                sender: tokio::sync::mpsc::Sender<$event_ty>,
            ) -> Self {
                Self {
                    tick_handle: std::sync::Mutex::new(None),
                    tick_interval,
                    sender,
                }
            }

            /// Spawns the tick loop unless a handle is already stored.
            ///
            /// The "started" message is logged on every call, including calls that find
            /// the loop already running.
            pub fn start(&self) {
                // The guarded state is a plain `Option<JoinHandle>` that is always valid,
                // so a poisoned mutex is recovered instead of turned into another panic.
                let mut handle = self
                    .tick_handle
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if handle.is_none() {
                    let sender = self.sender.clone();
                    let tick_interval = self.tick_interval;
                    let ticker_loop = tokio::spawn(async move {
                        // Start one interval from now so the first event is not sent
                        // immediately on start.
                        let mut interval = tokio::time::interval_at(
                            tokio::time::Instant::now() + tick_interval,
                            tick_interval,
                        );
                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
                        loop {
                            interval.tick().await;
                            if sender.send($event_val).await.is_err() {
                                common_telemetry::info!("EventReceiver is dropped, tick loop is stopped");
                                break;
                            }
                        }
                    });
                    *handle = Some(ticker_loop);
                }
                common_telemetry::info!("{} started.", stringify!($name));
            }

            /// Aborts the tick loop if one is stored and clears the handle.
            ///
            /// This is also invoked from `Drop`, so it must not panic: a poisoned mutex
            /// (e.g. after `start` panicked while holding the lock) is recovered rather
            /// than unwrapped, which would otherwise cause a panic during unwinding.
            pub fn stop(&self) {
                let mut handle = self
                    .tick_handle
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if let Some(running) = handle.take() {
                    running.abort();
                }
                common_telemetry::info!("{} stopped.", stringify!($name));
            }
        }

        impl Drop for $name {
            fn drop(&mut self) {
                self.stop();
            }
        }
    };
}

View File

@@ -483,13 +483,14 @@ impl EngineInner {
topic: String,
region_requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let now = Instant::now();
let region_ids = region_requests
.iter()
.map(|(region_id, _)| *region_id)
.collect::<Vec<_>>();
let provider = Provider::kafka_provider(topic);
let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
provider,
provider.clone(),
self.wal_raw_entry_reader.clone(),
&region_ids,
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
@@ -510,8 +511,17 @@ impl EngineInner {
common_runtime::spawn_global(async move { distributor.distribute().await });
// Waits for worker returns.
let responses = join_all(responses).await;
distribution.await.context(JoinSnafu)??;
let num_failure = responses.iter().filter(|r| r.is_err()).count();
info!(
"Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
region_ids.len() - num_failure,
// Safety: provider is kafka provider.
provider.as_kafka_provider().unwrap(),
num_failure,
now.elapsed(),
);
Ok(region_ids.into_iter().zip(responses).collect())
}

View File

@@ -18,6 +18,7 @@ use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;
use std::time::Instant;
use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
@@ -625,6 +626,7 @@ pub(crate) async fn replay_memtable<F>(
where
F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
let now = Instant::now();
let mut rows_replayed = 0;
// Last entry id should start from flushed entry id since there might be no
// data in the WAL.
@@ -687,8 +689,8 @@ where
let series_count = version_control.current().series_count();
info!(
"Replay WAL for region: {}, rows recovered: {}, last entry id: {}, total timeseries replayed: {}",
region_id, rows_replayed, last_entry_id, series_count
"Replay WAL for region: {}, rows recovered: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
region_id, rows_replayed, last_entry_id, series_count, now.elapsed()
);
Ok(last_entry_id)
}