From 41814bb49f4e568dc6ac1acb2a00a8a967dca033 Mon Sep 17 00:00:00 2001 From: Yuhan Wang Date: Fri, 18 Apr 2025 20:10:47 +0800 Subject: [PATCH] feat: introduce `high_watermark` for remote wal logstore (#5877) * feat: introduce high_watermark_since_flush * test: add unit test for high watermark * refactor: submit a request instead * fix: send reply before submit request * fix: no need to update twice * feat: update high watermark in background periodically * test: update unit tests * fix: update high watermark periodically * test: update unit tests * chore: apply review comments * chore: rename * chore: apply review comments * chore: clean up * chore: apply review comments --- Cargo.lock | 1 + src/common/meta/src/region_registry.rs | 1 - src/log-store/Cargo.toml | 1 + src/log-store/src/kafka.rs | 3 + src/log-store/src/kafka/client_manager.rs | 66 ++------ .../src/kafka/high_watermark_manager.rs | 131 +++++++++++++++ src/log-store/src/kafka/log_store.rs | 61 ++++++- src/log-store/src/kafka/producer.rs | 32 ++-- src/log-store/src/kafka/test_util.rs | 88 ++++++++++ src/log-store/src/kafka/worker.rs | 8 + .../src/kafka/worker/update_high_watermark.rs | 59 +++++++ src/log-store/src/raft_engine/log_store.rs | 12 ++ src/metric-engine/src/utils.rs | 2 + src/mito2/src/engine/flush_test.rs | 70 +++++++- src/mito2/src/region.rs | 16 +- src/mito2/src/region/opener.rs | 2 + src/mito2/src/wal/raw_entry_reader.rs | 4 + src/mito2/src/worker/handle_flush.rs | 150 +++++++++++------- src/store-api/src/logstore.rs | 3 + src/store-api/src/region_engine.rs | 7 + 20 files changed, 589 insertions(+), 128 deletions(-) create mode 100644 src/log-store/src/kafka/high_watermark_manager.rs create mode 100644 src/log-store/src/kafka/test_util.rs create mode 100644 src/log-store/src/kafka/worker/update_high_watermark.rs diff --git a/Cargo.lock b/Cargo.lock index 984978a1c4..520b33c929 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6416,6 +6416,7 @@ dependencies = [ "common-test-util", "common-time", 
"common-wal", + "dashmap", "delta-encoding", "derive_builder 0.20.1", "futures", diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index 76fb271f52..344e0fef5b 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -97,7 +97,6 @@ impl LeaderRegionManifestInfo { } /// Returns the minimum flushed entry id of the leader region. - /// It is used to determine the minimum flushed entry id that can be pruned in remote wal. pub fn min_flushed_entry_id(&self) -> u64 { match self { LeaderRegionManifestInfo::Mito { diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 31571afe3e..354dee6102 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -26,6 +26,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true +dashmap.workspace = true delta-encoding = "0.4" derive_builder.workspace = true futures.workspace = true diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 1d765cf90b..452c88164a 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -14,9 +14,12 @@ pub(crate) mod client_manager; pub(crate) mod consumer; +mod high_watermark_manager; pub(crate) mod index; pub mod log_store; pub(crate) mod producer; +#[cfg(test)] +pub(crate) mod test_util; pub(crate) mod util; pub(crate) mod worker; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 9317cc938b..65599cb09e 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG; use common_wal::config::kafka::DatanodeKafkaConfig; +use dashmap::DashMap; use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling}; use rskafka::client::ClientBuilder; use snafu::ResultExt; @@ -64,6 +65,9 
@@ pub(crate) struct ClientManager { flush_batch_size: usize, compression: Compression, + + /// High watermark for each topic. + high_watermark: Arc, u64>>, } impl ClientManager { @@ -71,6 +75,7 @@ impl ClientManager { pub(crate) async fn try_new( config: &DatanodeKafkaConfig, global_index_collector: Option, + high_watermark: Arc, u64>>, ) -> Result { // Sets backoff config for the top-level kafka client and all clients constructed by it. let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints) @@ -96,6 +101,7 @@ impl ClientManager { flush_batch_size: config.max_batch_bytes.as_bytes() as usize, compression: Compression::Lz4, global_index_collector, + high_watermark, }) } @@ -111,6 +117,7 @@ impl ClientManager { .write() .await .insert(provider.clone(), client.clone()); + self.high_watermark.insert(provider.clone(), 0); Ok(client) } } @@ -159,6 +166,7 @@ impl ClientManager { self.compression, self.flush_batch_size, index_collector, + self.high_watermark.clone(), )); Ok(Client { client, producer }) @@ -167,66 +175,20 @@ impl ClientManager { pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> { self.global_index_collector.as_ref() } + + #[cfg(test)] + pub(crate) fn high_watermark(&self) -> &Arc, u64>> { + &self.high_watermark + } } #[cfg(test)] mod tests { - use common_wal::config::kafka::common::KafkaConnectionConfig; use common_wal::test_util::run_test_with_kafka_wal; use tokio::sync::Barrier; use super::*; - - /// Creates `num_topics` number of topics each will be decorated by the given decorator. 
- pub async fn create_topics( - num_topics: usize, - decorator: F, - broker_endpoints: &[String], - ) -> Vec - where - F: Fn(usize) -> String, - { - assert!(!broker_endpoints.is_empty()); - let client = ClientBuilder::new(broker_endpoints.to_vec()) - .build() - .await - .unwrap(); - let ctrl_client = client.controller_client().unwrap(); - let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics) - .map(|i| { - let topic = decorator(i); - let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500); - (topic, task) - }) - .unzip(); - futures::future::try_join_all(tasks).await.unwrap(); - topics - } - - /// Prepares for a test in that a collection of topics and a client manager are created. - async fn prepare( - test_name: &str, - num_topics: usize, - broker_endpoints: Vec, - ) -> (ClientManager, Vec) { - let topics = create_topics( - num_topics, - |i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()), - &broker_endpoints, - ) - .await; - - let config = DatanodeKafkaConfig { - connection: KafkaConnectionConfig { - broker_endpoints, - ..Default::default() - }, - ..Default::default() - }; - let manager = ClientManager::try_new(&config, None).await.unwrap(); - - (manager, topics) - } + use crate::kafka::test_util::prepare; /// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly. #[tokio::test] diff --git a/src/log-store/src/kafka/high_watermark_manager.rs b/src/log-store/src/kafka/high_watermark_manager.rs new file mode 100644 index 0000000000..8a4c2a1252 --- /dev/null +++ b/src/log-store/src/kafka/high_watermark_manager.rs @@ -0,0 +1,131 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use common_telemetry::error; +use dashmap::DashMap; +use store_api::logstore::provider::KafkaProvider; +use tokio::time::{interval, MissedTickBehavior}; + +use crate::error::Result; +use crate::kafka::client_manager::ClientManagerRef; + +/// HighWatermarkManager is responsible for periodically updating the high watermark +/// (latest existing record offset) for each Kafka topic. +pub(crate) struct HighWatermarkManager { + /// Interval to update high watermark. + update_interval: Duration, + /// The high watermark for each topic. + high_watermark: Arc, u64>>, + /// Client manager to send requests. + client_manager: ClientManagerRef, +} + +impl HighWatermarkManager { + pub(crate) fn new( + update_interval: Duration, + high_watermark: Arc, u64>>, + client_manager: ClientManagerRef, + ) -> Self { + Self { + update_interval, + high_watermark, + client_manager, + } + } + + /// Starts the high watermark manager as a background task + /// + /// This spawns a task that periodically queries Kafka for the latest + /// high watermark values for all registered topics and updates the shared map. 
+ pub(crate) async fn run(self) { + common_runtime::spawn_global(async move { + let mut interval = interval(self.update_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + interval.tick().await; + if let Err(e) = self.try_update().await { + error!(e; "Failed to update high watermark"); + } + } + }); + } + + /// Attempts to update the high watermark for all registered topics + /// + /// Iterates through all topics in the high watermark map, obtains a producer + /// for each topic, and requests an update of the high watermark value. + pub(crate) async fn try_update(&self) -> Result<()> { + for iterator_element in self.high_watermark.iter() { + let producer = self + .client_manager + .get_or_insert(iterator_element.key()) + .await? + .producer() + .clone(); + producer.update_high_watermark().await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use common_wal::test_util::run_test_with_kafka_wal; + use store_api::storage::RegionId; + + use super::*; + use crate::kafka::test_util::{prepare, record}; + + #[tokio::test] + async fn test_try_update_high_watermark() { + run_test_with_kafka_wal(|broker_endpoints| { + Box::pin(async { + let (manager, topics) = + prepare("test_try_update_high_watermark", 1, broker_endpoints).await; + let manager = Arc::new(manager); + let high_watermark_manager = HighWatermarkManager::new( + Duration::from_millis(100), + manager.high_watermark().clone(), + manager.clone(), + ); + let high_watermark = high_watermark_manager.high_watermark.clone(); + high_watermark_manager.run().await; + + let topic = topics[0].clone(); + let provider = Arc::new(KafkaProvider::new(topic.to_string())); + let producer = manager + .get_or_insert(&provider) + .await + .unwrap() + .producer() + .clone(); + + tokio::time::sleep(Duration::from_millis(150)).await; + let current_high_watermark = *high_watermark.get(&provider).unwrap(); + assert_eq!(current_high_watermark, 0); + + let record = vec![record()]; + let region = 
RegionId::new(1, 1); + producer.produce(region, record.clone()).await.unwrap(); + tokio::time::sleep(Duration::from_millis(150)).await; + let current_high_watermark = *high_watermark.get(&provider).unwrap(); + assert_eq!(current_high_watermark, record.len() as u64); + }) + }) + .await + } +} diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 3cc728a998..66ad613bbc 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -18,6 +18,7 @@ use std::time::Duration; use common_telemetry::{debug, warn}; use common_wal::config::kafka::DatanodeKafkaConfig; +use dashmap::DashMap; use futures::future::try_join_all; use futures_util::StreamExt; use rskafka::client::partition::OffsetAt; @@ -32,6 +33,7 @@ use store_api::storage::RegionId; use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result}; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer}; +use crate::kafka::high_watermark_manager::HighWatermarkManager; use crate::kafka::index::{ build_region_wal_index_iterator, GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE, }; @@ -41,6 +43,8 @@ use crate::kafka::util::record::{ }; use crate::metrics; +const DEFAULT_HIGH_WATERMARK_UPDATE_INTERVAL: Duration = Duration::from_secs(60); + /// A log store backed by Kafka. #[derive(Debug)] pub struct KafkaLogStore { @@ -52,6 +56,18 @@ pub struct KafkaLogStore { consumer_wait_timeout: Duration, /// Ignore missing entries during read WAL. overwrite_entry_start_id: bool, + /// High watermark for all topics. + /// + /// Represents the offset of the last record in each topic. This is used to track + /// the latest available data in Kafka topics. 
+ /// + /// The high watermark is updated in two ways: + /// - Automatically when the producer successfully commits data to Kafka + /// - Periodically by the [HighWatermarkManager](crate::kafka::high_watermark_manager::HighWatermarkManager). + /// + /// This shared map allows multiple components to access the latest high watermark + /// information without needing to query Kafka directly. + high_watermark: Arc, u64>>, } impl KafkaLogStore { @@ -60,14 +76,23 @@ impl KafkaLogStore { config: &DatanodeKafkaConfig, global_index_collector: Option, ) -> Result { - let client_manager = - Arc::new(ClientManager::try_new(config, global_index_collector).await?); + let high_watermark = Arc::new(DashMap::new()); + let client_manager = Arc::new( + ClientManager::try_new(config, global_index_collector, high_watermark.clone()).await?, + ); + let high_watermark_manager = HighWatermarkManager::new( + DEFAULT_HIGH_WATERMARK_UPDATE_INTERVAL, + high_watermark.clone(), + client_manager.clone(), + ); + high_watermark_manager.run().await; Ok(Self { client_manager, max_batch_bytes: config.max_batch_bytes.as_bytes() as usize, consumer_wait_timeout: config.consumer_wait_timeout, overwrite_entry_start_id: config.overwrite_entry_start_id, + high_watermark, }) } } @@ -158,6 +183,7 @@ impl LogStore for KafkaLogStore { .collect::>(); let mut region_grouped_records: HashMap)> = HashMap::with_capacity(region_ids.len()); + let mut region_to_provider = HashMap::with_capacity(region_ids.len()); for entry in entries { let provider = entry.provider().as_kafka_provider().with_context(|| { error::InvalidProviderSnafu { @@ -165,6 +191,7 @@ impl LogStore for KafkaLogStore { actual: entry.provider().type_name(), } })?; + region_to_provider.insert(entry.region_id(), provider.clone()); let region_id = entry.region_id(); match region_grouped_records.entry(region_id) { std::collections::hash_map::Entry::Occupied(mut slot) => { @@ -199,6 +226,13 @@ impl LogStore for KafkaLogStore { )) .await?; + // Updates the 
high watermark offset of the last record in the topic. + for (region_id, offset) in ®ion_grouped_max_offset { + // Safety: `region_id` is always valid. + let provider = region_to_provider.get(region_id).unwrap(); + self.high_watermark.insert(provider.clone(), *offset); + } + Ok(AppendBatchResponse { last_entry_ids: region_grouped_max_offset.into_iter().collect(), }) @@ -383,6 +417,25 @@ impl LogStore for KafkaLogStore { Ok(()) } + /// Returns the highest entry id of the specified topic in remote WAL. + fn high_watermark(&self, provider: &Provider) -> Result { + let provider = provider + .as_kafka_provider() + .with_context(|| InvalidProviderSnafu { + expected: KafkaProvider::type_name(), + actual: provider.type_name(), + })?; + + let high_watermark = self + .high_watermark + .get(provider) + .as_deref() + .copied() + .unwrap_or(0); + + Ok(high_watermark) + } + /// Stops components of the logstore. async fn stop(&self) -> Result<()> { Ok(()) @@ -567,6 +620,8 @@ mod tests { .for_each(|entry| entry.set_entry_id(0)); assert_eq!(expected_entries, actual_entries); } + let high_wathermark = logstore.high_watermark(&provider).unwrap(); + assert_eq!(high_wathermark, 99); } #[tokio::test] @@ -640,5 +695,7 @@ mod tests { .for_each(|entry| entry.set_entry_id(0)); assert_eq!(expected_entries, actual_entries); } + let high_wathermark = logstore.high_watermark(&provider).unwrap(); + assert_eq!(high_wathermark, (data_size_kb as u64 / 8 + 1) * 20 * 5 - 1); } } diff --git a/src/log-store/src/kafka/producer.rs b/src/log-store/src/kafka/producer.rs index 910465ba61..6f89c75e7a 100644 --- a/src/log-store/src/kafka/producer.rs +++ b/src/log-store/src/kafka/producer.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use common_telemetry::warn; +use dashmap::DashMap; use rskafka::client::partition::{Compression, OffsetAt, PartitionClient}; use rskafka::record::Record; use store_api::logstore::provider::KafkaProvider; @@ -56,6 +57,7 @@ impl OrderedBatchProducer { compression: Compression, 
max_batch_bytes: usize, index_collector: Box, + high_watermark: Arc, u64>>, ) -> Self { let mut worker = BackgroundProducerWorker { provider, @@ -65,6 +67,7 @@ impl OrderedBatchProducer { request_batch_size: REQUEST_BATCH_SIZE, max_batch_bytes, index_collector, + high_watermark, }; tokio::spawn(async move { worker.run().await }); Self { sender: tx } @@ -90,6 +93,21 @@ impl OrderedBatchProducer { Ok(handle) } + + /// Sends an [WorkerRequest::UpdateHighWatermark] request to the producer. + /// This is used to update the high watermark for the topic. + pub(crate) async fn update_high_watermark(&self) -> Result<()> { + if self + .sender + .send(WorkerRequest::UpdateHighWatermark) + .await + .is_err() + { + warn!("OrderedBatchProducer is already exited"); + return error::OrderedBatchProducerStoppedSnafu {}.fail(); + } + Ok(()) + } } #[async_trait::async_trait] @@ -135,7 +153,6 @@ mod tests { use std::sync::{Arc, Mutex}; use std::time::Duration; - use chrono::{TimeZone, Utc}; use common_base::readable_size::ReadableSize; use common_telemetry::debug; use futures::stream::FuturesUnordered; @@ -149,6 +166,7 @@ mod tests { use super::*; use crate::kafka::index::NoopCollector; use crate::kafka::producer::OrderedBatchProducer; + use crate::kafka::test_util::record; #[derive(Debug)] struct MockClient { @@ -196,15 +214,6 @@ mod tests { } } - fn record() -> Record { - Record { - key: Some(vec![0; 4]), - value: Some(vec![0; 6]), - headers: Default::default(), - timestamp: Utc.timestamp_millis_opt(320).unwrap(), - } - } - #[tokio::test] async fn test_producer() { common_telemetry::init_default_ut_logging(); @@ -224,6 +233,7 @@ mod tests { Compression::NoCompression, ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, Box::new(NoopCollector), + Arc::new(DashMap::new()), ); let region_id = RegionId::new(1, 1); @@ -272,6 +282,7 @@ mod tests { Compression::NoCompression, ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, 
Box::new(NoopCollector), + Arc::new(DashMap::new()), ); let region_id = RegionId::new(1, 1); @@ -324,6 +335,7 @@ mod tests { Compression::NoCompression, ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, Box::new(NoopCollector), + Arc::new(DashMap::new()), ); let region_id = RegionId::new(1, 1); diff --git a/src/log-store/src/kafka/test_util.rs b/src/log-store/src/kafka/test_util.rs new file mode 100644 index 0000000000..c83e0ef00e --- /dev/null +++ b/src/log-store/src/kafka/test_util.rs @@ -0,0 +1,88 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chrono::{TimeZone, Utc}; +use common_wal::config::kafka::common::KafkaConnectionConfig; +use common_wal::config::kafka::DatanodeKafkaConfig; +use dashmap::DashMap; +use rskafka::client::ClientBuilder; +use rskafka::record::Record; + +use crate::kafka::client_manager::ClientManager; + +/// Creates `num_topics` number of topics each will be decorated by the given decorator. 
+pub(crate) async fn create_topics( + num_topics: usize, + decorator: F, + broker_endpoints: &[String], +) -> Vec +where + F: Fn(usize) -> String, +{ + assert!(!broker_endpoints.is_empty()); + let client = ClientBuilder::new(broker_endpoints.to_vec()) + .build() + .await + .unwrap(); + let ctrl_client = client.controller_client().unwrap(); + let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics) + .map(|i| { + let topic = decorator(i); + let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500); + (topic, task) + }) + .unzip(); + futures::future::try_join_all(tasks).await.unwrap(); + topics +} + +/// Prepares for a test in that a collection of topics and a client manager are created. +pub(crate) async fn prepare( + test_name: &str, + num_topics: usize, + broker_endpoints: Vec, +) -> (ClientManager, Vec) { + let topics = create_topics( + num_topics, + |i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()), + &broker_endpoints, + ) + .await; + + let config = DatanodeKafkaConfig { + connection: KafkaConnectionConfig { + broker_endpoints, + ..Default::default() + }, + ..Default::default() + }; + let high_watermark = Arc::new(DashMap::new()); + let manager = ClientManager::try_new(&config, None, high_watermark) + .await + .unwrap(); + + (manager, topics) +} + +/// Generate a record to produce. 
+pub fn record() -> Record { + Record { + key: Some(vec![0; 4]), + value: Some(vec![0; 6]), + headers: Default::default(), + timestamp: Utc.timestamp_millis_opt(320).unwrap(), + } +} diff --git a/src/log-store/src/kafka/worker.rs b/src/log-store/src/kafka/worker.rs index b05351d172..372d8c5567 100644 --- a/src/log-store/src/kafka/worker.rs +++ b/src/log-store/src/kafka/worker.rs @@ -15,10 +15,12 @@ pub(crate) mod dump_index; pub(crate) mod flush; pub(crate) mod produce; +pub(crate) mod update_high_watermark; use std::sync::Arc; use common_telemetry::debug; +use dashmap::DashMap; use futures::future::try_join_all; use rskafka::client::partition::Compression; use rskafka::record::Record; @@ -37,6 +39,7 @@ pub(crate) enum WorkerRequest { Produce(ProduceRequest), TruncateIndex(TruncateIndexRequest), DumpIndex(DumpIndexRequest), + UpdateHighWatermark, } impl WorkerRequest { @@ -157,6 +160,8 @@ pub(crate) struct BackgroundProducerWorker { pub(crate) max_batch_bytes: usize, /// Collecting ids of WAL entries. pub(crate) index_collector: Box, + /// High watermark for all topics. + pub(crate) high_watermark: Arc, u64>>, } impl BackgroundProducerWorker { @@ -194,6 +199,9 @@ impl BackgroundProducerWorker { entry_id, }) => self.index_collector.truncate(region_id, entry_id), WorkerRequest::DumpIndex(req) => self.dump_index(req).await, + WorkerRequest::UpdateHighWatermark => { + self.update_high_watermark().await; + } } } diff --git a/src/log-store/src/kafka/worker/update_high_watermark.rs b/src/log-store/src/kafka/worker/update_high_watermark.rs new file mode 100644 index 0000000000..8404086418 --- /dev/null +++ b/src/log-store/src/kafka/worker/update_high_watermark.rs @@ -0,0 +1,59 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::{debug, error}; +use rskafka::client::partition::OffsetAt; +use snafu::ResultExt; + +use crate::error; +use crate::kafka::worker::BackgroundProducerWorker; + +impl BackgroundProducerWorker { + /// Updates the high watermark for the topic. + /// + /// This function retrieves the latest offset from Kafka and updates the high watermark + /// in the shared map. + pub async fn update_high_watermark(&mut self) { + match self + .client + .get_offset(OffsetAt::Latest) + .await + .context(error::GetOffsetSnafu { + topic: &self.provider.topic, + }) { + Ok(offset) => match self.high_watermark.entry(self.provider.clone()) { + dashmap::Entry::Occupied(mut occupied_entry) => { + let offset = offset as u64; + if *occupied_entry.get() != offset { + occupied_entry.insert(offset); + debug!( + "Updated high watermark for topic {} to {}", + self.provider.topic, offset + ); + } + } + dashmap::Entry::Vacant(vacant_entry) => { + vacant_entry.insert(offset as u64); + debug!( + "Inserted high watermark for topic {} to {}", + self.provider.topic, offset + ); + } + }, + Err(err) => { + error!(err; "Failed to get offset for topic {}", self.provider.topic); + } + } + } +} diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index c7df8be66c..a060c9a976 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -483,6 +483,18 @@ impl LogStore for RaftEngineLogStore { ); Ok(()) } + + fn high_watermark(&self, provider: &Provider) -> Result { + let ns = 
provider + .as_raft_engine_provider() + .with_context(|| InvalidProviderSnafu { + expected: RaftEngineProvider::type_name(), + actual: provider.type_name(), + })?; + let namespace_id = ns.id; + let last_index = self.engine.last_index(namespace_id).unwrap_or(0); + Ok(last_index) + } } #[derive(Debug, Clone)] diff --git a/src/metric-engine/src/utils.rs b/src/metric-engine/src/utils.rs index ad0695f54a..0f28a6365e 100644 --- a/src/metric-engine/src/utils.rs +++ b/src/metric-engine/src/utils.rs @@ -61,6 +61,8 @@ pub fn get_region_statistic(mito: &MitoEngine, region_id: RegionId) -> Option { warn!( diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 25bba7e085..1d836da733 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -25,7 +25,7 @@ use common_wal::options::WAL_OPTIONS_KEY; use rstest::rstest; use rstest_reuse::{self, apply}; use store_api::region_engine::RegionEngine; -use store_api::region_request::RegionRequest; +use store_api::region_request::{RegionFlushRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; @@ -33,8 +33,8 @@ use crate::engine::listener::{FlushListener, StallListener}; use crate::test_util::{ build_rows, build_rows_for_key, flush_region, kafka_log_store_factory, multiple_log_store_factories, prepare_test_for_kafka_log_store, put_rows, - raft_engine_log_store_factory, reopen_region, rows_schema, CreateRequestBuilder, - LogStoreFactory, MockWriteBufferManager, TestEnv, + raft_engine_log_store_factory, reopen_region, rows_schema, single_kafka_log_store_factory, + CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv, }; use crate::time_provider::TimeProvider; use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS; @@ -544,3 +544,67 @@ async fn test_flush_workers() { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[apply(single_kafka_log_store_factory)] 
+async fn test_update_topic_latest_entry_id(factory: Option) { + common_telemetry::init_default_ut_logging(); + let Some(factory) = factory else { + return; + }; + let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); + let listener = Arc::new(FlushListener::default()); + + let mut env = TestEnv::new().with_log_store_factory(factory.clone()); + let engine = env + .create_engine_with( + MitoConfig::default(), + Some(write_buffer_manager.clone()), + Some(listener.clone()), + ) + .await; + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let topic = prepare_test_for_kafka_log_store(&factory).await; + let request = CreateRequestBuilder::new() + .kafka_topic(topic.clone()) + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert_eq!(region.topic_latest_entry_id.load(Ordering::Relaxed), 0); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows.clone()).await; + + let request = RegionFlushRequest::default(); + engine + .handle_request(region_id, RegionRequest::Flush(request.clone())) + .await + .unwrap(); + // Wait until flush is finished. 
+ listener.wait().await; + assert_eq!(region.topic_latest_entry_id.load(Ordering::Relaxed), 0); + + engine + .handle_request(region_id, RegionRequest::Flush(request.clone())) + .await + .unwrap(); + assert_eq!(region.topic_latest_entry_id.load(Ordering::Relaxed), 1); +} diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 191300a076..4ad3ea8e2d 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -119,6 +119,16 @@ pub(crate) struct MitoRegion { last_compaction_millis: AtomicI64, /// Provider to get current time. time_provider: TimeProviderRef, + /// The topic's latest entry id since the region's last flushing. + /// **Only used for remote WAL pruning.** + /// + /// The value will be updated to the high watermark of the topic + /// if region receives a flush request or schedules a periodic flush task + /// and the region's memtable is empty. + /// + /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region, + /// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`. + pub(crate) topic_latest_entry_id: AtomicU64, /// Memtable builder for the region. 
pub(crate) memtable_builder: MemtableBuilderRef, /// manifest stats @@ -287,12 +297,14 @@ impl MitoRegion { let sst_usage = version.ssts.sst_usage(); let index_usage = version.ssts.index_usage(); + let flushed_entry_id = version.flushed_entry_id; let wal_usage = self.estimated_wal_usage(memtable_usage); let manifest_usage = self.stats.total_manifest_size(); let num_rows = version.ssts.num_rows() + version.memtables.num_rows(); let manifest_version = self.stats.manifest_version(); - let flushed_entry_id = version.flushed_entry_id; + + let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed); RegionStatistic { num_rows, @@ -305,6 +317,8 @@ impl MitoRegion { manifest_version, flushed_entry_id, }, + data_topic_latest_entry_id: topic_latest_entry_id, + metadata_topic_latest_entry_id: topic_latest_entry_id, } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index b7c4803b54..5a1bf86c18 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -274,6 +274,7 @@ impl RegionOpener { last_flush_millis: AtomicI64::new(now), last_compaction_millis: AtomicI64::new(now), time_provider: self.time_provider.clone(), + topic_latest_entry_id: AtomicU64::new(0), memtable_builder, stats: self.stats, }) @@ -452,6 +453,7 @@ impl RegionOpener { last_flush_millis: AtomicI64::new(now), last_compaction_millis: AtomicI64::new(now), time_provider: self.time_provider.clone(), + topic_latest_entry_id: AtomicU64::new(0), memtable_builder, stats: self.stats.clone(), }; diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index 85a0c945b9..9a86c9ebfc 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -196,6 +196,10 @@ mod tests { ) -> Result { unreachable!() } + + fn high_watermark(&self, _provider: &Provider) -> Result { + unreachable!() + } } #[tokio::test] diff --git a/src/mito2/src/worker/handle_flush.rs 
b/src/mito2/src/worker/handle_flush.rs index e846809a5d..dc5a58d794 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -14,6 +14,7 @@ //! Handling flush related requests. +use std::sync::atomic::Ordering; use std::sync::Arc; use common_telemetry::{error, info}; @@ -29,34 +30,6 @@ use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - /// Handles manual flush request. - pub(crate) async fn handle_flush_request( - &mut self, - region_id: RegionId, - request: RegionFlushRequest, - mut sender: OptionOutputTx, - ) { - let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else { - return; - }; - - let reason = if region.is_downgrading() { - FlushReason::Downgrading - } else { - FlushReason::Manual - }; - - let mut task = - self.new_flush_task(&region, reason, request.row_group_size, self.config.clone()); - task.push_sender(sender); - if let Err(e) = - self.flush_scheduler - .schedule_flush(region.region_id, &region.version_control, task) - { - error!(e; "Failed to schedule flush task for region {}", region.region_id); - } - } - /// On region flush job failed. pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) { self.flush_scheduler.on_flush_failed(region_id, request.err); @@ -129,37 +102,6 @@ impl RegionWorkerLoop { Ok(()) } - /// Flushes regions periodically. - pub(crate) fn flush_periodically(&mut self) -> Result<()> { - let regions = self.regions.list_regions(); - let now = self.time_provider.current_time_millis(); - let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64; - - for region in &regions { - if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() { - // Already flushing or not writable. 
- continue; - } - - if region.last_flush_millis() < min_last_flush_time { - // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. - let task = self.new_flush_task( - region, - FlushReason::Periodically, - None, - self.config.clone(), - ); - self.flush_scheduler.schedule_flush( - region.region_id, - &region.version_control, - task, - )?; - } - } - - Ok(()) - } - /// Creates a flush task with specific `reason` for the `region`. pub(crate) fn new_flush_task( &self, @@ -185,6 +127,75 @@ impl RegionWorkerLoop { } impl RegionWorkerLoop { + /// Handles manual flush request. + pub(crate) async fn handle_flush_request( + &mut self, + region_id: RegionId, + request: RegionFlushRequest, + mut sender: OptionOutputTx, + ) { + let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else { + return; + }; + // `update_topic_latest_entry_id` updates `topic_latest_entry_id` when memtables are empty. + // But the flush is skipped if memtables are empty. Thus should update the `topic_latest_entry_id` + // when handling flush request instead of in `schedule_flush` or `flush_finished`. + self.update_topic_latest_entry_id(&region); + info!( + "Region {} flush request, high watermark: {}", + region_id, + region.topic_latest_entry_id.load(Ordering::Relaxed) + ); + + let reason = if region.is_downgrading() { + FlushReason::Downgrading + } else { + FlushReason::Manual + }; + + let mut task = + self.new_flush_task(&region, reason, request.row_group_size, self.config.clone()); + task.push_sender(sender); + if let Err(e) = + self.flush_scheduler + .schedule_flush(region.region_id, &region.version_control, task) + { + error!(e; "Failed to schedule flush task for region {}", region.region_id); + } + } + + /// Flushes regions periodically. 
+ pub(crate) fn flush_periodically(&mut self) -> Result<()> { + let regions = self.regions.list_regions(); + let now = self.time_provider.current_time_millis(); + let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64; + + for region in &regions { + if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() { + // Already flushing or not writable. + continue; + } + self.update_topic_latest_entry_id(region); + + if region.last_flush_millis() < min_last_flush_time { + // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. + let task = self.new_flush_task( + region, + FlushReason::Periodically, + None, + self.config.clone(), + ); + self.flush_scheduler.schedule_flush( + region.region_id, + &region.version_control, + task, + )?; + } + } + + Ok(()) + } + /// On region flush job finished. pub(crate) async fn handle_flush_finished( &mut self, @@ -247,4 +258,25 @@ impl RegionWorkerLoop { self.listener.on_flush_success(region_id); } + + /// Updates the latest entry id since flush of the region. + /// **This is only used for remote WAL pruning.** + pub(crate) fn update_topic_latest_entry_id(&mut self, region: &MitoRegionRef) { + if region.provider.is_remote_wal() && region.version().memtables.is_empty() { + let high_watermark = self + .wal + .store() + .high_watermark(&region.provider) + .unwrap_or(0); + if high_watermark != 0 { + region + .topic_latest_entry_id + .store(high_watermark, Ordering::Relaxed); + } + info!( + "Region {} high watermark updated to {}", + region.region_id, high_watermark + ); + } + } } diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 86a2263398..573fab469e 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -94,6 +94,9 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { region_id: RegionId, provider: &Provider, ) -> Result; + + /// Returns the highest existing entry id in the log store. 
+ fn high_watermark(&self, provider: &Provider) -> Result; } /// The response of an `append` operation. diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index a2db2d9f52..5f6069d961 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -451,6 +451,13 @@ pub struct RegionStatistic { /// The details of the region. #[serde(default)] pub manifest: RegionManifestInfo, + /// The latest entry id of the region's remote WAL since last flush. + /// For metric engine, there're two latest entry ids, one for data and one for metadata. + /// TODO(weny): remove these two fields and use a single one instead. + #[serde(default)] + pub data_topic_latest_entry_id: u64, + #[serde(default)] + pub metadata_topic_latest_entry_id: u64, } /// The manifest info of a region.