From 8659412cacc9cfbca09545349c4bf1f3dd8b4ec5 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 13 Aug 2025 19:46:50 +0800 Subject: [PATCH] feat: introduce `PeriodicTopicStatsReporter` (#6730) * refactor: introduce `PeriodicTopicStatsReporter` Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * fix: fix typo Signed-off-by: WenyXu * chore: remote wal tests styling Signed-off-by: WenyXu * fix: fix unit test Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * fix: handling region wal options not found Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * fix: minor Signed-off-by: WenyXu * chore: upgrade greptime-proto Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 4 +- Cargo.toml | 4 +- src/common/meta/src/datanode.rs | 18 ++ .../meta/src/distributed_time_constants.rs | 6 + src/common/meta/src/test_util.rs | 8 +- src/common/wal/src/test_util.rs | 20 -- src/datanode/src/datanode.rs | 11 +- src/datanode/src/heartbeat.rs | 2 + .../src/heartbeat/handler/flush_region.rs | 28 ++- src/datanode/src/region_server.rs | 34 +++ src/log-store/src/kafka.rs | 2 +- src/log-store/src/kafka/client_manager.rs | 130 ++++++------ .../src/kafka/high_watermark_manager.rs | 131 ------------ src/log-store/src/kafka/log_store.rs | 173 +++++++++++---- .../src/kafka/periodic_offset_fetcher.rs | 122 +++++++++++ src/log-store/src/kafka/producer.rs | 43 ++-- src/log-store/src/kafka/worker.rs | 13 +- ...gh_watermark.rs => fetch_latest_offset.rs} | 26 ++- src/log-store/src/kafka/worker/flush.rs | 21 +- src/log-store/src/raft_engine/log_store.rs | 2 +- src/meta-srv/src/error.rs | 10 +- .../upgrade_candidate_region.rs | 62 +++++- src/meta-srv/src/procedure/wal_prune.rs | 197 +++++++++--------- .../src/procedure/wal_prune/manager.rs | 40 ++-- src/mito2/src/region.rs | 2 +- src/mito2/src/wal/raw_entry_reader.rs | 2 +- src/mito2/src/worker/handle_catchup.rs | 16 +- src/mito2/src/worker/handle_flush.rs | 16 +- src/store-api/src/logstore.rs | 4 +- tests-integration/Cargo.toml | 1 + tests-integration/src/test_util.rs | 21 -- tests-integration/tests/region_migration.rs | 7 +- 32 files changed, 707 insertions(+), 469 deletions(-) delete mode 100644 src/log-store/src/kafka/high_watermark_manager.rs create mode 100644 src/log-store/src/kafka/periodic_offset_fetcher.rs rename src/log-store/src/kafka/worker/{update_high_watermark.rs => fetch_latest_offset.rs} (62%) diff --git a/Cargo.lock b/Cargo.lock index c36b5917e1..1228bbc5cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5324,7 +5324,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ccfd4da48bc0254ed865e479cd981a3581b02d84#ccfd4da48bc0254ed865e479cd981a3581b02d84" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=69680846a078aae670d93fb30511a72738345199#69680846a078aae670d93fb30511a72738345199" dependencies = [ "prost 0.13.5", "serde", @@ -10869,7 +10869,7 @@ dependencies = [ [[package]] name = "rskafka" version = "0.6.0" -source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820" +source = "git+https://github.com/WenyXu/rskafka.git?rev=bc582e98918def613a882581a1b9331d186d9b2d#bc582e98918def613a882581a1b9331d186d9b2d" dependencies = [ "bytes", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 4c393c9a5f..491be89a82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,7 +140,7 @@ etcd-client 
= "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ccfd4da48bc0254ed865e479cd981a3581b02d84" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69680846a078aae670d93fb30511a72738345199" } hex = "0.4" http = "1" humantime = "2.1" @@ -188,7 +188,7 @@ reqwest = { version = "0.12", default-features = false, features = [ "stream", "multipart", ] } -rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [ +rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "bc582e98918def613a882581a1b9331d186d9b2d", features = [ "transport-tls", ] } rstest = "0.25" diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index 1db59b7c8e..e2e79c9e81 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -108,6 +108,24 @@ pub struct RegionStat { pub metadata_topic_latest_entry_id: u64, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TopicStat { + /// The topic name. + pub topic: String, + /// The latest entry id of the topic. + pub latest_entry_id: u64, + /// The total size in bytes of records appended to the topic. + pub record_size: u64, + /// The total number of records appended to the topic. + pub record_num: u64, +} + +/// Trait for reporting statistics about topics. +pub trait TopicStatsReporter: Send + Sync { + /// Returns a list of topic statistics that can be reported. + fn reportable_topics(&mut self) -> Vec; +} + #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum RegionManifestInfo { Mito { diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index 99181dcd0c..d18b377c28 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -43,3 +43,9 @@ pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2; /// The default mailbox round-trip timeout. pub const MAILBOX_RTT_SECS: u64 = 1; + +/// The interval of reporting topic stats. +pub const TOPIC_STATS_REPORT_INTERVAL_SECS: u64 = 15; + +/// The retention seconds of topic stats. +pub const TOPIC_STATS_RETENTION_SECS: u64 = TOPIC_STATS_REPORT_INTERVAL_SECS * 100; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 491a4e70c6..ac93e13c94 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -251,11 +251,11 @@ pub async fn test_kafka_topic_pool( } #[macro_export] -/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set. +/// Skip the test if the environment variable `GT_POSTGRES_ENDPOINTS` is not set. /// /// The format of the environment variable is: /// ``` -/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093 +/// GT_POSTGRES_ENDPOINTS=localhost:9092,localhost:9093 /// ``` macro_rules! maybe_skip_postgres_integration_test { () => { @@ -267,11 +267,11 @@ macro_rules! maybe_skip_postgres_integration_test { } #[macro_export] -/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set. +/// Skip the test if the environment variable `GT_MYSQL_ENDPOINTS` is not set. /// /// The format of the environment variable is: /// ``` -/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093 +/// GT_MYSQL_ENDPOINTS=localhost:9092,localhost:9093 /// ``` macro_rules! 
    () => {
diff --git a/src/common/wal/src/test_util.rs b/src/common/wal/src/test_util.rs
index a01e49feaa..d5f58c9f17 100644
--- a/src/common/wal/src/test_util.rs
+++ b/src/common/wal/src/test_util.rs
@@ -12,26 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_telemetry::warn;
-use futures_util::future::BoxFuture;
-
-pub async fn run_test_with_kafka_wal<F>(test: F)
-where
-    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
-{
-    let Ok(endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
-        warn!("The endpoints is empty, skipping the test");
-        return;
-    };
-
-    let endpoints = endpoints
-        .split(',')
-        .map(|s| s.trim().to_string())
-        .collect::<Vec<_>>();
-
-    test(endpoints).await
-}
-
 /// Get the kafka endpoints from the environment variable `GT_KAFKA_ENDPOINTS`.
 ///
 /// The format of the environment variable is:
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index b45f7c95ed..9b5853bce3 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -22,6 +22,7 @@ use common_base::Plugins;
 use common_error::ext::BoxedError;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
+use common_meta::datanode::TopicStatsReporter;
 use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
 use common_meta::key::runtime_switch::RuntimeSwitchManager;
 use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
@@ -162,6 +163,7 @@ pub struct DatanodeBuilder {
     meta_client: Option<MetaClientRef>,
     kv_backend: KvBackendRef,
     cache_registry: Option<Arc<LayeredCacheRegistry>>,
+    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
     #[cfg(feature = "enterprise")]
     extension_range_provider_factory: Option,
 }
@@ -177,6 +179,7 @@ impl DatanodeBuilder {
             cache_registry: None,
             #[cfg(feature = "enterprise")]
             extension_range_provider_factory: None,
+            topic_stats_reporter: None,
         }
     }
 
@@ -414,6 +417,9 @@ impl DatanodeBuilder {
         for engine in engines {
             region_server.register_engine(engine);
         }
+        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
+            region_server.set_topic_stats_reporter(topic_stats_reporter);
+        }
 
         Ok(region_server)
     }
@@ -527,10 +533,13 @@ impl DatanodeBuilder {
                     None
                 };
 
+                let log_store =
+                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
+                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
                 let builder = MitoEngineBuilder::new(
                     &opts.storage.data_home,
                     config,
-                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
+                    log_store,
                     object_store_manager,
                     schema_metadata_manager,
                     plugins,
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 590095bee2..352c35475d 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -279,10 +279,12 @@ impl HeartbeatTask {
                     }
                     _ = &mut sleep => {
                         let region_stats = Self::load_region_stats(&region_server_clone);
+                        let topic_stats = region_server_clone.topic_stats();
                         let now = Instant::now();
                         let duration_since_epoch = (now - epoch).as_millis() as u64;
                         let req = HeartbeatRequest {
                             region_stats,
+                            topic_stats,
                             duration_since_epoch,
                             ..heartbeat_request.clone()
                         };
diff --git a/src/datanode/src/heartbeat/handler/flush_region.rs b/src/datanode/src/heartbeat/handler/flush_region.rs
index 20867d645c..eec28fc665 100644
--- a/src/datanode/src/heartbeat/handler/flush_region.rs
+++ b/src/datanode/src/heartbeat/handler/flush_region.rs
@@ -12,8 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::time::Instant;
+
 use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
-use common_telemetry::warn;
+use common_telemetry::{info, warn};
 use futures_util::future::BoxFuture;
 use store_api::region_request::{RegionFlushRequest, RegionRequest};
 use store_api::storage::RegionId;
@@ -27,26 +29,38 @@ impl HandlerContext {
         flush_regions: FlushRegions,
     ) -> BoxFuture<'static, Option<InstructionReply>> {
         Box::pin(async move {
-            for region_id in flush_regions.region_ids {
+            let start_time = Instant::now();
+            for region_id in &flush_regions.region_ids {
                 let request = RegionRequest::Flush(RegionFlushRequest {
                     row_group_size: None,
                 });
-                let result = self.region_server.handle_request(region_id, request).await;
+                let now = Instant::now();
+                let result = self.region_server.handle_request(*region_id, request).await;
+                let elapsed = now.elapsed();
+                info!("Flush region: {}, elapsed: {:?}", region_id, elapsed);
                 match result {
                     Ok(_) => {}
                     Err(error::Error::RegionNotFound { .. }) => {
-                        warn!("Received a flush region instruction from meta, but target region: {region_id} is not found.");
+                        warn!(
+                            "Received a flush region instruction from meta, but target region: {} is not found, elapsed: {:?}",
+                            region_id,
+                            elapsed
+                        );
                     }
                     Err(err) => {
                         warn!(
-                            "Failed to flush region: {region_id}, error: {err}",
-                            region_id = region_id,
-                            err = err,
+                            "Failed to flush region: {}, error: {}, elapsed: {:?}",
+                            region_id, err, elapsed
                         );
                     }
                 }
             }
+            let elapsed = start_time.elapsed();
+            info!(
+                "Flush regions: {:?}, elapsed: {:?}",
+                flush_regions.region_ids, elapsed
+            );
             None
         })
     }
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index ad2d13031b..882d1b2e92 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -19,6 +19,7 @@ use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
 use api::region::RegionResponse;
+use api::v1::meta::TopicStat;
 use api::v1::region::sync_request::ManifestInfo;
 use api::v1::region::{
     region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
@@ -29,6 +30,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
+use common_meta::datanode::TopicStatsReporter;
 use common_query::request::QueryRequest;
 use common_query::OutputData;
 use common_recordbatch::SendableRecordBatchStream;
@@ -135,10 +137,16 @@ impl RegionServer {
         }
     }
 
+    /// Registers an engine.
     pub fn register_engine(&mut self, engine: RegionEngineRef) {
         self.inner.register_engine(engine);
     }
 
+    /// Sets the topic stats reporter.
+    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
+        self.inner.set_topic_stats_reporter(topic_stats_reporter);
+    }
+
     /// Finds the region's engine by its id. If the region is not ready, returns `None`.
     pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
         match self.inner.get_engine(region_id, &RegionChange::None) {
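For reference, the setter above accepts any implementation of the `TopicStatsReporter` trait introduced in `common_meta` earlier in this patch. A minimal sketch of a custom reporter that could be wired in the same way, e.g. from a test (the `StaticReporter` name is hypothetical, not part of this change):

```rust
use common_meta::datanode::{TopicStat, TopicStatsReporter};

/// Hypothetical reporter that hands out a fixed set of stats exactly once.
struct StaticReporter {
    stats: Vec<TopicStat>,
}

impl TopicStatsReporter for StaticReporter {
    fn reportable_topics(&mut self) -> Vec<TopicStat> {
        // Drain the buffered stats so repeated heartbeats don't re-report them.
        std::mem::take(&mut self.stats)
    }
}
```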
@@ -305,6 +313,24 @@ impl RegionServer {
             .collect()
     }
 
+    /// Returns the reportable topics.
+    pub fn topic_stats(&self) -> Vec<TopicStat> {
+        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
+        let Some(reporter) = reporter.as_mut() else {
+            return vec![];
+        };
+        reporter
+            .reportable_topics()
+            .into_iter()
+            .map(|stat| TopicStat {
+                topic_name: stat.topic,
+                record_size: stat.record_size,
+                record_num: stat.record_num,
+                latest_entry_id: stat.latest_entry_id,
+            })
+            .collect()
+    }
+
     pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
         self.inner.region_map.get(&region_id).and_then(|engine| {
             engine.role(region_id).map(|role| match role {
@@ -669,6 +695,8 @@ struct RegionServerInner {
     // The number of queries allowed to be executed at the same time.
     // Act as last line of defense on datanode to prevent query overloading.
     parallelism: Option<RegionServerParallelism>,
+    // The topic stats reporter.
+    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
 }
 
 struct RegionServerParallelism {
@@ -734,6 +762,7 @@ impl RegionServerInner {
             event_listener,
             table_provider_factory,
             parallelism,
+            topic_stats_reporter: RwLock::new(None),
         }
     }
@@ -746,6 +775,11 @@ impl RegionServerInner {
             .insert(engine_name.to_string(), engine);
     }
 
+    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
+        info!("Set topic stats reporter");
+        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
+    }
+
     fn get_engine(
         &self,
         region_id: RegionId,
diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs
index 452c88164a..b16834cb97 100644
--- a/src/log-store/src/kafka.rs
+++ b/src/log-store/src/kafka.rs
@@ -14,9 +14,9 @@
 
 pub(crate) mod client_manager;
 pub(crate) mod consumer;
-mod high_watermark_manager;
 pub(crate) mod index;
 pub mod log_store;
+mod periodic_offset_fetcher;
 pub(crate) mod producer;
 #[cfg(test)]
 pub(crate) mod test_util;
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 1e172a268c..146164acd4 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -28,6 +28,7 @@ use crate::error::{
     BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
 };
 use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
+use crate::kafka::log_store::TopicStat;
 use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};
 
 // Each topic only has one partition for now.
@@ -66,8 +67,8 @@ pub(crate) struct ClientManager {
     flush_batch_size: usize,
     compression: Compression,
 
-    /// High watermark for each topic.
-    high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
+    /// The stats of each topic.
+    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
 }
 
 impl ClientManager {
     /// Tries to create a ClientManager.
     pub(crate) async fn try_new(
         config: &DatanodeKafkaConfig,
         global_index_collector: Option<GlobalIndexCollector>,
-        high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
+        topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
     ) -> Result<Self> {
         // Sets backoff config for the top-level kafka client and all clients constructed by it.
        let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
@@ -101,7 +102,7 @@ impl ClientManager {
             flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
             compression: Compression::Lz4,
             global_index_collector,
-            high_watermark,
+            topic_stats,
         })
     }
 
@@ -117,7 +118,8 @@ impl ClientManager {
                     .write()
                     .await
                     .insert(provider.clone(), client.clone());
-                self.high_watermark.insert(provider.clone(), 0);
+                self.topic_stats
+                    .insert(provider.clone(), TopicStat::default());
                 Ok(client)
             }
         }
@@ -166,7 +168,7 @@ impl ClientManager {
             self.compression,
             self.flush_batch_size,
             index_collector,
-            self.high_watermark.clone(),
+            self.topic_stats.clone(),
         ));
 
         Ok(Client { client, producer })
     }
@@ -176,9 +178,14 @@ impl ClientManager {
         self.global_index_collector.as_ref()
     }
 
+    /// Lists all topics.
+    pub(crate) async fn list_topics(&self) -> Vec<Arc<KafkaProvider>> {
+        self.instances.read().await.keys().cloned().collect()
+    }
+
     #[cfg(test)]
-    pub(crate) fn high_watermark(&self) -> &Arc<DashMap<Arc<KafkaProvider>, u64>> {
-        &self.high_watermark
+    pub(crate) fn topic_stats(&self) -> &Arc<DashMap<Arc<KafkaProvider>, TopicStat>> {
+        &self.topic_stats
     }
 }
 
@@ -192,7 +199,8 @@ impl ClientManager {
 
 #[cfg(test)]
 mod tests {
-    use common_wal::test_util::run_test_with_kafka_wal;
+    use common_wal::maybe_skip_kafka_integration_test;
+    use common_wal::test_util::get_kafka_endpoints;
     use tokio::sync::Barrier;
 
     use super::*;
@@ -201,70 +209,64 @@ mod tests {
     /// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
     #[tokio::test]
     async fn test_sequential() {
-        run_test_with_kafka_wal(|broker_endpoints| {
-            Box::pin(async {
-                let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
-                // Assigns multiple regions to a topic.
-                let region_topic = (0..512)
-                    .map(|region_id| (region_id, &topics[region_id % topics.len()]))
-                    .collect::<HashMap<_, _>>();
+        maybe_skip_kafka_integration_test!();
+        let broker_endpoints = get_kafka_endpoints();
+        let (manager, topics) = prepare("test_sequential", 128, broker_endpoints).await;
+        // Assigns multiple regions to a topic.
+        let region_topic = (0..512)
+            .map(|region_id| (region_id, &topics[region_id % topics.len()]))
+            .collect::<HashMap<_, _>>();
 
-                // Gets all clients sequentially.
-                for (_, topic) in region_topic {
-                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
-                    manager.get_or_insert(&provider).await.unwrap();
-                }
+        // Gets all clients sequentially.
+        for (_, topic) in region_topic {
+            let provider = Arc::new(KafkaProvider::new(topic.to_string()));
+            manager.get_or_insert(&provider).await.unwrap();
+        }
 
-                // Ensures all clients exist.
-                let client_pool = manager.instances.read().await;
-                let all_exist = topics.iter().all(|topic| {
-                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
-                    client_pool.contains_key(&provider)
-                });
-                assert!(all_exist);
-            })
-        })
-        .await;
+        // Ensures all clients exist.
+        let client_pool = manager.instances.read().await;
+        let all_exist = topics.iter().all(|topic| {
+            let provider = Arc::new(KafkaProvider::new(topic.to_string()));
+            client_pool.contains_key(&provider)
+        });
+        assert!(all_exist);
     }
 
     /// Sends `get_or_insert` requests in parallel to the client manager, and checks if it could handle them correctly.
     #[tokio::test(flavor = "multi_thread")]
     async fn test_parallel() {
-        run_test_with_kafka_wal(|broker_endpoints| {
-            Box::pin(async {
-                let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
-                // Assigns multiple regions to a topic.
-                let region_topic = (0..512)
-                    .map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
-                    .collect::<HashMap<_, _>>();
+        maybe_skip_kafka_integration_test!();
+        let broker_endpoints = get_kafka_endpoints();
+        let (manager, topics) = prepare("test_parallel", 128, broker_endpoints).await;
+        // Assigns multiple regions to a topic.
+        let region_topic = (0..512)
+            .map(|region_id| (region_id, topics[region_id % topics.len()].clone()))
+            .collect::<HashMap<_, _>>();
 
-                // Gets all clients in parallel.
-                let manager = Arc::new(manager);
-                let barrier = Arc::new(Barrier::new(region_topic.len()));
-                let tasks = region_topic
-                    .into_values()
-                    .map(|topic| {
-                        let manager = manager.clone();
-                        let barrier = barrier.clone();
+        // Gets all clients in parallel.
+        let manager = Arc::new(manager);
+        let barrier = Arc::new(Barrier::new(region_topic.len()));
+        let tasks = region_topic
+            .into_values()
+            .map(|topic| {
+                let manager = manager.clone();
+                let barrier = barrier.clone();
 
-                        tokio::spawn(async move {
-                            barrier.wait().await;
-                            let provider = Arc::new(KafkaProvider::new(topic));
-                            assert!(manager.get_or_insert(&provider).await.is_ok());
-                        })
-                    })
-                    .collect::<Vec<_>>();
-                futures::future::try_join_all(tasks).await.unwrap();
-
-                // Ensures all clients exist.
-                let client_pool = manager.instances.read().await;
-                let all_exist = topics.iter().all(|topic| {
-                    let provider = Arc::new(KafkaProvider::new(topic.to_string()));
-                    client_pool.contains_key(&provider)
-                });
-                assert!(all_exist);
+                tokio::spawn(async move {
+                    barrier.wait().await;
+                    let provider = Arc::new(KafkaProvider::new(topic));
+                    assert!(manager.get_or_insert(&provider).await.is_ok());
+                })
             })
-        })
-        .await;
+            .collect::<Vec<_>>();
+        futures::future::try_join_all(tasks).await.unwrap();
+
+        // Ensures all clients exist.
+        let client_pool = manager.instances.read().await;
+        let all_exist = topics.iter().all(|topic| {
+            let provider = Arc::new(KafkaProvider::new(topic.to_string()));
+            client_pool.contains_key(&provider)
+        });
+        assert!(all_exist);
     }
 }
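The test rewrites above replace the closure-based `run_test_with_kafka_wal` helper with an env-gated pattern: return early unless `GT_KAFKA_ENDPOINTS` is set, otherwise talk to real brokers. A minimal sketch of a new test following the same pattern (the test name and topic count are illustrative only):

```rust
#[tokio::test]
async fn test_with_real_kafka() {
    // Skips silently when GT_KAFKA_ENDPOINTS is unset, e.g. in plain CI runs.
    maybe_skip_kafka_integration_test!();
    let broker_endpoints = get_kafka_endpoints();

    let (manager, topics) = prepare("test_with_real_kafka", 1, broker_endpoints).await;
    let provider = Arc::new(KafkaProvider::new(topics[0].clone()));
    assert!(manager.get_or_insert(&provider).await.is_ok());
}
```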
diff --git a/src/log-store/src/kafka/high_watermark_manager.rs b/src/log-store/src/kafka/high_watermark_manager.rs
deleted file mode 100644
index 8a4c2a1252..0000000000
--- a/src/log-store/src/kafka/high_watermark_manager.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::sync::Arc;
-use std::time::Duration;
-
-use common_telemetry::error;
-use dashmap::DashMap;
-use store_api::logstore::provider::KafkaProvider;
-use tokio::time::{interval, MissedTickBehavior};
-
-use crate::error::Result;
-use crate::kafka::client_manager::ClientManagerRef;
-
-/// HighWatermarkManager is responsible for periodically updating the high watermark
-/// (latest existing record offset) for each Kafka topic.
-pub(crate) struct HighWatermarkManager {
-    /// Interval to update high watermark.
-    update_interval: Duration,
-    /// The high watermark for each topic.
-    high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
-    /// Client manager to send requests.
-    client_manager: ClientManagerRef,
-}
-
-impl HighWatermarkManager {
-    pub(crate) fn new(
-        update_interval: Duration,
-        high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
-        client_manager: ClientManagerRef,
-    ) -> Self {
-        Self {
-            update_interval,
-            high_watermark,
-            client_manager,
-        }
-    }
-
-    /// Starts the high watermark manager as a background task
-    ///
-    /// This spawns a task that periodically queries Kafka for the latest
-    /// high watermark values for all registered topics and updates the shared map.
-    pub(crate) async fn run(self) {
-        common_runtime::spawn_global(async move {
-            let mut interval = interval(self.update_interval);
-            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
-            loop {
-                interval.tick().await;
-                if let Err(e) = self.try_update().await {
-                    error!(e; "Failed to update high watermark");
-                }
-            }
-        });
-    }
-
-    /// Attempts to update the high watermark for all registered topics
-    ///
-    /// Iterates through all topics in the high watermark map, obtains a producer
-    /// for each topic, and requests an update of the high watermark value.
-    pub(crate) async fn try_update(&self) -> Result<()> {
-        for iterator_element in self.high_watermark.iter() {
-            let producer = self
-                .client_manager
-                .get_or_insert(iterator_element.key())
-                .await?
-                .producer()
-                .clone();
-            producer.update_high_watermark().await?;
-        }
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use common_wal::test_util::run_test_with_kafka_wal;
-    use store_api::storage::RegionId;
-
-    use super::*;
-    use crate::kafka::test_util::{prepare, record};
-
-    #[tokio::test]
-    async fn test_try_update_high_watermark() {
-        run_test_with_kafka_wal(|broker_endpoints| {
-            Box::pin(async {
-                let (manager, topics) =
-                    prepare("test_try_update_high_watermark", 1, broker_endpoints).await;
-                let manager = Arc::new(manager);
-                let high_watermark_manager = HighWatermarkManager::new(
-                    Duration::from_millis(100),
-                    manager.high_watermark().clone(),
-                    manager.clone(),
-                );
-                let high_watermark = high_watermark_manager.high_watermark.clone();
-                high_watermark_manager.run().await;
-
-                let topic = topics[0].clone();
-                let provider = Arc::new(KafkaProvider::new(topic.to_string()));
-                let producer = manager
-                    .get_or_insert(&provider)
-                    .await
-                    .unwrap()
-                    .producer()
-                    .clone();
-
-                tokio::time::sleep(Duration::from_millis(150)).await;
-                let current_high_watermark = *high_watermark.get(&provider).unwrap();
-                assert_eq!(current_high_watermark, 0);
-
-                let record = vec![record()];
-                let region = RegionId::new(1, 1);
-                producer.produce(region, record.clone()).await.unwrap();
-                tokio::time::sleep(Duration::from_millis(150)).await;
-                let current_high_watermark = *high_watermark.get(&provider).unwrap();
-                assert_eq!(current_high_watermark, record.len() as u64);
-            })
-        })
-        .await
-    }
-}
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index d416599088..54d7d079a2 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -16,7 +16,10 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Duration;
 
+use common_meta::datanode::TopicStatsReporter;
+use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
 use common_telemetry::{debug, warn};
+use common_time::util::current_time_millis;
 use common_wal::config::kafka::DatanodeKafkaConfig;
 use dashmap::DashMap;
 use futures::future::try_join_all;
@@ -33,17 +36,33 @@ use store_api::storage::RegionId;
 
 use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
 use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
 use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
-use crate::kafka::high_watermark_manager::HighWatermarkManager;
 use crate::kafka::index::{
     build_region_wal_index_iterator, GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE,
 };
+use crate::kafka::periodic_offset_fetcher::PeriodicOffsetFetcher;
 use crate::kafka::producer::OrderedBatchProducerRef;
 use crate::kafka::util::record::{
     convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
 };
 use crate::metrics;
 
-const DEFAULT_HIGH_WATERMARK_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
+const DEFAULT_OFFSET_FETCH_INTERVAL: Duration = Duration::from_secs(60);
+
+/// Statistics for a topic.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct TopicStat {
+    /// Latest offset for the topic.
+    ///
+    /// The latest offset is updated in two ways:
+    /// - Automatically when the producer successfully commits data to Kafka
+    /// - Periodically by the [PeriodicOffsetFetcher](crate::kafka::periodic_offset_fetcher::PeriodicOffsetFetcher).
+    pub latest_offset: u64,
+    /// Total size in bytes of records appended to the topic.
+    pub record_size: u64,
+    /// Total number of records appended to the topic.
+    pub record_num: u64,
+}
 
 /// A log store backed by Kafka.
 #[derive(Debug)]
@@ -56,18 +75,70 @@ pub struct KafkaLogStore {
     consumer_wait_timeout: Duration,
     /// Ignore missing entries during read WAL.
     overwrite_entry_start_id: bool,
-    /// High watermark for all topics.
+    /// The stats of each topic.
     ///
-    /// Represents the offset of the last record in each topic. This is used to track
-    /// the latest available data in Kafka topics.
-    ///
-    /// The high watermark is updated in two ways:
-    /// - Automatically when the producer successfully commits data to Kafka
-    /// - Periodically by the [HighWatermarkManager](crate::kafka::high_watermark_manager::HighWatermarkManager).
-    ///
-    /// This shared map allows multiple components to access the latest high watermark
+    /// This shared map allows multiple components to access the latest stats
     /// information without needing to query Kafka directly.
-    high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
+    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
+}
+
+struct PeriodicTopicStatsReporter {
+    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
+    last_reported_timestamp_millis: i64,
+    report_interval_millis: i64,
+}
+
+impl PeriodicTopicStatsReporter {
+    fn align_ts(ts: i64, report_interval_millis: i64) -> i64 {
+        (ts / report_interval_millis) * report_interval_millis
+    }
+
+    /// Creates a new [PeriodicTopicStatsReporter].
+    ///
+    /// # Panics
+    ///
+    /// Panics if `report_interval` is zero.
+    fn new(
+        topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
+        report_interval: Duration,
+    ) -> Self {
+        assert!(!report_interval.is_zero());
+        let report_interval_millis = report_interval.as_millis() as i64;
+        let last_reported_timestamp_millis =
+            Self::align_ts(current_time_millis(), report_interval_millis);
+
+        Self {
+            topic_stats,
+            last_reported_timestamp_millis,
+            report_interval_millis,
+        }
+    }
+}
+
+impl TopicStatsReporter for PeriodicTopicStatsReporter {
+    fn reportable_topics(&mut self) -> Vec<common_meta::datanode::TopicStat> {
+        let now = Self::align_ts(current_time_millis(), self.report_interval_millis);
+        if now < self.last_reported_timestamp_millis + self.report_interval_millis {
+            debug!("Skip reporting topic stats because the interval is not reached");
+            return vec![];
+        }
+
+        self.last_reported_timestamp_millis = now;
+        let mut reportable_topics = Vec::with_capacity(self.topic_stats.len());
+        for e in self.topic_stats.iter() {
+            let topic_stat = e.value();
+            let topic_stat = common_meta::datanode::TopicStat {
+                topic: e.key().topic.clone(),
+                latest_entry_id: topic_stat.latest_offset,
+                record_size: topic_stat.record_size,
+                record_num: topic_stat.record_num,
+            };
+            debug!("Reportable topic: {:?}", topic_stat);
+            reportable_topics.push(topic_stat);
+        }
+        debug!("Reportable {} topics at {}", reportable_topics.len(), now);
+        reportable_topics
+    }
+}
 
 impl KafkaLogStore {
@@ -76,25 +147,30 @@ impl KafkaLogStore {
         config: &DatanodeKafkaConfig,
         global_index_collector: Option<GlobalIndexCollector>,
     ) -> Result<Self> {
-        let high_watermark = Arc::new(DashMap::new());
+        let topic_stats = Arc::new(DashMap::new());
         let client_manager = Arc::new(
-            ClientManager::try_new(config, global_index_collector, high_watermark.clone()).await?,
+            ClientManager::try_new(config, global_index_collector, topic_stats.clone()).await?,
         );
 
-        let high_watermark_manager = HighWatermarkManager::new(
-            DEFAULT_HIGH_WATERMARK_UPDATE_INTERVAL,
-            high_watermark.clone(),
-            client_manager.clone(),
-        );
-        high_watermark_manager.run().await;
+        let fetcher =
+            PeriodicOffsetFetcher::new(DEFAULT_OFFSET_FETCH_INTERVAL, client_manager.clone());
+        fetcher.run().await;
 
         Ok(Self {
             client_manager,
             max_batch_bytes: config.max_batch_bytes.as_bytes() as usize,
             consumer_wait_timeout: config.consumer_wait_timeout,
             overwrite_entry_start_id: config.overwrite_entry_start_id,
-            high_watermark,
+            topic_stats,
         })
     }
+
+    /// Returns a reporter for the topic stats.
+    pub fn topic_stats_reporter(&self) -> Box<dyn TopicStatsReporter> {
+        Box::new(PeriodicTopicStatsReporter::new(
+            self.topic_stats.clone(),
+            Duration::from_secs(TOPIC_STATS_REPORT_INTERVAL_SECS),
+        ))
+    }
 }
 
 fn build_entry(
@@ -226,13 +302,6 @@ impl LogStore for KafkaLogStore {
         ))
         .await?;
 
-        // Updates the high watermark offset of the last record in the topic.
-        for (region_id, offset) in &region_grouped_max_offset {
-            // Safety: `region_id` is always valid.
-            let provider = region_to_provider.get(region_id).unwrap();
-            self.high_watermark.insert(provider.clone(), *offset);
-        }
-
         Ok(AppendBatchResponse {
             last_entry_ids: region_grouped_max_offset.into_iter().collect(),
         })
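`topic_stats_reporter()` is the hook the datanode consumes (see the `DatanodeBuilder` hunk earlier in this patch). A condensed sketch of that wiring, assuming the `log_store` and `datanode` crates are both in scope and that their snafu errors box into `std::error::Error` (the helper name is illustrative):

```rust
use common_wal::config::kafka::DatanodeKafkaConfig;
use datanode::region_server::RegionServer;
use log_store::kafka::log_store::KafkaLogStore;

/// Hypothetical helper mirroring DatanodeBuilder: build the Kafka log store,
/// then hand its stats reporter to the region server for heartbeat reporting.
async fn wire_topic_stats(
    config: &DatanodeKafkaConfig,
    region_server: &mut RegionServer,
) -> Result<KafkaLogStore, Box<dyn std::error::Error>> {
    let log_store = KafkaLogStore::try_new(config, None).await?;
    region_server.set_topic_stats_reporter(log_store.topic_stats_reporter());
    Ok(log_store)
}
```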
@@ -418,7 +487,7 @@ impl LogStore for KafkaLogStore {
     }
 
     /// Returns the highest entry id of the specified topic in remote WAL.
-    fn high_watermark(&self, provider: &Provider) -> Result<EntryId> {
+    fn latest_entry_id(&self, provider: &Provider) -> Result<EntryId> {
         let provider = provider
             .as_kafka_provider()
             .with_context(|| InvalidProviderSnafu {
                 expected: KafkaProvider::type_name(),
                 actual: provider.type_name(),
             })?;
 
-        let high_watermark = self
-            .high_watermark
+        let stat = self
+            .topic_stats
             .get(provider)
             .as_deref()
             .copied()
-            .unwrap_or(0);
+            .unwrap_or_default();
 
-        Ok(high_watermark)
+        Ok(stat.latest_offset)
     }
 
     /// Stops components of the logstore.
@@ -458,12 +527,16 @@ mod tests {
     use std::assert_matches::assert_matches;
     use std::collections::HashMap;
+    use std::sync::Arc;
+    use std::time::Duration;
 
     use common_base::readable_size::ReadableSize;
+    use common_meta::datanode::TopicStatsReporter;
     use common_telemetry::info;
     use common_telemetry::tracing::warn;
     use common_wal::config::kafka::common::KafkaConnectionConfig;
     use common_wal::config::kafka::DatanodeKafkaConfig;
+    use dashmap::DashMap;
     use futures::TryStreamExt;
     use rand::prelude::SliceRandom;
     use rand::Rng;
@@ -473,7 +546,7 @@ mod tests {
     use store_api::storage::RegionId;
 
     use super::build_entry;
-    use crate::kafka::log_store::KafkaLogStore;
+    use crate::kafka::log_store::{KafkaLogStore, PeriodicTopicStatsReporter, TopicStat};
 
     #[test]
     fn test_build_naive_entry() {
@@ -630,7 +703,7 @@ mod tests {
                 .for_each(|entry| entry.set_entry_id(0));
             assert_eq!(expected_entries, actual_entries);
         }
-        let high_wathermark = logstore.high_watermark(&provider).unwrap();
+        let high_wathermark = logstore.latest_entry_id(&provider).unwrap();
         assert_eq!(high_wathermark, 99);
     }
 
@@ -706,7 +779,35 @@ mod tests {
                 .for_each(|entry| entry.set_entry_id(0));
             assert_eq!(expected_entries, actual_entries);
         }
-        let high_wathermark = logstore.high_watermark(&provider).unwrap();
+        let high_wathermark = logstore.latest_entry_id(&provider).unwrap();
         assert_eq!(high_wathermark, (data_size_kb as u64 / 8 + 1) * 20 * 5 - 1);
     }
+
+    #[tokio::test]
+    async fn test_topic_stats_reporter() {
+        common_telemetry::init_default_ut_logging();
+        let topic_stats = Arc::new(DashMap::new());
+        let provider = Provider::kafka_provider("my_topic".to_string());
+        topic_stats.insert(
+            provider.as_kafka_provider().unwrap().clone(),
+            TopicStat {
+                latest_offset: 0,
+                record_size: 0,
+                record_num: 0,
+            },
+        );
+        let mut reporter = PeriodicTopicStatsReporter::new(topic_stats, Duration::from_secs(1));
+        // The first reportable topics should be empty.
+        let reportable_topics = reporter.reportable_topics();
+        assert_eq!(reportable_topics.len(), 0);
+
+        // After 1 second, the reportable topics should be the topic in the topic_stats.
+        tokio::time::sleep(Duration::from_secs(1)).await;
+        let reportable_topics = reporter.reportable_topics();
+        assert_eq!(reportable_topics.len(), 1);
+
+        // Call it immediately, should be empty.
+        let reportable_topics = reporter.reportable_topics();
+        assert_eq!(reportable_topics.len(), 0);
+    }
 }
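The reporter's interval gating rests entirely on the floor-to-interval arithmetic of `align_ts`: two polls whose aligned timestamps match fall into the same window, so only the first one reports. A standalone worked example of the same formula (a sketch, not part of the patch):

```rust
/// Floors a millisecond timestamp to the start of its interval window,
/// mirroring `PeriodicTopicStatsReporter::align_ts`.
fn align_ts(ts: i64, interval_ms: i64) -> i64 {
    (ts / interval_ms) * interval_ms
}

fn main() {
    let interval_ms = 15_000; // TOPIC_STATS_REPORT_INTERVAL_SECS = 15
    // Two polls inside the same 15 s window align to the same boundary,
    // so the second poll is skipped.
    assert_eq!(align_ts(1_000_005_100, interval_ms), 1_000_005_000);
    assert_eq!(align_ts(1_000_019_900, interval_ms), 1_000_005_000);
    // A poll in the next window crosses the boundary and reports.
    assert_eq!(align_ts(1_000_020_000, interval_ms), 1_000_020_000);
}
```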
diff --git a/src/log-store/src/kafka/periodic_offset_fetcher.rs b/src/log-store/src/kafka/periodic_offset_fetcher.rs
new file mode 100644
index 0000000000..92eb119aa3
--- /dev/null
+++ b/src/log-store/src/kafka/periodic_offset_fetcher.rs
@@ -0,0 +1,122 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use common_telemetry::{debug, error, info};
+use tokio::time::{interval, MissedTickBehavior};
+
+use crate::error::Result;
+use crate::kafka::client_manager::ClientManagerRef;
+
+/// PeriodicOffsetFetcher is responsible for periodically updating the offset
+/// for each Kafka topic.
+pub(crate) struct PeriodicOffsetFetcher {
+    /// Interval to fetch the latest offset.
+    interval: Duration,
+    /// Client manager to send requests.
+    client_manager: ClientManagerRef,
+}
+
+impl PeriodicOffsetFetcher {
+    pub(crate) fn new(interval: Duration, client_manager: ClientManagerRef) -> Self {
+        Self {
+            interval,
+            client_manager,
+        }
+    }
+
+    /// Starts the offset fetcher as a background task.
+    ///
+    /// This spawns a task that periodically queries Kafka for the latest
+    /// offset values for all registered topics and updates the shared map.
+    pub(crate) async fn run(self) {
+        common_runtime::spawn_global(async move {
+            info!("PeriodicOffsetFetcher started");
+            let mut interval = interval(self.interval);
+            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+            loop {
+                interval.tick().await;
+                if let Err(e) = self.try_update().await {
+                    error!(e; "Failed to update latest offset");
+                }
+            }
+        });
+    }
+
+    /// Tries to refresh the latest offset for every registered topic.
+    ///
+    /// For each topic in the stats map, retrieves its producer and requests
+    /// an update of the latest offset from Kafka.
+    pub(crate) async fn try_update(&self) -> Result<()> {
+        let topics = self.client_manager.list_topics().await;
+        for topic in topics.iter() {
+            debug!("Fetching latest offset for topic: {}", topic.topic);
+            let producer = self
+                .client_manager
+                .get_or_insert(topic)
+                .await?
+                .producer()
+                .clone();
+            producer.fetch_latest_offset().await?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use common_wal::maybe_skip_kafka_integration_test;
+    use common_wal::test_util::get_kafka_endpoints;
+    use store_api::logstore::provider::KafkaProvider;
+    use store_api::storage::RegionId;
+
+    use super::*;
+    use crate::kafka::test_util::{prepare, record};
+
+    #[tokio::test]
+    async fn test_try_update_latest_offset() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let broker_endpoints = get_kafka_endpoints();
+
+        let (manager, topics) = prepare("test_try_update_latest_offset", 1, broker_endpoints).await;
+        let manager = Arc::new(manager);
+        let fetcher = PeriodicOffsetFetcher::new(Duration::from_millis(100), manager.clone());
+        let topic_stats = manager.topic_stats().clone();
+        fetcher.run().await;
+
+        let topic = topics[0].clone();
+        let provider = Arc::new(KafkaProvider::new(topic.to_string()));
+        let producer = manager
+            .get_or_insert(&provider)
+            .await
+            .unwrap()
+            .producer()
+            .clone();
+
+        tokio::time::sleep(Duration::from_millis(150)).await;
+        let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
+        assert_eq!(current_latest_offset, 0);
+
+        let record = vec![record()];
+        let region = RegionId::new(1, 1);
+        producer.produce(region, record.clone()).await.unwrap();
+        tokio::time::sleep(Duration::from_millis(150)).await;
+        let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
+        assert_eq!(current_latest_offset, record.len() as u64);
+    }
+}
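Under the hood, each fetch cycle boils down to asking rskafka's partition client for the topic's latest offset, which is what `fetch_latest_offset` does in the worker (see the renamed module below). A self-contained sketch of that single call, assuming rskafka's `partition_client`/`get_offset` API and the one-partition-per-topic convention (partition 0):

```rust
use rskafka::client::partition::{OffsetAt, UnknownTopicHandling};
use rskafka::client::ClientBuilder;

/// Fetch the latest offset of `topic`, mirroring one fetcher cycle.
async fn latest_offset(
    brokers: Vec<String>,
    topic: &str,
) -> Result<i64, Box<dyn std::error::Error>> {
    let client = ClientBuilder::new(brokers).build().await?;
    let partition_client = client
        .partition_client(topic, 0, UnknownTopicHandling::Retry)
        .await?;
    Ok(partition_client.get_offset(OffsetAt::Latest).await?)
}
```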
diff --git a/src/log-store/src/kafka/producer.rs b/src/log-store/src/kafka/producer.rs
index 6f89c75e7a..4d24d9a0d9 100644
--- a/src/log-store/src/kafka/producer.rs
+++ b/src/log-store/src/kafka/producer.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use common_telemetry::warn;
 use dashmap::DashMap;
 use rskafka::client::partition::{Compression, OffsetAt, PartitionClient};
+use rskafka::client::producer::ProduceResult;
 use rskafka::record::Record;
 use store_api::logstore::provider::KafkaProvider;
 use store_api::storage::RegionId;
@@ -24,6 +25,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
 
 use crate::error::{self, Result};
 use crate::kafka::index::IndexCollector;
+use crate::kafka::log_store::TopicStat;
 use crate::kafka::worker::{BackgroundProducerWorker, ProduceResultHandle, WorkerRequest};
 use crate::metrics::{
     METRIC_KAFKA_CLIENT_BYTES_TOTAL, METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED,
@@ -57,7 +59,7 @@ impl OrderedBatchProducer {
         compression: Compression,
         max_batch_bytes: usize,
         index_collector: Box<dyn IndexCollector>,
-        high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
+        topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
     ) -> Self {
         let mut worker = BackgroundProducerWorker {
             provider,
@@ -67,7 +69,7 @@ impl OrderedBatchProducer {
             request_batch_size: REQUEST_BATCH_SIZE,
             max_batch_bytes,
             index_collector,
-            high_watermark,
+            topic_stats,
         };
         tokio::spawn(async move { worker.run().await });
         Self { sender: tx }
@@ -94,12 +96,12 @@ impl OrderedBatchProducer {
         Ok(handle)
     }
 
-    /// Sends a [WorkerRequest::UpdateHighWatermark] request to the producer.
-    /// This is used to update the high watermark for the topic.
-    pub(crate) async fn update_high_watermark(&self) -> Result<()> {
+    /// Sends a [WorkerRequest::FetchLatestOffset] request to the producer.
+    /// This is used to fetch the latest offset for the topic.
+    pub(crate) async fn fetch_latest_offset(&self) -> Result<()> {
         if self
             .sender
-            .send(WorkerRequest::UpdateHighWatermark)
+            .send(WorkerRequest::FetchLatestOffset)
             .await
             .is_err()
         {
@@ -116,7 +118,7 @@ pub trait ProducerClient: std::fmt::Debug + Send + Sync {
         &self,
         records: Vec<Record>,
         compression: Compression,
-    ) -> rskafka::client::error::Result<Vec<i64>>;
+    ) -> rskafka::client::error::Result<ProduceResult>;
 
     async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result<i64>;
 }
@@ -127,20 +129,22 @@ impl ProducerClient for PartitionClient {
         &self,
         records: Vec<Record>,
         compression: Compression,
-    ) -> rskafka::client::error::Result<Vec<i64>> {
-        let total_size = records.iter().map(|r| r.approximate_size()).sum::<usize>();
+    ) -> rskafka::client::error::Result<ProduceResult> {
         let partition = self.partition().to_string();
-        METRIC_KAFKA_CLIENT_BYTES_TOTAL
-            .with_label_values(&[self.topic(), &partition])
-            .inc_by(total_size as u64);
-        METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL
-            .with_label_values(&[self.topic(), &partition])
-            .inc();
         let _timer = METRIC_KAFKA_CLIENT_PRODUCE_ELAPSED
             .with_label_values(&[self.topic(), &partition])
             .start_timer();
 
-        self.produce(records, compression).await
+        let result = self.produce(records, compression).await?;
+
+        METRIC_KAFKA_CLIENT_BYTES_TOTAL
+            .with_label_values(&[self.topic(), &partition])
+            .inc_by(result.encoded_request_size as u64);
+        METRIC_KAFKA_CLIENT_TRAFFIC_TOTAL
+            .with_label_values(&[self.topic(), &partition])
+            .inc();
+
+        Ok(result)
     }
 
     async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result<i64> {
@@ -182,7 +186,7 @@ mod tests {
             &self,
             records: Vec<Record>,
             _compression: Compression,
-        ) -> rskafka::client::error::Result<Vec<i64>> {
+        ) -> rskafka::client::error::Result<ProduceResult> {
             tokio::time::sleep(self.delay).await;
 
             if let Some(e) = self.error {
@@ -206,7 +210,10 @@ mod tests {
                 .collect();
             batch_sizes.push(records.len());
             debug!("Return offsets: {offsets:?}");
-            Ok(offsets)
+            Ok(ProduceResult {
+                offsets,
+                encoded_request_size: 0,
+            })
         }
 
         async fn get_offset(&self, _at: OffsetAt) -> rskafka::client::error::Result<i64> {
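With `ProduceResult` in place, byte metrics are derived from the encoded wire size rather than from summing per-record approximate sizes, and the per-topic stats are folded in with DashMap's entry API, as the flush path further below shows. A minimal standalone sketch of that upsert pattern (types simplified; the map is keyed by plain strings here):

```rust
use dashmap::DashMap;

#[derive(Default, Clone, Copy)]
struct TopicStat {
    latest_offset: u64,
    record_size: u64,
    record_num: u64,
}

/// Fold one produce result into the shared per-topic stats map.
fn fold(stats: &DashMap<String, TopicStat>, topic: &str, max_offset: u64, bytes: u64, n: u64) {
    stats
        .entry(topic.to_string())
        .and_modify(|s| {
            // The latest offset only moves forward; sizes and counts accumulate.
            s.latest_offset = s.latest_offset.max(max_offset);
            s.record_size += bytes;
            s.record_num += n;
        })
        .or_insert(TopicStat {
            latest_offset: max_offset,
            record_size: bytes,
            record_num: n,
        });
}
```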
diff --git a/src/log-store/src/kafka/worker.rs b/src/log-store/src/kafka/worker.rs
index 372d8c5567..de75114f48 100644
--- a/src/log-store/src/kafka/worker.rs
+++ b/src/log-store/src/kafka/worker.rs
@@ -13,9 +13,9 @@
 // limitations under the License.
 
 pub(crate) mod dump_index;
+pub(crate) mod fetch_latest_offset;
 pub(crate) mod flush;
 pub(crate) mod produce;
-pub(crate) mod update_high_watermark;
 
 use std::sync::Arc;
 
@@ -33,13 +33,14 @@ use tokio::sync::oneshot::{self};
 
 use crate::error::{self, NoMaxValueSnafu, Result};
 use crate::kafka::index::{IndexCollector, IndexEncoder};
+use crate::kafka::log_store::TopicStat;
 use crate::kafka::producer::ProducerClient;
 
 pub(crate) enum WorkerRequest {
     Produce(ProduceRequest),
     TruncateIndex(TruncateIndexRequest),
     DumpIndex(DumpIndexRequest),
-    UpdateHighWatermark,
+    FetchLatestOffset,
 }
 
 impl WorkerRequest {
@@ -160,8 +161,8 @@ pub(crate) struct BackgroundProducerWorker {
     pub(crate) max_batch_bytes: usize,
     /// Collecting ids of WAL entries.
     pub(crate) index_collector: Box<dyn IndexCollector>,
-    /// High watermark for all topics.
-    pub(crate) high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
+    /// The stats of each topic.
+    pub(crate) topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
 }
 
 impl BackgroundProducerWorker {
@@ -199,8 +200,8 @@ impl BackgroundProducerWorker {
                     entry_id,
                 }) => self.index_collector.truncate(region_id, entry_id),
                 WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
-                WorkerRequest::UpdateHighWatermark => {
-                    self.update_high_watermark().await;
+                WorkerRequest::FetchLatestOffset => {
+                    self.fetch_latest_offset().await;
                 }
             }
         }
diff --git a/src/log-store/src/kafka/worker/update_high_watermark.rs b/src/log-store/src/kafka/worker/fetch_latest_offset.rs
similarity index 62%
rename from src/log-store/src/kafka/worker/update_high_watermark.rs
rename to src/log-store/src/kafka/worker/fetch_latest_offset.rs
index 8404086418..6fda16d3d1 100644
--- a/src/log-store/src/kafka/worker/update_high_watermark.rs
+++ b/src/log-store/src/kafka/worker/fetch_latest_offset.rs
@@ -17,14 +17,15 @@ use rskafka::client::partition::OffsetAt;
 use snafu::ResultExt;
 
 use crate::error;
+use crate::kafka::log_store::TopicStat;
 use crate::kafka::worker::BackgroundProducerWorker;
 
 impl BackgroundProducerWorker {
-    /// Updates the high watermark for the topic.
+    /// Fetches the latest offset for the topic.
     ///
-    /// This function retrieves the latest offset from Kafka and updates the high watermark
+    /// This function retrieves the topic's latest offset from Kafka and updates the latest offset
     /// in the shared map.
-    pub async fn update_high_watermark(&mut self) {
+    pub async fn fetch_latest_offset(&mut self) {
         match self
             .client
             .get_offset(OffsetAt::Latest)
             .await
             .context(error::GetOffsetSnafu {
                 topic: &self.provider.topic,
             }) {
-            Ok(offset) => match self.high_watermark.entry(self.provider.clone()) {
+            Ok(offset) => match self.topic_stats.entry(self.provider.clone()) {
                 dashmap::Entry::Occupied(mut occupied_entry) => {
                     let offset = offset as u64;
-                    if *occupied_entry.get() != offset {
-                        occupied_entry.insert(offset);
+                    let stat = occupied_entry.get_mut();
+                    if stat.latest_offset < offset {
+                        stat.latest_offset = offset;
                         debug!(
-                            "Updated high watermark for topic {} to {}",
+                            "Updated latest offset for topic {} to {}",
                             self.provider.topic, offset
                         );
                     }
                 }
                 dashmap::Entry::Vacant(vacant_entry) => {
-                    vacant_entry.insert(offset as u64);
+                    vacant_entry.insert(TopicStat {
+                        latest_offset: offset as u64,
+                        record_size: 0,
+                        record_num: 0,
+                    });
                     debug!(
-                        "Inserted high watermark for topic {} to {}",
+                        "Inserted latest offset for topic {} to {}",
                         self.provider.topic, offset
                     );
                 }
             },
             Err(err) => {
-                error!(err; "Failed to get offset for topic {}", self.provider.topic);
+                error!(err; "Failed to get latest offset for topic {}", self.provider.topic);
             }
         }
     }
diff --git a/src/log-store/src/kafka/worker/flush.rs b/src/log-store/src/kafka/worker/flush.rs
index 427a528a14..7348c4ec6f 100644
--- a/src/log-store/src/kafka/worker/flush.rs
+++ b/src/log-store/src/kafka/worker/flush.rs
@@ -16,6 +16,7 @@ use common_telemetry::warn;
 use snafu::ResultExt;
 
 use crate::error;
+use crate::kafka::log_store::TopicStat;
 use crate::kafka::worker::{BackgroundProducerWorker, PendingRequest};
 
 impl BackgroundProducerWorker {
@@ -28,6 +29,7 @@ impl BackgroundProducerWorker {
             size: _size,
         }: PendingRequest,
     ) {
+        let record_num = batch.len() as u64;
         let result = self
             .client
             .produce(batch, self.compression)
             .await
             .context(error::BatchProduceSnafu);
 
         if let Ok(result) = &result {
-            for (idx, region_id) in result.iter().zip(region_ids) {
+            let total_record_size = result.encoded_request_size as u64;
+            for (idx, region_id) in result.offsets.iter().zip(region_ids) {
                 self.index_collector.append(region_id, *idx as u64);
             }
+
+            let max_offset = result.offsets.iter().max().cloned().unwrap_or_default() as u64;
+            self.topic_stats
+                .entry(self.provider.clone())
+                .and_modify(|stat| {
+                    stat.latest_offset = stat.latest_offset.max(max_offset);
+                    stat.record_size += total_record_size;
+                    stat.record_num += record_num;
+                })
+                .or_insert(TopicStat {
+                    latest_offset: max_offset,
+                    record_size: total_record_size,
+                    record_num,
+                });
         }
 
-        if let Err(err) = sender.send(result) {
+        if let Err(err) = sender.send(result.map(|r| r.offsets)) {
             warn!(err; "BatchFlushState Receiver is dropped");
         }
     }
diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
index 673b2075c7..f4cd7c61cf 100644
--- a/src/log-store/src/raft_engine/log_store.rs
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -484,7 +484,7 @@ impl LogStore for RaftEngineLogStore {
         Ok(())
     }
 
-    fn high_watermark(&self, provider: &Provider) -> Result<EntryId> {
+    fn latest_entry_id(&self, provider: &Provider) -> Result<EntryId> {
         let ns = provider
             .as_raft_engine_provider()
             .with_context(|| InvalidProviderSnafu {
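Both log store backends now implement the renamed `latest_entry_id` accessor (formerly `high_watermark`) on the `LogStore` trait. A hypothetical consumer-side sketch, assuming the trait's `EntryId` is `u64` (the helper name is illustrative, not from this patch):

```rust
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;

/// How far a region's flushed entry id lags behind the topic's latest entry id.
fn topic_lag<S: LogStore>(store: &S, provider: &Provider, flushed_entry_id: u64) -> u64 {
    store
        .latest_entry_id(provider)
        .map(|latest| latest.saturating_sub(flushed_entry_id))
        .unwrap_or(0)
}
```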
 }
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index 0b6a28b12d..d391468750 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -882,6 +882,13 @@ pub enum Error {
         source: common_meta::error::Error,
     },
 
+    #[snafu(display("Failed to parse wal options"))]
+    ParseWalOptions {
+        #[snafu(implicit)]
+        location: Location,
+        source: common_meta::error::Error,
+    },
+
     #[snafu(display("Failed to build kafka client."))]
     BuildKafkaClient {
         #[snafu(implicit)]
@@ -1050,7 +1057,8 @@ impl ErrorExt for Error {
             | Error::RuntimeSwitchManager { source, .. }
             | Error::KvBackend { source, .. }
             | Error::UnexpectedLogicalRouteTable { source, .. }
-            | Error::UpdateTopicNameValue { source, .. } => source.status_code(),
+            | Error::UpdateTopicNameValue { source, .. }
+            | Error::ParseWalOptions { source, .. } => source.status_code(),
 
             Error::InitMetadata { source, .. }
             | Error::InitDdlManager { source, .. }
diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
index 93d85ba186..b0f9fc902e 100644
--- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
@@ -16,9 +16,12 @@ use std::any::Any;
 use std::time::Duration;
 
 use api::v1::meta::MailboxMessage;
+use common_meta::ddl::utils::parse_region_wal_options;
 use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
+use common_meta::lock_key::RemoteWalLock;
 use common_procedure::{Context as ProcedureContext, Status};
-use common_telemetry::error;
+use common_telemetry::{error, warn};
+use common_wal::options::WalOptions;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
 use tokio::time::{sleep, Instant};
@@ -56,10 +59,23 @@ impl State for UpgradeCandidateRegion {
     async fn next(
         &mut self,
         ctx: &mut Context,
-        _procedure_ctx: &ProcedureContext,
+        procedure_ctx: &ProcedureContext,
     ) -> Result<(Box<dyn State>, Status)> {
         let now = Instant::now();
-        if self.upgrade_region_with_retry(ctx).await {
+
+        let region_wal_option = self.get_region_wal_option(ctx).await?;
+        let region_id = ctx.persistent_ctx.region_id;
+        if region_wal_option.is_none() {
+            warn!(
+                "Region {} wal options not found during upgrade candidate region",
+                region_id
+            );
+        }
+
+        if self
+            .upgrade_region_with_retry(ctx, procedure_ctx, region_wal_option.as_ref())
+            .await
+        {
             ctx.update_upgrade_candidate_region_elapsed(now);
             Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
         } else {
@@ -74,6 +90,26 @@ impl State for UpgradeCandidateRegion {
 }
 
 impl UpgradeCandidateRegion {
+    async fn get_region_wal_option(&self, ctx: &mut Context) -> Result<Option<WalOptions>> {
+        let region_id = ctx.persistent_ctx.region_id;
+        match ctx.get_from_peer_datanode_table_value().await {
+            Ok(datanode_table_value) => {
+                let region_wal_options =
+                    parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
+                        .context(error::ParseWalOptionsSnafu)?;
+                Ok(region_wal_options.get(&region_id.region_number()).cloned())
+            }
+            Err(error::Error::DatanodeTableNotFound { datanode_id, .. }) => {
+                warn!(
+                    "Datanode table not found during upgrade candidate region; the target region might have already been migrated, region_id: {}, datanode_id: {}",
+                    region_id, datanode_id
+                );
+                Ok(None)
+            }
+            Err(e) => Err(e),
+        }
+    }
+
     /// Builds upgrade region instruction.
     fn build_upgrade_region_instruction(
         &self,
@@ -196,12 +232,30 @@ impl UpgradeCandidateRegion {
     /// Upgrades a candidate region.
     ///
     /// Returns true if the candidate region is upgraded successfully.
-    async fn upgrade_region_with_retry(&self, ctx: &mut Context) -> bool {
+    async fn upgrade_region_with_retry(
+        &self,
+        ctx: &mut Context,
+        procedure_ctx: &ProcedureContext,
+        wal_options: Option<&WalOptions>,
+    ) -> bool {
         let mut retry = 0;
         let mut upgraded = false;
 
         loop {
             let timer = Instant::now();
+            // If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
+ let _guard = if let Some(WalOptions::Kafka(kafka_wal_options)) = wal_options { + Some( + procedure_ctx + .provider + .acquire_lock( + &(RemoteWalLock::Read(kafka_wal_options.topic.clone()).into()), + ) + .await, + ) + } else { + None + }; if let Err(err) = self.upgrade_region(ctx).await { retry += 1; ctx.update_operations_elapsed(timer); diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs index e608300328..ff16f3db06 100644 --- a/src/meta-srv/src/procedure/wal_prune.rs +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -211,6 +211,10 @@ impl WalPruneProcedure { .into_iter() .map(|(region_id, region)| { let prunable_entry_id = region.manifest.prunable_entry_id(); + info!( + "Region {}, topic: {}, prunable_entry_id: {}", + region_id, self.data.topic, prunable_entry_id + ); (region_id, prunable_entry_id) }) .collect(); @@ -246,6 +250,13 @@ impl WalPruneProcedure { self.data.regions_to_flush.push(region_id); } } + info!( + "Flush regions: {:?}, trigger_flush_threshold: {}, prunable_entry_id: {}, max_prunable_entry_id: {}", + self.data.regions_to_flush, + self.data.trigger_flush_threshold, + self.data.prunable_entry_id, + max_prunable_entry_id + ); self.data.state = WalPruneState::FlushRegion; } else { self.data.state = WalPruneState::Prune; @@ -420,7 +431,8 @@ mod tests { use std::assert_matches::assert_matches; use api::v1::meta::HeartbeatResponse; - use common_wal::test_util::run_test_with_kafka_wal; + use common_wal::maybe_skip_kafka_integration_test; + use common_wal::test_util::get_kafka_endpoints; use rskafka::record::Record; use tokio::sync::mpsc::Receiver; @@ -490,7 +502,8 @@ mod tests { rskafka::client::partition::Compression::NoCompression, ) .await - .unwrap(); + .unwrap() + .offsets; offsets.extend(offset); } offsets @@ -550,103 +563,101 @@ mod tests { #[tokio::test] async fn test_procedure_execution() { - run_test_with_kafka_wal(|broker_endpoints| { - Box::pin(async { - common_telemetry::init_default_ut_logging(); - let mut topic_name = uuid::Uuid::new_v4().to_string(); - // Topic should start with a letter. - topic_name = format!("test_procedure_execution-{}", topic_name); - let mut env = TestEnv::new(); - let context = env.build_wal_prune_context(broker_endpoints).await; - TestEnv::prepare_topic(&context.client, &topic_name).await; - let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None); + maybe_skip_kafka_integration_test!(); + let broker_endpoints = get_kafka_endpoints(); - // Before any data in kvbackend is mocked, should return a retryable error. - let result = procedure.on_prune().await; - assert_matches!(result, Err(e) if e.is_retryable()); + common_telemetry::init_default_ut_logging(); + let mut topic_name = uuid::Uuid::new_v4().to_string(); + // Topic should start with a letter. + topic_name = format!("test_procedure_execution-{}", topic_name); + let mut env = TestEnv::new(); + let context = env.build_wal_prune_context(broker_endpoints).await; + TestEnv::prepare_topic(&context.client, &topic_name).await; + let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None); - let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await; + // Before any data in kvbackend is mocked, should return a retryable error. + let result = procedure.on_prune().await; + assert_matches!(result, Err(e) if e.is_retryable()); - // Step 1: Test `on_prepare`. 
-                let status = procedure.on_prepare().await.unwrap();
-                assert_matches!(
-                    status,
-                    Status::Executing {
-                        persist: true,
-                        clean_poisons: false
-                    }
-                );
-                assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
-                assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
-                assert_eq!(
-                    procedure.data.regions_to_flush.len(),
-                    regions_to_flush.len()
-                );
-                for region_id in &regions_to_flush {
-                    assert!(procedure.data.regions_to_flush.contains(region_id));
-                }
+        let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;
 
-                // Step 2: Test `on_sending_flush_request`.
-                let (tx, mut rx) = tokio::sync::mpsc::channel(1);
-                env.mailbox
-                    .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
-                    .await;
-                let status = procedure.on_sending_flush_request().await.unwrap();
-                check_flush_request(&mut rx, &regions_to_flush).await;
-                assert_matches!(
-                    status,
-                    Status::Executing {
-                        persist: true,
-                        clean_poisons: false
-                    }
-                );
-                assert_matches!(procedure.data.state, WalPruneState::Prune);
+        // Step 1: Test `on_prepare`.
+        let status = procedure.on_prepare().await.unwrap();
+        assert_matches!(
+            status,
+            Status::Executing {
+                persist: true,
+                clean_poisons: false
+            }
+        );
+        assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
+        assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
+        assert_eq!(
+            procedure.data.regions_to_flush.len(),
+            regions_to_flush.len()
+        );
+        for region_id in &regions_to_flush {
+            assert!(procedure.data.regions_to_flush.contains(region_id));
+        }
 
-                // Step 3: Test `on_prune`.
-                let status = procedure.on_prune().await.unwrap();
-                assert_matches!(status, Status::Done { output: None });
-                // Check if the entry ids after(include) `prunable_entry_id` still exist.
-                check_entry_id_existence(
-                    procedure.context.client.clone(),
-                    &topic_name,
-                    procedure.data.prunable_entry_id as i64,
-                    true,
-                )
-                .await;
-                // Check if the entry ids before `prunable_entry_id` are deleted.
-                check_entry_id_existence(
-                    procedure.context.client.clone(),
-                    &topic_name,
-                    procedure.data.prunable_entry_id as i64 - 1,
-                    false,
-                )
-                .await;
+        // Step 2: Test `on_sending_flush_request`.
+        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
+        env.mailbox
+            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
+            .await;
+        let status = procedure.on_sending_flush_request().await.unwrap();
+        check_flush_request(&mut rx, &regions_to_flush).await;
+        assert_matches!(
+            status,
+            Status::Executing {
+                persist: true,
+                clean_poisons: false
+            }
+        );
+        assert_matches!(procedure.data.state, WalPruneState::Prune);
 
-                let value = env
-                    .table_metadata_manager
-                    .topic_name_manager()
-                    .get(&topic_name)
-                    .await
-                    .unwrap()
-                    .unwrap();
-                assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
-
-                // Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
-                // Should log a warning and return `Status::Done`.
-                procedure.context.leader_region_registry.reset();
-                let status = procedure.on_prepare().await.unwrap();
-                assert_matches!(status, Status::Done { output: None });
-
-                // Step 5: Test `on_prepare`, don't flush regions.
-                procedure.data.trigger_flush_threshold = 0;
-                procedure.on_prepare().await.unwrap();
-                assert_matches!(procedure.data.state, WalPruneState::Prune);
-                assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
-
-                // Clean up the topic.
-                delete_topic(procedure.context.client, &topic_name).await;
-            })
-        })
+        // Step 3: Test `on_prune`.
+        let status = procedure.on_prune().await.unwrap();
+        assert_matches!(status, Status::Done { output: None });
+        // Check if the entry ids after (and including) `prunable_entry_id` still exist.
+        check_entry_id_existence(
+            procedure.context.client.clone(),
+            &topic_name,
+            procedure.data.prunable_entry_id as i64,
+            true,
+        )
         .await;
+        // Check if the entry ids before `prunable_entry_id` are deleted.
+        check_entry_id_existence(
+            procedure.context.client.clone(),
+            &topic_name,
+            procedure.data.prunable_entry_id as i64 - 1,
+            false,
+        )
+        .await;
+
+        let value = env
+            .table_metadata_manager
+            .topic_name_manager()
+            .get(&topic_name)
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
+
+        // Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
+        // Should log a warning and return `Status::Done`.
+        procedure.context.leader_region_registry.reset();
+        let status = procedure.on_prepare().await.unwrap();
+        assert_matches!(status, Status::Done { output: None });
+
+        // Step 5: Test `on_prepare`, don't flush regions.
+        procedure.data.trigger_flush_threshold = 0;
+        procedure.on_prepare().await.unwrap();
+        assert_matches!(procedure.data.state, WalPruneState::Prune);
+        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
+
+        // Clean up the topic.
+        delete_topic(procedure.context.client, &topic_name).await;
     }
 }
diff --git a/src/meta-srv/src/procedure/wal_prune/manager.rs b/src/meta-srv/src/procedure/wal_prune/manager.rs
index 370bcef06a..e2f386b0b6 100644
--- a/src/meta-srv/src/procedure/wal_prune/manager.rs
+++ b/src/meta-srv/src/procedure/wal_prune/manager.rs
@@ -334,7 +334,8 @@ mod test {
     use std::assert_matches::assert_matches;
 
     use common_meta::key::topic_name::TopicNameKey;
-    use common_wal::test_util::run_test_with_kafka_wal;
+    use common_wal::maybe_skip_kafka_integration_test;
+    use common_wal::test_util::get_kafka_endpoints;
     use tokio::time::{sleep, timeout};
 
     use super::*;
@@ -413,27 +414,24 @@ mod test {
 
     #[tokio::test]
     async fn test_wal_prune_manager() {
-        run_test_with_kafka_wal(|broker_endpoints| {
-            Box::pin(async {
-                let limit = 6;
-                let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
-                let topics = (0..limit * 2)
-                    .map(|_| uuid::Uuid::new_v4().to_string())
-                    .collect::<Vec<_>>();
-                mock_topics(&manager, &topics).await;
+        maybe_skip_kafka_integration_test!();
+        let broker_endpoints = get_kafka_endpoints();
+        let limit = 6;
+        let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
+        let topics = (0..limit * 2)
+            .map(|_| uuid::Uuid::new_v4().to_string())
+            .collect::<Vec<_>>();
+        mock_topics(&manager, &topics).await;
 
-                let tracker = manager.tracker.clone();
-                let handler =
-                    common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
-                handler.await.unwrap();
+        let tracker = manager.tracker.clone();
+        let handler =
+            common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
+        handler.await.unwrap();
 
-                tx.send(Event::Tick).await.unwrap();
-                // Wait for at least one procedure to be submitted.
-                timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
-                    .await
-                    .unwrap();
-            })
-        })
-        .await;
+        tx.send(Event::Tick).await.unwrap();
+        // Wait for at least one procedure to be submitted.
+        timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
+            .await
+            .unwrap();
     }
 }
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index af92b8aa27..631f44caf1 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -124,7 +124,7 @@ pub struct MitoRegion {
     /// The topic's latest entry id since the region's last flushing.
     /// **Only used for remote WAL pruning.**
     ///
-    /// The value will be updated to the high watermark of the topic
+    /// The value will be updated to the latest offset of the topic
     /// if region receives a flush request or schedules a periodic flush task
     /// and the region's memtable is empty.
     ///
diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs
index 036fbc139e..7939ababa6 100644
--- a/src/mito2/src/wal/raw_entry_reader.rs
+++ b/src/mito2/src/wal/raw_entry_reader.rs
@@ -197,7 +197,7 @@ mod tests {
             unreachable!()
         }
 
-        fn high_watermark(&self, _provider: &Provider) -> Result<EntryId, Self::Error> {
+        fn latest_entry_id(&self, _provider: &Provider) -> Result<EntryId, Self::Error> {
             unreachable!()
         }
     }
diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs
index 310d18f6dd..44c5f811ef 100644
--- a/src/mito2/src/worker/handle_catchup.rs
+++ b/src/mito2/src/worker/handle_catchup.rs
@@ -102,22 +102,22 @@ impl RegionWorkerLoop {
         let version = region.version_control.current();
         let mut flushed_entry_id = version.last_entry_id;
 
-        let high_watermark = self
+        let latest_entry_id = self
             .wal
             .store()
-            .high_watermark(&region.provider)
+            .latest_entry_id(&region.provider)
             .unwrap_or_default();
         warn!(
-            "Skips to replay memtable for region: {}, flushed entry id: {}, high watermark: {}",
-            region.region_id, flushed_entry_id, high_watermark
+            "Skipping memtable replay for region: {}, flushed entry id: {}, latest entry id: {}",
+            region.region_id, flushed_entry_id, latest_entry_id
         );
-        if high_watermark > flushed_entry_id {
+        if latest_entry_id > flushed_entry_id {
             warn!(
-                "Found high watermark is greater than flushed entry id, using high watermark as flushed entry id, region: {}, high watermark: {}, flushed entry id: {}",
-                region_id, high_watermark, flushed_entry_id
+                "Latest entry id is greater than flushed entry id; using it as the flushed entry id, region: {}, latest entry id: {}, flushed entry id: {}",
+                region_id, latest_entry_id, flushed_entry_id
             );
-            flushed_entry_id = high_watermark;
+            flushed_entry_id = latest_entry_id;
             region.version_control.set_entry_id(flushed_entry_id);
         }
         let on_region_opened = self.wal.on_region_opened();
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 7d471c6517..7f7d5e3e15 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -17,7 +17,7 @@ use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
-use common_telemetry::{error, info};
+use common_telemetry::{debug, error, info};
 use store_api::logstore::LogStore;
 use store_api::region_request::RegionFlushRequest;
 use store_api::storage::RegionId;
@@ -259,20 +259,20 @@ impl RegionWorkerLoop {
     /// **This is only used for remote WAL pruning.**
     pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) {
         if region.provider.is_remote_wal() && region.version().memtables.is_empty() {
-            let high_watermark = self
+            let latest_offset = self
                 .wal
                 .store()
-                .high_watermark(&region.provider)
+                .latest_entry_id(&region.provider)
                 .unwrap_or(0);
             let topic_last_entry_id = region.topic_latest_entry_id.load(Ordering::Relaxed);
 
-            if high_watermark != 0 && high_watermark > topic_last_entry_id {
+            if latest_offset > topic_last_entry_id {
                 region
                     .topic_latest_entry_id
-                    .store(high_watermark, Ordering::Relaxed);
-                info!(
-                    "Region {} high watermark updated to {}",
-                    region.region_id, high_watermark
+                    .store(latest_offset, Ordering::Relaxed);
+                debug!(
+                    "Region {} latest entry id updated to {}",
+                    region.region_id, latest_offset
                 );
             }
         }
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index ccfc272618..19bbe195f0 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -95,8 +95,8 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
         provider: &Provider,
     ) -> Result<Entry, Self::Error>;
 
-    /// Returns the highest existing entry id in the log store.
-    fn high_watermark(&self, provider: &Provider) -> Result<EntryId, Self::Error>;
+    /// Returns the latest entry id in the log store.
+    fn latest_entry_id(&self, provider: &Provider) -> Result<EntryId, Self::Error>;
 }
 
 /// The response of an `append` operation.
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index a0d9459286..cbd2c58728 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -89,6 +89,7 @@ uuid.workspace = true
 zstd.workspace = true
 
 [dev-dependencies]
+common-wal = { workspace = true, features = ["testing"] }
 datafusion.workspace = true
 datafusion-expr.workspace = true
 hex.workspace = true
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index be8dcda090..7f61fce7e6 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -26,14 +26,12 @@ use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
 use common_runtime::runtime::BuilderBuild;
 use common_runtime::{Builder as RuntimeBuilder, Runtime};
-use common_telemetry::warn;
 use common_test_util::ports;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
 use common_wal::config::DatanodeWalConfig;
 use datanode::config::{DatanodeOptions, StorageConfig};
 use frontend::instance::Instance;
 use frontend::service_config::{MysqlOptions, PostgresOptions};
-use futures::future::BoxFuture;
 use object_store::config::{
     AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
 };
@@ -732,22 +730,3 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
         .await
         .unwrap();
 }
-
-pub async fn run_test_with_kafka_wal<F>(test: F)
-where
-    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
-{
-    let _ = dotenv::dotenv();
-    let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
-    if endpoints.is_empty() {
-        warn!("The endpoints is empty, skipping the test");
-        return;
-    }
-
-    let endpoints = endpoints
-        .split(',')
-        .map(|s| s.trim().to_string())
-        .collect::<Vec<_>>();
-
-    test(endpoints).await
-}
diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs
index 482834fdcd..eb2814e732 100644
--- a/tests-integration/tests/region_migration.rs
+++ b/tests-integration/tests/region_migration.rs
@@ -77,10 +77,9 @@ macro_rules! region_migration_test {
             let store_type = tests_integration::test_util::StorageType::$service;
             if store_type.test_on() {
                 common_telemetry::init_default_ut_logging();
-                tests_integration::test_util::run_test_with_kafka_wal(|endpoints| {
-                    Box::pin(async move { $crate::region_migration::$test(store_type, endpoints).await })
-                })
-                .await
+                common_wal::maybe_skip_kafka_integration_test!();
+                let endpoints = common_wal::test_util::get_kafka_endpoints();
+                $crate::region_migration::$test(store_type, endpoints).await
             }
         }
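
For test authors migrating off the removed `run_test_with_kafka_wal` helper, a minimal sketch of the pattern this patch adopts is shown below. It assumes the `testing` feature of `common-wal` is enabled (as added to tests-integration's dev-dependencies above) and that, like the removed helper, the endpoints come from the `GT_KAFKA_ENDPOINTS` environment variable; the test name and body are hypothetical.

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;

    #[tokio::test]
    async fn test_something_with_kafka_wal() {
        // Returns early (skipping the test) when GT_KAFKA_ENDPOINTS is not set.
        maybe_skip_kafka_integration_test!();
        // Comma-separated broker endpoints parsed from the environment,
        // e.g. GT_KAFKA_ENDPOINTS=127.0.0.1:9092.
        let broker_endpoints = get_kafka_endpoints();
        assert!(!broker_endpoints.is_empty());
        // ... exercise the Kafka-backed WAL against `broker_endpoints` ...
    }

Compared with the old closure-based helper, the skip decision is made inline by the macro, so the test body no longer needs to be boxed into a `BoxFuture` and reads as a flat async test.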