feat: introduce PeriodicTopicStatsReporter (#6730)

* refactor: introduce `PeriodicTopicStatsReporter`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix typo

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: remote wal tests styling

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit test

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: handling region wal options not found

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: minor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: upgrade greptime-proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Author: Weny Xu
Date: 2025-08-13 19:46:50 +08:00
Committed by: GitHub
Parent: dea87b7e57
Commit: 8659412cac
32 changed files with 707 additions and 469 deletions

Cargo.lock

@@ -5324,7 +5324,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ccfd4da48bc0254ed865e479cd981a3581b02d84#ccfd4da48bc0254ed865e479cd981a3581b02d84"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=69680846a078aae670d93fb30511a72738345199#69680846a078aae670d93fb30511a72738345199"
dependencies = [
"prost 0.13.5",
"serde",
@@ -10869,7 +10869,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820"
source = "git+https://github.com/WenyXu/rskafka.git?rev=bc582e98918def613a882581a1b9331d186d9b2d#bc582e98918def613a882581a1b9331d186d9b2d"
dependencies = [
"bytes",
"chrono",


@@ -140,7 +140,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ccfd4da48bc0254ed865e479cd981a3581b02d84" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69680846a078aae670d93fb30511a72738345199" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -188,7 +188,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "bc582e98918def613a882581a1b9331d186d9b2d", features = [
"transport-tls",
] }
rstest = "0.25"


@@ -108,6 +108,24 @@ pub struct RegionStat {
pub metadata_topic_latest_entry_id: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
/// The topic name.
pub topic: String,
/// The latest entry id of the topic.
pub latest_entry_id: u64,
/// The total size in bytes of records appended to the topic.
pub record_size: u64,
/// The total number of records appended to the topic.
pub record_num: u64,
}
/// Trait for reporting statistics about topics.
pub trait TopicStatsReporter: Send + Sync {
/// Returns a list of topic statistics that can be reported.
fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
Mito {

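As a rough illustration of the new contract, the sketch below implements `TopicStatsReporter` with a fixed snapshot. The `TopicStat` and `TopicStatsReporter` definitions are restated locally so the example is self-contained; in the actual change they live in `common_meta::datanode`, and `StaticTopicStatsReporter` plus the topic name are made up for illustration.

// Local copies of the types introduced above, so this sketch compiles on its own.
#[derive(Debug, Clone)]
pub struct TopicStat {
    pub topic: String,
    pub latest_entry_id: u64,
    pub record_size: u64,
    pub record_num: u64,
}

pub trait TopicStatsReporter: Send + Sync {
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}

// A hypothetical reporter that always returns the same snapshot.
struct StaticTopicStatsReporter {
    stats: Vec<TopicStat>,
}

impl TopicStatsReporter for StaticTopicStatsReporter {
    fn reportable_topics(&mut self) -> Vec<TopicStat> {
        self.stats.clone()
    }
}

fn main() {
    let mut reporter = StaticTopicStatsReporter {
        stats: vec![TopicStat {
            topic: "greptimedb_wal_topic_0".to_string(),
            latest_entry_id: 42,
            record_size: 1024,
            record_num: 10,
        }],
    };
    // In the datanode, the heartbeat loop polls this and ships the result to metasrv.
    assert_eq!(reporter.reportable_topics().len(), 1);
}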

@@ -43,3 +43,9 @@ pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
/// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1;
/// The interval of reporting topic stats.
pub const TOPIC_STATS_REPORT_INTERVAL_SECS: u64 = 15;
/// The retention seconds of topic stats.
pub const TOPIC_STATS_RETENTION_SECS: u64 = TOPIC_STATS_REPORT_INTERVAL_SECS * 100;
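For orientation, a quick check of how the two constants relate (this only restates the definitions above; the main function is for illustration):

const TOPIC_STATS_REPORT_INTERVAL_SECS: u64 = 15;
const TOPIC_STATS_RETENTION_SECS: u64 = TOPIC_STATS_REPORT_INTERVAL_SECS * 100;

fn main() {
    // Stats are reported roughly every 15 seconds and retained for 100 intervals,
    // i.e. 1500 seconds (25 minutes).
    assert_eq!(TOPIC_STATS_RETENTION_SECS, 1500);
    assert_eq!(TOPIC_STATS_RETENTION_SECS / 60, 25);
}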


@@ -251,11 +251,11 @@ pub async fn test_kafka_topic_pool(
}
#[macro_export]
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
/// Skip the test if the environment variable `GT_POSTGRES_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// GT_POSTGRES_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_postgres_integration_test {
() => {
@@ -267,11 +267,11 @@ macro_rules! maybe_skip_postgres_integration_test {
}
#[macro_export]
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
/// Skip the test if the environment variable `GT_MYSQL_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// GT_MYSQL_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_mysql_integration_test {
() => {


@@ -12,26 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::warn;
use futures_util::future::BoxFuture;
pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let Ok(endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
warn!("The endpoints is empty, skipping the test");
return;
};
let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
test(endpoints).await
}
/// Get the kafka endpoints from the environment variable `GT_KAFKA_ENDPOINTS`.
///
/// The format of the environment variable is:


@@ -22,6 +22,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
@@ -162,6 +163,7 @@ pub struct DatanodeBuilder {
meta_client: Option<MetaClientRef>,
kv_backend: KvBackendRef,
cache_registry: Option<Arc<LayeredCacheRegistry>>,
topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}
@@ -177,6 +179,7 @@ impl DatanodeBuilder {
cache_registry: None,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
topic_stats_reporter: None,
}
}
@@ -414,6 +417,9 @@ impl DatanodeBuilder {
for engine in engines {
region_server.register_engine(engine);
}
if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
region_server.set_topic_stats_reporter(topic_stats_reporter);
}
Ok(region_server)
}
@@ -527,10 +533,13 @@ impl DatanodeBuilder {
None
};
let log_store =
Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
let builder = MitoEngineBuilder::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
log_store,
object_store_manager,
schema_metadata_manager,
plugins,


@@ -279,10 +279,12 @@ impl HeartbeatTask {
}
_ = &mut sleep => {
let region_stats = Self::load_region_stats(&region_server_clone);
let topic_stats = region_server_clone.topic_stats();
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let req = HeartbeatRequest {
region_stats,
topic_stats,
duration_since_epoch,
..heartbeat_request.clone()
};


@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
use common_telemetry::warn;
use common_telemetry::{info, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
@@ -27,26 +29,38 @@ impl HandlerContext {
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
for region_id in flush_regions.region_ids {
let start_time = Instant::now();
for region_id in &flush_regions.region_ids {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
let result = self.region_server.handle_request(region_id, request).await;
let now = Instant::now();
let result = self.region_server.handle_request(*region_id, request).await;
let elapsed = now.elapsed();
info!("Flush region: {}, elapsed: {:?}", region_id, elapsed);
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
warn!("Received a flush region instruction from meta, but target region: {region_id} is not found.");
warn!(
"Received a flush region instruction from meta, but target region: {} is not found., elapsed: {:?}",
region_id,
elapsed
);
}
Err(err) => {
warn!(
"Failed to flush region: {region_id}, error: {err}",
region_id = region_id,
err = err,
"Failed to flush region: {}, error: {}, elapsed: {:?}",
region_id, err, elapsed
);
}
}
}
let elapsed = start_time.elapsed();
info!(
"Flush regions: {:?}, elapsed: {:?}",
flush_regions.region_ids, elapsed
);
None
})
}


@@ -19,6 +19,7 @@ use std::sync::{Arc, RwLock};
use std::time::Duration;
use api::region::RegionResponse;
use api::v1::meta::TopicStat;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
@@ -29,6 +30,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_meta::datanode::TopicStatsReporter;
use common_query::request::QueryRequest;
use common_query::OutputData;
use common_recordbatch::SendableRecordBatchStream;
@@ -135,10 +137,16 @@ impl RegionServer {
}
}
/// Registers an engine.
pub fn register_engine(&mut self, engine: RegionEngineRef) {
self.inner.register_engine(engine);
}
/// Sets the topic stats reporter.
pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
self.inner.set_topic_stats_reporter(topic_stats_reporter);
}
/// Finds the region's engine by its id. If the region is not ready, returns `None`.
pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
match self.inner.get_engine(region_id, &RegionChange::None) {
@@ -305,6 +313,24 @@ impl RegionServer {
.collect()
}
/// Returns the reportable topics.
pub fn topic_stats(&self) -> Vec<TopicStat> {
let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
let Some(reporter) = reporter.as_mut() else {
return vec![];
};
reporter
.reportable_topics()
.into_iter()
.map(|stat| TopicStat {
topic_name: stat.topic,
record_size: stat.record_size,
record_num: stat.record_num,
latest_entry_id: stat.latest_entry_id,
})
.collect()
}
pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
self.inner.region_map.get(&region_id).and_then(|engine| {
engine.role(region_id).map(|role| match role {
@@ -669,6 +695,8 @@ struct RegionServerInner {
// The number of queries allowed to be executed at the same time.
// Act as last line of defense on datanode to prevent query overloading.
parallelism: Option<RegionServerParallelism>,
// The topic stats reporter.
topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
}
struct RegionServerParallelism {
@@ -734,6 +762,7 @@ impl RegionServerInner {
event_listener,
table_provider_factory,
parallelism,
topic_stats_reporter: RwLock::new(None),
}
}
@@ -746,6 +775,11 @@ impl RegionServerInner {
.insert(engine_name.to_string(), engine);
}
pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
info!("Set topic stats reporter");
*self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
}
fn get_engine(
&self,
region_id: RegionId,


@@ -14,9 +14,9 @@
pub(crate) mod client_manager;
pub(crate) mod consumer;
mod high_watermark_manager;
pub(crate) mod index;
pub mod log_store;
mod periodic_offset_fetcher;
pub(crate) mod producer;
#[cfg(test)]
pub(crate) mod test_util;


@@ -28,6 +28,7 @@ use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::log_store::TopicStat;
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};
// Each topic only has one partition for now.
@@ -66,8 +67,8 @@ pub(crate) struct ClientManager {
flush_batch_size: usize,
compression: Compression,
/// High watermark for each topic.
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
/// The stats of each topic.
topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
}
impl ClientManager {
@@ -75,7 +76,7 @@ impl ClientManager {
pub(crate) async fn try_new(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
@@ -101,7 +102,7 @@ impl ClientManager {
flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
compression: Compression::Lz4,
global_index_collector,
high_watermark,
topic_stats,
})
}
@@ -117,7 +118,8 @@ impl ClientManager {
.write()
.await
.insert(provider.clone(), client.clone());
self.high_watermark.insert(provider.clone(), 0);
self.topic_stats
.insert(provider.clone(), TopicStat::default());
Ok(client)
}
}
@@ -166,7 +168,7 @@ impl ClientManager {
self.compression,
self.flush_batch_size,
index_collector,
self.high_watermark.clone(),
self.topic_stats.clone(),
));
Ok(Client { client, producer })
@@ -176,9 +178,14 @@ impl ClientManager {
self.global_index_collector.as_ref()
}
/// Lists all topics.
pub(crate) async fn list_topics(&self) -> Vec<Arc<KafkaProvider>> {
self.instances.read().await.keys().cloned().collect()
}
#[cfg(test)]
pub(crate) fn high_watermark(&self) -> &Arc<DashMap<Arc<KafkaProvider>, u64>> {
&self.high_watermark
pub(crate) fn topic_stats(&self) -> &Arc<DashMap<Arc<KafkaProvider>, TopicStat>> {
&self.topic_stats
}
}
@@ -192,7 +199,8 @@ impl ClientManager {
#[cfg(test)]
mod tests {
use common_wal::test_util::run_test_with_kafka_wal;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use tokio::sync::Barrier;
use super::*;
@@ -201,70 +209,64 @@ mod tests {
/// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
#[tokio::test]
async fn test_sequential() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
// Assigns multiple regions to a topic.
let region_topic = (0..512)
.map(|region_id| (region_id, &topics[region_id % topics.len()]))
.collect::<HashMap<_, _>>();
maybe_skip_kafka_integration_test!();
let broker_endpoints = get_kafka_endpoints();
let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
// Assigns multiple regions to a topic.
let region_topic = (0..512)
.map(|region_id| (region_id, &topics[region_id % topics.len()]))
.collect::<HashMap<_, _>>();
// Gets all clients sequentially.
for (_, topic) in region_topic {
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
manager.get_or_insert(&provider).await.unwrap();
}
// Gets all clients sequentially.
for (_, topic) in region_topic {
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
manager.get_or_insert(&provider).await.unwrap();
}
// Ensures all clients exist.
let client_pool = manager.instances.read().await;
let all_exist = topics.iter().all(|topic| {
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
client_pool.contains_key(&provider)
});
assert!(all_exist);
})
})
.await;
// Ensures all clients exist.
let client_pool = manager.instances.read().await;
let all_exist = topics.iter().all(|topic| {
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
client_pool.contains_key(&provider)
});
assert!(all_exist);
}
/// Sends `get_or_insert` requests in parallel to the client manager, and checks if it could handle them correctly.
#[tokio::test(flavor = "multi_thread")]
async fn test_parallel() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
// Assigns multiple regions to a topic.
let region_topic = (0..512)
.map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
.collect::<HashMap<_, _>>();
maybe_skip_kafka_integration_test!();
let broker_endpoints = get_kafka_endpoints();
let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
// Assigns multiple regions to a topic.
let region_topic = (0..512)
.map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
.collect::<HashMap<_, _>>();
// Gets all clients in parallel.
let manager = Arc::new(manager);
let barrier = Arc::new(Barrier::new(region_topic.len()));
let tasks = region_topic
.into_values()
.map(|topic| {
let manager = manager.clone();
let barrier = barrier.clone();
// Gets all clients in parallel.
let manager = Arc::new(manager);
let barrier = Arc::new(Barrier::new(region_topic.len()));
let tasks = region_topic
.into_values()
.map(|topic| {
let manager = manager.clone();
let barrier = barrier.clone();
tokio::spawn(async move {
barrier.wait().await;
let provider = Arc::new(KafkaProvider::new(topic));
assert!(manager.get_or_insert(&provider).await.is_ok());
})
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.unwrap();
// Ensures all clients exist.
let client_pool = manager.instances.read().await;
let all_exist = topics.iter().all(|topic| {
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
client_pool.contains_key(&provider)
});
assert!(all_exist);
tokio::spawn(async move {
barrier.wait().await;
let provider = Arc::new(KafkaProvider::new(topic));
assert!(manager.get_or_insert(&provider).await.is_ok());
})
})
})
.await;
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.unwrap();
// Ensures all clients exist.
let client_pool = manager.instances.read().await;
let all_exist = topics.iter().all(|topic| {
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
client_pool.contains_key(&provider)
});
assert!(all_exist);
}
}


@@ -1,131 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::error;
use dashmap::DashMap;
use store_api::logstore::provider::KafkaProvider;
use tokio::time::{interval, MissedTickBehavior};
use crate::error::Result;
use crate::kafka::client_manager::ClientManagerRef;
/// HighWatermarkManager is responsible for periodically updating the high watermark
/// (latest existing record offset) for each Kafka topic.
pub(crate) struct HighWatermarkManager {
/// Interval to update high watermark.
update_interval: Duration,
/// The high watermark for each topic.
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
/// Client manager to send requests.
client_manager: ClientManagerRef,
}
impl HighWatermarkManager {
pub(crate) fn new(
update_interval: Duration,
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
client_manager: ClientManagerRef,
) -> Self {
Self {
update_interval,
high_watermark,
client_manager,
}
}
/// Starts the high watermark manager as a background task
///
/// This spawns a task that periodically queries Kafka for the latest
/// high watermark values for all registered topics and updates the shared map.
pub(crate) async fn run(self) {
common_runtime::spawn_global(async move {
let mut interval = interval(self.update_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = self.try_update().await {
error!(e; "Failed to update high watermark");
}
}
});
}
/// Attempts to update the high watermark for all registered topics
///
/// Iterates through all topics in the high watermark map, obtains a producer
/// for each topic, and requests an update of the high watermark value.
pub(crate) async fn try_update(&self) -> Result<()> {
for iterator_element in self.high_watermark.iter() {
let producer = self
.client_manager
.get_or_insert(iterator_element.key())
.await?
.producer()
.clone();
producer.update_high_watermark().await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use common_wal::test_util::run_test_with_kafka_wal;
use store_api::storage::RegionId;
use super::*;
use crate::kafka::test_util::{prepare, record};
#[tokio::test]
async fn test_try_update_high_watermark() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let (manager, topics) =
prepare("test_try_update_high_watermark", 1, broker_endpoints).await;
let manager = Arc::new(manager);
let high_watermark_manager = HighWatermarkManager::new(
Duration::from_millis(100),
manager.high_watermark().clone(),
manager.clone(),
);
let high_watermark = high_watermark_manager.high_watermark.clone();
high_watermark_manager.run().await;
let topic = topics[0].clone();
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
let producer = manager
.get_or_insert(&provider)
.await
.unwrap()
.producer()
.clone();
tokio::time::sleep(Duration::from_millis(150)).await;
let current_high_watermark = *high_watermark.get(&provider).unwrap();
assert_eq!(current_high_watermark, 0);
let record = vec![record()];
let region = RegionId::new(1, 1);
producer.produce(region, record.clone()).await.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;
let current_high_watermark = *high_watermark.get(&provider).unwrap();
assert_eq!(current_high_watermark, record.len() as u64);
})
})
.await
}
}


@@ -16,7 +16,10 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use common_meta::datanode::TopicStatsReporter;
use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
use common_telemetry::{debug, warn};
use common_time::util::current_time_millis;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use futures::future::try_join_all;
@@ -33,17 +36,33 @@ use store_api::storage::RegionId;
use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
use crate::kafka::high_watermark_manager::HighWatermarkManager;
use crate::kafka::index::{
build_region_wal_index_iterator, GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE,
};
use crate::kafka::periodic_offset_fetcher::PeriodicOffsetFetcher;
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
};
use crate::metrics;
const DEFAULT_HIGH_WATERMARK_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
const DEFAULT_OFFSET_FETCH_INTERVAL: Duration = Duration::from_secs(60);
/// Statistics for a topic.
#[derive(Debug, Clone, Copy, Default)]
pub struct TopicStat {
/// Latest offset for the topic.
///
/// The latest offset is updated in two ways:
/// - Automatically when the producer successfully commits data to Kafka
/// - Periodically by the [PeriodicOffsetFetcher](crate::kafka::periodic_offset_fetcher::PeriodicOffsetFetcher).
///
pub latest_offset: u64,
/// Total size in bytes of records appended to the topic.
pub record_size: u64,
/// Total number of records appended to the topic.
pub record_num: u64,
}
/// A log store backed by Kafka.
#[derive(Debug)]
@@ -56,18 +75,70 @@ pub struct KafkaLogStore {
consumer_wait_timeout: Duration,
/// Ignore missing entries during read WAL.
overwrite_entry_start_id: bool,
/// High watermark for all topics.
/// The stats of each topic.
///
/// Represents the offset of the last record in each topic. This is used to track
/// the latest available data in Kafka topics.
///
/// The high watermark is updated in two ways:
/// - Automatically when the producer successfully commits data to Kafka
/// - Periodically by the [HighWatermarkManager](crate::kafka::high_watermark_manager::HighWatermarkManager).
///
/// This shared map allows multiple components to access the latest high watermark
/// This shared map allows multiple components to access the latest stats
/// information without needing to query Kafka directly.
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
}
struct PeriodicTopicStatsReporter {
topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
last_reported_timestamp_millis: i64,
report_interval_millis: i64,
}
impl PeriodicTopicStatsReporter {
fn align_ts(ts: i64, report_interval_millis: i64) -> i64 {
(ts / report_interval_millis) * report_interval_millis
}
/// Creates a new [PeriodicTopicStatsReporter].
///
/// # Panics
///
/// Panics if `report_interval` is zero.
fn new(
topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
report_interval: Duration,
) -> Self {
assert!(!report_interval.is_zero());
let report_interval_millis = report_interval.as_millis() as i64;
let last_reported_timestamp_millis =
Self::align_ts(current_time_millis(), report_interval_millis);
Self {
topic_stats,
last_reported_timestamp_millis,
report_interval_millis,
}
}
}
impl TopicStatsReporter for PeriodicTopicStatsReporter {
fn reportable_topics(&mut self) -> Vec<common_meta::datanode::TopicStat> {
let now = Self::align_ts(current_time_millis(), self.report_interval_millis);
if now < self.last_reported_timestamp_millis + self.report_interval_millis {
debug!("Skip reporting topic stats because the interval is not reached");
return vec![];
}
self.last_reported_timestamp_millis = now;
let mut reportable_topics = Vec::with_capacity(self.topic_stats.len());
for e in self.topic_stats.iter() {
let topic_stat = e.value();
let topic_stat = common_meta::datanode::TopicStat {
topic: e.key().topic.clone(),
latest_entry_id: topic_stat.latest_offset,
record_size: topic_stat.record_size,
record_num: topic_stat.record_num,
};
debug!("Reportable topic: {:?}", topic_stat);
reportable_topics.push(topic_stat);
}
debug!("Reportable {} topics at {}", reportable_topics.len(), now);
reportable_topics
}
}
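To make the reporting cadence concrete, here is a standalone sketch of the same alignment arithmetic used by `align_ts` and `reportable_topics` above; the timestamps are made up for illustration.

// Floors a millisecond timestamp to a multiple of the report interval,
// mirroring PeriodicTopicStatsReporter::align_ts.
fn align_ts(ts: i64, report_interval_millis: i64) -> i64 {
    (ts / report_interval_millis) * report_interval_millis
}

fn main() {
    let interval = 15_000; // 15 s report interval, in milliseconds
    // Suppose the reporter was constructed at t = 990_000 ms.
    let last_reported = align_ts(990_000, interval);

    // 10 s later the aligned timestamp is still in the same bucket, so reporting is skipped.
    let now = align_ts(1_000_000, interval);
    assert!(now < last_reported + interval);

    // 15 s later it falls into the next bucket, so the topics become reportable again.
    let now = align_ts(1_005_000, interval);
    assert!(now >= last_reported + interval);
}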
impl KafkaLogStore {
@@ -76,25 +147,30 @@ impl KafkaLogStore {
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Self> {
let high_watermark = Arc::new(DashMap::new());
let topic_stats = Arc::new(DashMap::new());
let client_manager = Arc::new(
ClientManager::try_new(config, global_index_collector, high_watermark.clone()).await?,
ClientManager::try_new(config, global_index_collector, topic_stats.clone()).await?,
);
let high_watermark_manager = HighWatermarkManager::new(
DEFAULT_HIGH_WATERMARK_UPDATE_INTERVAL,
high_watermark.clone(),
client_manager.clone(),
);
high_watermark_manager.run().await;
let fetcher =
PeriodicOffsetFetcher::new(DEFAULT_OFFSET_FETCH_INTERVAL, client_manager.clone());
fetcher.run().await;
Ok(Self {
client_manager,
max_batch_bytes: config.max_batch_bytes.as_bytes() as usize,
consumer_wait_timeout: config.consumer_wait_timeout,
overwrite_entry_start_id: config.overwrite_entry_start_id,
high_watermark,
topic_stats,
})
}
/// Returns the topic stats reporter.
pub fn topic_stats_reporter(&self) -> Box<dyn TopicStatsReporter> {
Box::new(PeriodicTopicStatsReporter::new(
self.topic_stats.clone(),
Duration::from_secs(TOPIC_STATS_REPORT_INTERVAL_SECS),
))
}
}
fn build_entry(
@@ -226,13 +302,6 @@ impl LogStore for KafkaLogStore {
))
.await?;
// Updates the high watermark offset of the last record in the topic.
for (region_id, offset) in &region_grouped_max_offset {
// Safety: `region_id` is always valid.
let provider = region_to_provider.get(region_id).unwrap();
self.high_watermark.insert(provider.clone(), *offset);
}
Ok(AppendBatchResponse {
last_entry_ids: region_grouped_max_offset.into_iter().collect(),
})
@@ -418,7 +487,7 @@ impl LogStore for KafkaLogStore {
}
/// Returns the highest entry id of the specified topic in remote WAL.
fn high_watermark(&self, provider: &Provider) -> Result<EntryId> {
fn latest_entry_id(&self, provider: &Provider) -> Result<EntryId> {
let provider = provider
.as_kafka_provider()
.with_context(|| InvalidProviderSnafu {
@@ -426,14 +495,14 @@ impl LogStore for KafkaLogStore {
actual: provider.type_name(),
})?;
let high_watermark = self
.high_watermark
let stat = self
.topic_stats
.get(provider)
.as_deref()
.copied()
.unwrap_or(0);
.unwrap_or_default();
Ok(high_watermark)
Ok(stat.latest_offset)
}
/// Stops components of the logstore.
@@ -458,12 +527,16 @@ mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_meta::datanode::TopicStatsReporter;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_wal::config::kafka::common::KafkaConnectionConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use futures::TryStreamExt;
use rand::prelude::SliceRandom;
use rand::Rng;
@@ -473,7 +546,7 @@ mod tests {
use store_api::storage::RegionId;
use super::build_entry;
use crate::kafka::log_store::KafkaLogStore;
use crate::kafka::log_store::{KafkaLogStore, PeriodicTopicStatsReporter, TopicStat};
#[test]
fn test_build_naive_entry() {
@@ -630,7 +703,7 @@ mod tests {
.for_each(|entry| entry.set_entry_id(0));
assert_eq!(expected_entries, actual_entries);
}
let high_wathermark = logstore.high_watermark(&provider).unwrap();
let high_wathermark = logstore.latest_entry_id(&provider).unwrap();
assert_eq!(high_wathermark, 99);
}
@@ -706,7 +779,35 @@ mod tests {
.for_each(|entry| entry.set_entry_id(0));
assert_eq!(expected_entries, actual_entries);
}
let high_wathermark = logstore.high_watermark(&provider).unwrap();
let high_wathermark = logstore.latest_entry_id(&provider).unwrap();
assert_eq!(high_wathermark, (data_size_kb as u64 / 8 + 1) * 20 * 5 - 1);
}
#[tokio::test]
async fn test_topic_stats_reporter() {
common_telemetry::init_default_ut_logging();
let topic_stats = Arc::new(DashMap::new());
let provider = Provider::kafka_provider("my_topic".to_string());
topic_stats.insert(
provider.as_kafka_provider().unwrap().clone(),
TopicStat {
latest_offset: 0,
record_size: 0,
record_num: 0,
},
);
let mut reporter = PeriodicTopicStatsReporter::new(topic_stats, Duration::from_secs(1));
// The first reportable topics should be empty.
let reportable_topics = reporter.reportable_topics();
assert_eq!(reportable_topics.len(), 0);
// After 1 second, the reportable topics should be the topic in the topic_stats.
tokio::time::sleep(Duration::from_secs(1)).await;
let reportable_topics = reporter.reportable_topics();
assert_eq!(reportable_topics.len(), 1);
// Call it immediately, should be empty.
let reportable_topics = reporter.reportable_topics();
assert_eq!(reportable_topics.len(), 0);
}
}


@@ -0,0 +1,122 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_telemetry::{debug, error, info};
use tokio::time::{interval, MissedTickBehavior};
use crate::error::Result;
use crate::kafka::client_manager::ClientManagerRef;
/// PeriodicOffsetFetcher is responsible for periodically updating the offset
/// for each Kafka topic.
pub(crate) struct PeriodicOffsetFetcher {
/// Interval to fetch the latest offset.
interval: Duration,
/// Client manager to send requests.
client_manager: ClientManagerRef,
}
impl PeriodicOffsetFetcher {
pub(crate) fn new(interval: Duration, client_manager: ClientManagerRef) -> Self {
Self {
interval,
client_manager,
}
}
/// Starts the offset fetcher as a background task
///
/// This spawns a task that periodically queries Kafka for the latest
/// offset values for all registered topics and updates the shared map.
pub(crate) async fn run(self) {
common_runtime::spawn_global(async move {
info!("PeriodicOffsetFetcher started");
let mut interval = interval(self.interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = self.try_update().await {
error!(e; "Failed to update latest offset");
}
}
});
}
/// Tries to refresh the latest offset for every registered topic.
///
/// For each topic in the stats map, retrieves its producer and requests
/// an update of the latest offset from Kafka.
pub(crate) async fn try_update(&self) -> Result<()> {
let topics = self.client_manager.list_topics().await;
for topic in topics.iter() {
debug!("Fetching latest offset for topic: {}", topic.topic);
let producer = self
.client_manager
.get_or_insert(topic)
.await?
.producer()
.clone();
producer.fetch_latest_offset().await?;
}
Ok(())
}
}
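The fetcher follows the common tokio periodic-task pattern: a spawned loop over an interval with MissedTickBehavior::Skip. A stripped-down, self-contained version of that pattern, assuming tokio with the rt, macros and time features and with a dummy update function standing in for try_update, looks roughly like this:

use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};

// Stand-in for PeriodicOffsetFetcher::try_update: the real code fetches the
// latest offset for every registered topic here.
async fn try_update() -> Result<(), String> {
    Ok(())
}

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let mut ticker = interval(Duration::from_secs(60));
        // Skip missed ticks instead of bursting to catch up after a stall.
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            if let Err(e) = try_update().await {
                eprintln!("Failed to update latest offset: {e}");
            }
        }
    });
    // In the log store this loop lives for the process lifetime; the sketch just exits.
    tokio::time::sleep(Duration::from_millis(10)).await;
}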
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use super::*;
use crate::kafka::test_util::{prepare, record};
#[tokio::test]
async fn test_try_update_latest_offset() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let broker_endpoints = get_kafka_endpoints();
let (manager, topics) = prepare("test_try_update_latest_offset", 1, broker_endpoints).await;
let manager = Arc::new(manager);
let fetcher = PeriodicOffsetFetcher::new(Duration::from_millis(100), manager.clone());
let topic_stats = manager.topic_stats().clone();
fetcher.run().await;
let topic = topics[0].clone();
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
let producer = manager
.get_or_insert(&provider)
.await
.unwrap()
.producer()
.clone();
tokio::time::sleep(Duration::from_millis(150)).await;
let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
assert_eq!(current_latest_offset, 0);
let record = vec![record()];
let region = RegionId::new(1, 1);
producer.produce(region, record.clone()).await.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;
let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
assert_eq!(current_latest_offset, record.len() as u64);
}
}


@@ -17,6 +17,7 @@ use std::sync::Arc;
use common_telemetry::warn;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient};
use rskafka::client::producer::ProduceResult;
use rskafka::record::Record;
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
@@ -24,6 +25,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use crate::error::{self, Result};
use crate::kafka::index::IndexCollector;
use crate::kafka::log_store::TopicStat;
use crate::kafka::worker::{BackgroundProducerWorker, ProduceResultHandle, WorkerRequest};
use crate::metrics::{
METRIC_KAFKA_CLIENT_BYTES_TOTAL, METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED,
@@ -57,7 +59,7 @@ impl OrderedBatchProducer {
compression: Compression,
max_batch_bytes: usize,
index_collector: Box<dyn IndexCollector>,
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
) -> Self {
let mut worker = BackgroundProducerWorker {
provider,
@@ -67,7 +69,7 @@ impl OrderedBatchProducer {
request_batch_size: REQUEST_BATCH_SIZE,
max_batch_bytes,
index_collector,
high_watermark,
topic_stats,
};
tokio::spawn(async move { worker.run().await });
Self { sender: tx }
@@ -94,12 +96,12 @@ impl OrderedBatchProducer {
Ok(handle)
}
/// Sends an [WorkerRequest::UpdateHighWatermark] request to the producer.
/// This is used to update the high watermark for the topic.
pub(crate) async fn update_high_watermark(&self) -> Result<()> {
/// Sends an [WorkerRequest::FetchLatestOffset] request to the producer.
/// This is used to fetch the latest offset for the topic.
pub(crate) async fn fetch_latest_offset(&self) -> Result<()> {
if self
.sender
.send(WorkerRequest::UpdateHighWatermark)
.send(WorkerRequest::FetchLatestOffset)
.await
.is_err()
{
@@ -116,7 +118,7 @@ pub trait ProducerClient: std::fmt::Debug + Send + Sync {
&self,
records: Vec<Record>,
compression: Compression,
) -> rskafka::client::error::Result<Vec<i64>>;
) -> rskafka::client::error::Result<ProduceResult>;
async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result<i64>;
}
@@ -127,20 +129,22 @@ impl ProducerClient for PartitionClient {
&self,
records: Vec<Record>,
compression: Compression,
) -> rskafka::client::error::Result<Vec<i64>> {
let total_size = records.iter().map(|r| r.approximate_size()).sum::<usize>();
) -> rskafka::client::error::Result<ProduceResult> {
let partition = self.partition().to_string();
METRIC_KAFKA_CLIENT_BYTES_TOTAL
.with_label_values(&[self.topic(), &partition])
.inc_by(total_size as u64);
METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL
.with_label_values(&[self.topic(), &partition])
.inc();
let _timer = METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED
.with_label_values(&[self.topic(), &partition])
.start_timer();
self.produce(records, compression).await
let result = self.produce(records, compression).await?;
METRIC_KAFKA_CLIENT_BYTES_TOTAL
.with_label_values(&[self.topic(), &partition])
.inc_by(result.encoded_request_size as u64);
METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL
.with_label_values(&[self.topic(), &partition])
.inc();
Ok(result)
}
async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result<i64> {
@@ -182,7 +186,7 @@ mod tests {
&self,
records: Vec<Record>,
_compression: Compression,
) -> rskafka::client::error::Result<Vec<i64>> {
) -> rskafka::client::error::Result<ProduceResult> {
tokio::time::sleep(self.delay).await;
if let Some(e) = self.error {
@@ -206,7 +210,10 @@ mod tests {
.collect();
batch_sizes.push(records.len());
debug!("Return offsets: {offsets:?}");
Ok(offsets)
Ok(ProduceResult {
offsets,
encoded_request_size: 0,
})
}
async fn get_offset(&self, _at: OffsetAt) -> rskafka::client::error::Result<i64> {


@@ -13,9 +13,9 @@
// limitations under the License.
pub(crate) mod dump_index;
pub(crate) mod fetch_latest_offset;
pub(crate) mod flush;
pub(crate) mod produce;
pub(crate) mod update_high_watermark;
use std::sync::Arc;
@@ -33,13 +33,14 @@ use tokio::sync::oneshot::{self};
use crate::error::{self, NoMaxValueSnafu, Result};
use crate::kafka::index::{IndexCollector, IndexEncoder};
use crate::kafka::log_store::TopicStat;
use crate::kafka::producer::ProducerClient;
pub(crate) enum WorkerRequest {
Produce(ProduceRequest),
TruncateIndex(TruncateIndexRequest),
DumpIndex(DumpIndexRequest),
UpdateHighWatermark,
FetchLatestOffset,
}
impl WorkerRequest {
@@ -160,8 +161,8 @@ pub(crate) struct BackgroundProducerWorker {
pub(crate) max_batch_bytes: usize,
/// Collecting ids of WAL entries.
pub(crate) index_collector: Box<dyn IndexCollector>,
/// High watermark for all topics.
pub(crate) high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
/// The stats of each topic.
pub(crate) topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
}
impl BackgroundProducerWorker {
@@ -199,8 +200,8 @@ impl BackgroundProducerWorker {
entry_id,
}) => self.index_collector.truncate(region_id, entry_id),
WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
WorkerRequest::UpdateHighWatermark => {
self.update_high_watermark().await;
WorkerRequest::FetchLatestOffset => {
self.fetch_latest_offset().await;
}
}
}


@@ -17,14 +17,15 @@ use rskafka::client::partition::OffsetAt;
use snafu::ResultExt;
use crate::error;
use crate::kafka::log_store::TopicStat;
use crate::kafka::worker::BackgroundProducerWorker;
impl BackgroundProducerWorker {
/// Updates the high watermark for the topic.
/// Fetches the latest offset for the topic.
///
/// This function retrieves the latest offset from Kafka and updates the high watermark
/// This function retrieves the topic's latest offset from Kafka and updates the latest offset
/// in the shared map.
pub async fn update_high_watermark(&mut self) {
pub async fn fetch_latest_offset(&mut self) {
match self
.client
.get_offset(OffsetAt::Latest)
@@ -32,27 +33,32 @@ impl BackgroundProducerWorker {
.context(error::GetOffsetSnafu {
topic: &self.provider.topic,
}) {
Ok(offset) => match self.high_watermark.entry(self.provider.clone()) {
Ok(offset) => match self.topic_stats.entry(self.provider.clone()) {
dashmap::Entry::Occupied(mut occupied_entry) => {
let offset = offset as u64;
if *occupied_entry.get() != offset {
occupied_entry.insert(offset);
let stat = occupied_entry.get_mut();
if stat.latest_offset < offset {
stat.latest_offset = offset;
debug!(
"Updated high watermark for topic {} to {}",
"Updated latest offset for topic {} to {}",
self.provider.topic, offset
);
}
}
dashmap::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(offset as u64);
vacant_entry.insert(TopicStat {
latest_offset: offset as u64,
record_size: 0,
record_num: 0,
});
debug!(
"Inserted high watermark for topic {} to {}",
"Inserted latest offset for topic {} to {}",
self.provider.topic, offset
);
}
},
Err(err) => {
error!(err; "Failed to get offset for topic {}", self.provider.topic);
error!(err; "Failed to get latest offset for topic {}", self.provider.topic);
}
}
}


@@ -16,6 +16,7 @@ use common_telemetry::warn;
use snafu::ResultExt;
use crate::error;
use crate::kafka::log_store::TopicStat;
use crate::kafka::worker::{BackgroundProducerWorker, PendingRequest};
impl BackgroundProducerWorker {
@@ -28,6 +29,7 @@ impl BackgroundProducerWorker {
size: _size,
}: PendingRequest,
) {
let record_num = batch.len() as u64;
let result = self
.client
.produce(batch, self.compression)
@@ -35,12 +37,27 @@ impl BackgroundProducerWorker {
.context(error::BatchProduceSnafu);
if let Ok(result) = &result {
for (idx, region_id) in result.iter().zip(region_ids) {
let total_record_size = result.encoded_request_size as u64;
for (idx, region_id) in result.offsets.iter().zip(region_ids) {
self.index_collector.append(region_id, *idx as u64);
}
let max_offset = result.offsets.iter().max().cloned().unwrap_or_default() as u64;
self.topic_stats
.entry(self.provider.clone())
.and_modify(|stat| {
stat.latest_offset = stat.latest_offset.max(max_offset);
stat.record_size += total_record_size;
stat.record_num += record_num;
})
.or_insert(TopicStat {
latest_offset: max_offset,
record_size: total_record_size,
record_num,
});
}
if let Err(err) = sender.send(result) {
if let Err(err) = sender.send(result.map(|r| r.offsets)) {
warn!(err; "BatchFlushState Receiver is dropped");
}
}


@@ -484,7 +484,7 @@ impl LogStore for RaftEngineLogStore {
Ok(())
}
fn high_watermark(&self, provider: &Provider) -> Result<EntryId> {
fn latest_entry_id(&self, provider: &Provider) -> Result<EntryId> {
let ns = provider
.as_raft_engine_provider()
.with_context(|| InvalidProviderSnafu {


@@ -882,6 +882,13 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to parse wal options"))]
ParseWalOptions {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to build kafka client."))]
BuildKafkaClient {
#[snafu(implicit)]
@@ -1050,7 +1057,8 @@ impl ErrorExt for Error {
| Error::RuntimeSwitchManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. }
| Error::UpdateTopicNameValue { source, .. } => source.status_code(),
| Error::UpdateTopicNameValue { source, .. }
| Error::ParseWalOptions { source, .. } => source.status_code(),
Error::InitMetadata { source, .. }
| Error::InitDdlManager { source, .. }


@@ -16,9 +16,12 @@ use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::ddl::utils::parse_region_wal_options;
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_meta::lock_key::RemoteWalLock;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::error;
use common_telemetry::{error, warn};
use common_wal::options::WalOptions;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::time::{sleep, Instant};
@@ -56,10 +59,23 @@ impl State for UpgradeCandidateRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let now = Instant::now();
if self.upgrade_region_with_retry(ctx).await {
let region_wal_option = self.get_region_wal_option(ctx).await?;
let region_id = ctx.persistent_ctx.region_id;
if region_wal_option.is_none() {
warn!(
"Region {} wal options not found, during upgrade candidate region",
region_id
);
}
if self
.upgrade_region_with_retry(ctx, procedure_ctx, region_wal_option.as_ref())
.await
{
ctx.update_upgrade_candidate_region_elapsed(now);
Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
} else {
@@ -74,6 +90,26 @@ impl State for UpgradeCandidateRegion {
}
impl UpgradeCandidateRegion {
async fn get_region_wal_option(&self, ctx: &mut Context) -> Result<Option<WalOptions>> {
let region_id = ctx.persistent_ctx.region_id;
match ctx.get_from_peer_datanode_table_value().await {
Ok(datanode_table_value) => {
let region_wal_options =
parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
.context(error::ParseWalOptionsSnafu)?;
Ok(region_wal_options.get(&region_id.region_number()).cloned())
}
Err(error::Error::DatanodeTableNotFound { datanode_id, .. }) => {
warn!(
"Datanode table not found, during upgrade candidate region, the target region might already been migrated, region_id: {}, datanode_id: {}",
region_id, datanode_id
);
Ok(None)
}
Err(e) => Err(e),
}
}
/// Builds upgrade region instruction.
fn build_upgrade_region_instruction(
&self,
@@ -196,12 +232,30 @@ impl UpgradeCandidateRegion {
/// Upgrades a candidate region.
///
/// Returns true if the candidate region is upgraded successfully.
async fn upgrade_region_with_retry(&self, ctx: &mut Context) -> bool {
async fn upgrade_region_with_retry(
&self,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
wal_options: Option<&WalOptions>,
) -> bool {
let mut retry = 0;
let mut upgraded = false;
loop {
let timer = Instant::now();
// If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
let _guard = if let Some(WalOptions::Kafka(kafka_wal_options)) = wal_options {
Some(
procedure_ctx
.provider
.acquire_lock(
&(RemoteWalLock::Read(kafka_wal_options.topic.clone()).into()),
)
.await,
)
} else {
None
};
if let Err(err) = self.upgrade_region(ctx).await {
retry += 1;
ctx.update_operations_elapsed(timer);


@@ -211,6 +211,10 @@ impl WalPruneProcedure {
.into_iter()
.map(|(region_id, region)| {
let prunable_entry_id = region.manifest.prunable_entry_id();
info!(
"Region {}, topic: {}, prunable_entry_id: {}",
region_id, self.data.topic, prunable_entry_id
);
(region_id, prunable_entry_id)
})
.collect();
@@ -246,6 +250,13 @@ impl WalPruneProcedure {
self.data.regions_to_flush.push(region_id);
}
}
info!(
"Flush regions: {:?}, trigger_flush_threshold: {}, prunable_entry_id: {}, max_prunable_entry_id: {}",
self.data.regions_to_flush,
self.data.trigger_flush_threshold,
self.data.prunable_entry_id,
max_prunable_entry_id
);
self.data.state = WalPruneState::FlushRegion;
} else {
self.data.state = WalPruneState::Prune;
@@ -420,7 +431,8 @@ mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::HeartbeatResponse;
use common_wal::test_util::run_test_with_kafka_wal;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use rskafka::record::Record;
use tokio::sync::mpsc::Receiver;
@@ -490,7 +502,8 @@ mod tests {
rskafka::client::partition::Compression::NoCompression,
)
.await
.unwrap();
.unwrap()
.offsets;
offsets.extend(offset);
}
offsets
@@ -550,103 +563,101 @@ mod tests {
#[tokio::test]
async fn test_procedure_execution() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
common_telemetry::init_default_ut_logging();
let mut topic_name = uuid::Uuid::new_v4().to_string();
// Topic should start with a letter.
topic_name = format!("test_procedure_execution-{}", topic_name);
let mut env = TestEnv::new();
let context = env.build_wal_prune_context(broker_endpoints).await;
TestEnv::prepare_topic(&context.client, &topic_name).await;
let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
maybe_skip_kafka_integration_test!();
let broker_endpoints = get_kafka_endpoints();
// Before any data in kvbackend is mocked, should return a retryable error.
let result = procedure.on_prune().await;
assert_matches!(result, Err(e) if e.is_retryable());
common_telemetry::init_default_ut_logging();
let mut topic_name = uuid::Uuid::new_v4().to_string();
// Topic should start with a letter.
topic_name = format!("test_procedure_execution-{}", topic_name);
let mut env = TestEnv::new();
let context = env.build_wal_prune_context(broker_endpoints).await;
TestEnv::prepare_topic(&context.client, &topic_name).await;
let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;
// Before any data in kvbackend is mocked, should return a retryable error.
let result = procedure.on_prune().await;
assert_matches!(result, Err(e) if e.is_retryable());
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
assert_eq!(
procedure.data.regions_to_flush.len(),
regions_to_flush.len()
);
for region_id in &regions_to_flush {
assert!(procedure.data.regions_to_flush.contains(region_id));
}
let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;
// Step 2: Test `on_sending_flush_request`.
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
env.mailbox
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
.await;
let status = procedure.on_sending_flush_request().await.unwrap();
check_flush_request(&mut rx, &regions_to_flush).await;
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
assert_eq!(
procedure.data.regions_to_flush.len(),
regions_to_flush.len()
);
for region_id in &regions_to_flush {
assert!(procedure.data.regions_to_flush.contains(region_id));
}
// Step 3: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids at and after `prunable_entry_id` still exist.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.prunable_entry_id as i64,
true,
)
.await;
// Check if the entry ids before `prunable_entry_id` are deleted.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.prunable_entry_id as i64 - 1,
false,
)
.await;
// Step 2: Test `on_sending_flush_request`.
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
env.mailbox
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
.await;
let status = procedure.on_sending_flush_request().await.unwrap();
check_flush_request(&mut rx, &regions_to_flush).await;
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
let value = env
.table_metadata_manager
.topic_name_manager()
.get(&topic_name)
.await
.unwrap()
.unwrap();
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
procedure.context.leader_region_registry.reset();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Step 5: Test `on_prepare`, don't flush regions.
procedure.data.trigger_flush_threshold = 0;
procedure.on_prepare().await.unwrap();
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
})
})
// Step 3: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids at and after `prunable_entry_id` still exist.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.prunable_entry_id as i64,
true,
)
.await;
// Check if the entry ids before `prunable_entry_id` are deleted.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.prunable_entry_id as i64 - 1,
false,
)
.await;
let value = env
.table_metadata_manager
.topic_name_manager()
.get(&topic_name)
.await
.unwrap()
.unwrap();
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
procedure.context.leader_region_registry.reset();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Step 5: Test `on_prepare`, don't flush regions.
procedure.data.trigger_flush_threshold = 0;
procedure.on_prepare().await.unwrap();
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
}
}


@@ -334,7 +334,8 @@ mod test {
use std::assert_matches::assert_matches;
use common_meta::key::topic_name::TopicNameKey;
use common_wal::test_util::run_test_with_kafka_wal;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use tokio::time::{sleep, timeout};
use super::*;
@@ -413,27 +414,24 @@ mod test {
#[tokio::test]
async fn test_wal_prune_manager() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let limit = 6;
let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
let topics = (0..limit * 2)
.map(|_| uuid::Uuid::new_v4().to_string())
.collect::<Vec<_>>();
mock_topics(&manager, &topics).await;
maybe_skip_kafka_integration_test!();
let broker_endpoints = get_kafka_endpoints();
let limit = 6;
let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
let topics = (0..limit * 2)
.map(|_| uuid::Uuid::new_v4().to_string())
.collect::<Vec<_>>();
mock_topics(&manager, &topics).await;
let tracker = manager.tracker.clone();
let handler =
common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
handler.await.unwrap();
let tracker = manager.tracker.clone();
let handler =
common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
handler.await.unwrap();
tx.send(Event::Tick).await.unwrap();
// Wait for at least one procedure to be submitted.
timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
.await
.unwrap();
})
})
.await;
tx.send(Event::Tick).await.unwrap();
// Wait for at least one procedure to be submitted.
timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
.await
.unwrap();
}
}


@@ -124,7 +124,7 @@ pub struct MitoRegion {
/// The topic's latest entry id since the region's last flushing.
/// **Only used for remote WAL pruning.**
///
/// The value will be updated to the high watermark of the topic
/// The value will be updated to the latest offset of the topic
/// if region receives a flush request or schedules a periodic flush task
/// and the region's memtable is empty.
///


@@ -197,7 +197,7 @@ mod tests {
unreachable!()
}
fn high_watermark(&self, _provider: &Provider) -> Result<EntryId, Self::Error> {
fn latest_entry_id(&self, _provider: &Provider) -> Result<EntryId, Self::Error> {
unreachable!()
}
}


@@ -102,22 +102,22 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let version = region.version_control.current();
let mut flushed_entry_id = version.last_entry_id;
let high_watermark = self
let latest_entry_id = self
.wal
.store()
.high_watermark(&region.provider)
.latest_entry_id(&region.provider)
.unwrap_or_default();
warn!(
"Skips to replay memtable for region: {}, flushed entry id: {}, high watermark: {}",
region.region_id, flushed_entry_id, high_watermark
"Skips to replay memtable for region: {}, flushed entry id: {}, latest entry id: {}",
region.region_id, flushed_entry_id, latest_entry_id
);
if high_watermark > flushed_entry_id {
if latest_entry_id > flushed_entry_id {
warn!(
"Found high watermark is greater than flushed entry id, using high watermark as flushed entry id, region: {}, high watermark: {}, flushed entry id: {}",
region_id, high_watermark, flushed_entry_id
"Found latest entry id is greater than flushed entry id, using latest entry id as flushed entry id, region: {}, latest entry id: {}, flushed entry id: {}",
region_id, latest_entry_id, flushed_entry_id
);
flushed_entry_id = high_watermark;
flushed_entry_id = latest_entry_id;
region.version_control.set_entry_id(flushed_entry_id);
}
let on_region_opened = self.wal.on_region_opened();


@@ -17,7 +17,7 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;
use common_telemetry::{error, info};
use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;
@@ -259,20 +259,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
/// **This is only used for remote WAL pruning.**
pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
let high_watermark = self
let latest_offset = self
.wal
.store()
.high_watermark(&region.provider)
.latest_entry_id(&region.provider)
.unwrap_or(0);
let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
if high_watermark != 0 && high_watermark > topic_last_entry_id {
if latest_offset > topic_last_entry_id {
region
.topic_latest_entry_id
.store(high_watermark, Ordering::Relaxed);
info!(
"Region {} high watermark updated to {}",
region.region_id, high_watermark
.store(latest_offset, Ordering::Relaxed);
debug!(
"Region {} latest entry id updated to {}",
region.region_id, latest_offset
);
}
}


@@ -95,8 +95,8 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
provider: &Provider,
) -> Result<Entry, Self::Error>;
/// Returns the highest existing entry id in the log store.
fn high_watermark(&self, provider: &Provider) -> Result<EntryId, Self::Error>;
/// Returns the latest entry id in the log store.
fn latest_entry_id(&self, provider: &Provider) -> Result<EntryId, Self::Error>;
}
/// The response of an `append` operation.


@@ -89,6 +89,7 @@ uuid.workspace = true
zstd.workspace = true
[dev-dependencies]
common-wal = { workspace = true, features = ["testing"] }
datafusion.workspace = true
datafusion-expr.workspace = true
hex.workspace = true


@@ -26,14 +26,12 @@ use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, StorageConfig};
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use futures::future::BoxFuture;
use object_store::config::{
AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
};
@@ -732,22 +730,3 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.await
.unwrap();
}
pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return;
}
let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
test(endpoints).await
}


@@ -77,10 +77,9 @@ macro_rules! region_migration_test {
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
common_telemetry::init_default_ut_logging();
tests_integration::test_util::run_test_with_kafka_wal(|endpoints| {
Box::pin(async move { $crate::region_migration::$test(store_type, endpoints).await })
})
.await
common_wal::maybe_skip_kafka_integration_test!();
let endpoints = common_wal::test_util::get_kafka_endpoints();
$crate::region_migration::$test(store_type, endpoints).await
}
}