feat: introduce high_watermark for remote wal logstore (#5877)

* feat: introduce high_watermark_since_flush

* test: add unit test for high watermark

* refactor: submit a request instead

* fix: send reply before submit request

* fix: no need to update twice

* feat: update high watermark in background periodically

* test: update unit tests

* fix: update high watermark periodically

* test: update unit tests

* chore: apply review comments

* chore: rename

* chore: apply review comments

* chore: clean up

* chore: apply review comments
Yuhan Wang
2025-04-18 20:10:47 +08:00
committed by GitHub
parent 1e394af583
commit 41814bb49f
20 changed files with 589 additions and 128 deletions
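At a glance, the change threads one shared map of per-topic offsets through the Kafka log store: the producer worker and a new background HighWatermarkManager write into it, and the new LogStore::high_watermark accessor reads from it without querying Kafka. A minimal sketch of that shared state, using the types and the lookup pattern from the diff below (the surrounding setup is assumed):

    use std::sync::Arc;
    use dashmap::DashMap;
    use store_api::logstore::provider::KafkaProvider;

    // Shared state introduced by this commit: one entry per topic, holding the
    // offset of the last record known to exist in Kafka.
    let high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>> = Arc::new(DashMap::new());

    // Writers: the producer worker (after a successful produce) and the periodic
    // HighWatermarkManager. Readers: KafkaLogStore::high_watermark, which only
    // consults this map and never hits Kafka directly.
    let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
    let offset = high_watermark.get(&provider).as_deref().copied().unwrap_or(0);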

View File

@@ -26,6 +26,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
dashmap.workspace = true
delta-encoding = "0.4"
derive_builder.workspace = true
futures.workspace = true

View File

@@ -14,9 +14,12 @@
pub(crate) mod client_manager;
pub(crate) mod consumer;
mod high_watermark_manager;
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
#[cfg(test)]
pub(crate) mod test_util;
pub(crate) mod util;
pub(crate) mod worker;

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
use rskafka::client::ClientBuilder;
use snafu::ResultExt;
@@ -64,6 +65,9 @@ pub(crate) struct ClientManager {
flush_batch_size: usize,
compression: Compression,
/// High watermark for each topic.
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
}
impl ClientManager {
@@ -71,6 +75,7 @@ impl ClientManager {
pub(crate) async fn try_new(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
@@ -96,6 +101,7 @@ impl ClientManager {
flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
compression: Compression::Lz4,
global_index_collector,
high_watermark,
})
}
@@ -111,6 +117,7 @@ impl ClientManager {
.write()
.await
.insert(provider.clone(), client.clone());
self.high_watermark.insert(provider.clone(), 0);
Ok(client)
}
}
@@ -159,6 +166,7 @@ impl ClientManager {
self.compression,
self.flush_batch_size,
index_collector,
self.high_watermark.clone(),
));
Ok(Client { client, producer })
@@ -167,66 +175,20 @@ impl ClientManager {
pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
self.global_index_collector.as_ref()
}
#[cfg(test)]
pub(crate) fn high_watermark(&self) -> &Arc<DashMap<Arc<KafkaProvider>, u64>> {
&self.high_watermark
}
}
#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaConnectionConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use tokio::sync::Barrier;
use super::*;
/// Creates `num_topics` number of topics each will be decorated by the given decorator.
pub async fn create_topics<F>(
num_topics: usize,
decorator: F,
broker_endpoints: &[String],
) -> Vec<String>
where
F: Fn(usize) -> String,
{
assert!(!broker_endpoints.is_empty());
let client = ClientBuilder::new(broker_endpoints.to_vec())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();
let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
.map(|i| {
let topic = decorator(i);
let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
(topic, task)
})
.unzip();
futures::future::try_join_all(tasks).await.unwrap();
topics
}
/// Prepares for a test in that a collection of topics and a client manager are created.
async fn prepare(
test_name: &str,
num_topics: usize,
broker_endpoints: Vec<String>,
) -> (ClientManager, Vec<String>) {
let topics = create_topics(
num_topics,
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
&broker_endpoints,
)
.await;
let config = DatanodeKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
..Default::default()
};
let manager = ClientManager::try_new(&config, None).await.unwrap();
(manager, topics)
}
use crate::kafka::test_util::prepare;
/// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
#[tokio::test]

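Construction-wise, ClientManager::try_new now takes the shared map from its caller, and every topic registered through get_or_insert starts at watermark 0. A hedged sketch mirroring the updated test helper later in this diff (config is assumed to be a DatanodeKafkaConfig built by the caller):

    let high_watermark = Arc::new(DashMap::new());
    let manager = ClientManager::try_new(&config, None, high_watermark.clone()).await?;
    // After manager.get_or_insert(&provider), the map holds (provider, 0) until the
    // producer or the HighWatermarkManager observes a real offset.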
View File

@@ -0,0 +1,131 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::error;
use dashmap::DashMap;
use store_api::logstore::provider::KafkaProvider;
use tokio::time::{interval, MissedTickBehavior};
use crate::error::Result;
use crate::kafka::client_manager::ClientManagerRef;
/// HighWatermarkManager is responsible for periodically updating the high watermark
/// (latest existing record offset) for each Kafka topic.
pub(crate) struct HighWatermarkManager {
/// Interval to update high watermark.
update_interval: Duration,
/// The high watermark for each topic.
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
/// Client manager to send requests.
client_manager: ClientManagerRef,
}
impl HighWatermarkManager {
pub(crate) fn new(
update_interval: Duration,
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
client_manager: ClientManagerRef,
) -> Self {
Self {
update_interval,
high_watermark,
client_manager,
}
}
/// Starts the high watermark manager as a background task
///
/// This spawns a task that periodically queries Kafka for the latest
/// high watermark values for all registered topics and updates the shared map.
pub(crate) async fn run(self) {
common_runtime::spawn_global(async move {
let mut interval = interval(self.update_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = self.try_update().await {
error!(e; "Failed to update high watermark");
}
}
});
}
/// Attempts to update the high watermark for all registered topics
///
/// Iterates through all topics in the high watermark map, obtains a producer
/// for each topic, and requests an update of the high watermark value.
pub(crate) async fn try_update(&self) -> Result<()> {
for iterator_element in self.high_watermark.iter() {
let producer = self
.client_manager
.get_or_insert(iterator_element.key())
.await?
.producer()
.clone();
producer.update_high_watermark().await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use common_wal::test_util::run_test_with_kafka_wal;
use store_api::storage::RegionId;
use super::*;
use crate::kafka::test_util::{prepare, record};
#[tokio::test]
async fn test_try_update_high_watermark() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let (manager, topics) =
prepare("test_try_update_high_watermark", 1, broker_endpoints).await;
let manager = Arc::new(manager);
let high_watermark_manager = HighWatermarkManager::new(
Duration::from_millis(100),
manager.high_watermark().clone(),
manager.clone(),
);
let high_watermark = high_watermark_manager.high_watermark.clone();
high_watermark_manager.run().await;
let topic = topics[0].clone();
let provider = Arc::new(KafkaProvider::new(topic.to_string()));
let producer = manager
.get_or_insert(&provider)
.await
.unwrap()
.producer()
.clone();
tokio::time::sleep(Duration::from_millis(150)).await;
let current_high_watermark = *high_watermark.get(&provider).unwrap();
assert_eq!(current_high_watermark, 0);
let record = vec![record()];
let region = RegionId::new(1, 1);
producer.produce(region, record.clone()).await.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;
let current_high_watermark = *high_watermark.get(&provider).unwrap();
assert_eq!(current_high_watermark, record.len() as u64);
})
})
.await
}
}

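The manager's loop relies on tokio's interval with MissedTickBehavior::Skip, so an update that runs longer than the period does not trigger a burst of catch-up ticks afterwards. A standalone sketch of that behavior (the loop body is a placeholder):

    use std::time::Duration;
    use tokio::time::{interval, MissedTickBehavior};

    #[tokio::main]
    async fn main() {
        let mut ticker = interval(Duration::from_secs(60));
        // Skip: if one update outlives the period, the missed ticks are dropped
        // instead of being fired back-to-back once it finishes.
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        for _ in 0..3 {
            ticker.tick().await;
            // ... query Kafka for the latest offset of each registered topic ...
        }
    }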
View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use common_telemetry::{debug, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use futures::future::try_join_all;
use futures_util::StreamExt;
use rskafka::client::partition::OffsetAt;
@@ -32,6 +33,7 @@ use store_api::storage::RegionId;
use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
use crate::kafka::high_watermark_manager::HighWatermarkManager;
use crate::kafka::index::{
build_region_wal_index_iterator, GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE,
};
@@ -41,6 +43,8 @@ use crate::kafka::util::record::{
};
use crate::metrics;
const DEFAULT_HIGH_WATERMARK_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
/// A log store backed by Kafka.
#[derive(Debug)]
pub struct KafkaLogStore {
@@ -52,6 +56,18 @@ pub struct KafkaLogStore {
consumer_wait_timeout: Duration,
/// Ignore missing entries during read WAL.
overwrite_entry_start_id: bool,
/// High watermark for all topics.
///
/// Represents the offset of the last record in each topic. This is used to track
/// the latest available data in Kafka topics.
///
/// The high watermark is updated in two ways:
/// - Automatically when the producer successfully commits data to Kafka
/// - Periodically by the [HighWatermarkManager](crate::kafka::high_watermark_manager::HighWatermarkManager).
///
/// This shared map allows multiple components to access the latest high watermark
/// information without needing to query Kafka directly.
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
}
impl KafkaLogStore {
@@ -60,14 +76,23 @@ impl KafkaLogStore {
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Self> {
let client_manager =
Arc::new(ClientManager::try_new(config, global_index_collector).await?);
let high_watermark = Arc::new(DashMap::new());
let client_manager = Arc::new(
ClientManager::try_new(config, global_index_collector, high_watermark.clone()).await?,
);
let high_watermark_manager = HighWatermarkManager::new(
DEFAULT_HIGH_WATERMARK_UPDATE_INTERVAL,
high_watermark.clone(),
client_manager.clone(),
);
high_watermark_manager.run().await;
Ok(Self {
client_manager,
max_batch_bytes: config.max_batch_bytes.as_bytes() as usize,
consumer_wait_timeout: config.consumer_wait_timeout,
overwrite_entry_start_id: config.overwrite_entry_start_id,
high_watermark,
})
}
}
@@ -158,6 +183,7 @@ impl LogStore for KafkaLogStore {
.collect::<HashSet<_>>();
let mut region_grouped_records: HashMap<RegionId, (OrderedBatchProducerRef, Vec<_>)> =
HashMap::with_capacity(region_ids.len());
let mut region_to_provider = HashMap::with_capacity(region_ids.len());
for entry in entries {
let provider = entry.provider().as_kafka_provider().with_context(|| {
error::InvalidProviderSnafu {
@@ -165,6 +191,7 @@ impl LogStore for KafkaLogStore {
actual: entry.provider().type_name(),
}
})?;
region_to_provider.insert(entry.region_id(), provider.clone());
let region_id = entry.region_id();
match region_grouped_records.entry(region_id) {
std::collections::hash_map::Entry::Occupied(mut slot) => {
@@ -199,6 +226,13 @@ impl LogStore for KafkaLogStore {
))
.await?;
// Updates the high watermark with the offset of the last record written to each topic.
for (region_id, offset) in &region_grouped_max_offset {
// Safety: `region_id` is always valid.
let provider = region_to_provider.get(region_id).unwrap();
self.high_watermark.insert(provider.clone(), *offset);
}
Ok(AppendBatchResponse {
last_entry_ids: region_grouped_max_offset.into_iter().collect(),
})
@@ -383,6 +417,25 @@ impl LogStore for KafkaLogStore {
Ok(())
}
/// Returns the highest entry id of the specified topic in the remote WAL.
fn high_watermark(&self, provider: &Provider) -> Result<EntryId> {
let provider = provider
.as_kafka_provider()
.with_context(|| InvalidProviderSnafu {
expected: KafkaProvider::type_name(),
actual: provider.type_name(),
})?;
let high_watermark = self
.high_watermark
.get(provider)
.as_deref()
.copied()
.unwrap_or(0);
Ok(high_watermark)
}
/// Stops components of the logstore.
async fn stop(&self) -> Result<()> {
Ok(())
@@ -567,6 +620,8 @@ mod tests {
.for_each(|entry| entry.set_entry_id(0));
assert_eq!(expected_entries, actual_entries);
}
let high_watermark = logstore.high_watermark(&provider).unwrap();
assert_eq!(high_watermark, 99);
}
#[tokio::test]
@@ -640,5 +695,7 @@ mod tests {
.for_each(|entry| entry.set_entry_id(0));
assert_eq!(expected_entries, actual_entries);
}
let high_watermark = logstore.high_watermark(&provider).unwrap();
assert_eq!(high_watermark, (data_size_kb as u64 / 8 + 1) * 20 * 5 - 1);
}
}

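On the read side, LogStore::high_watermark is a cheap map lookup that returns 0 until a value has been observed. Per the commit title, the intended consumer is flush-related logic comparing the watermark with the last flushed entry id; a hypothetical sketch (logstore, provider, and last_flushed_entry_id are illustrative names, only high_watermark comes from this commit):

    let watermark = logstore.high_watermark(&provider)?;
    if watermark > last_flushed_entry_id {
        // The topic holds WAL entries not yet covered by a flush.
    }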
View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use common_telemetry::warn;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient};
use rskafka::record::Record;
use store_api::logstore::provider::KafkaProvider;
@@ -56,6 +57,7 @@ impl OrderedBatchProducer {
compression: Compression,
max_batch_bytes: usize,
index_collector: Box<dyn IndexCollector>,
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
) -> Self {
let mut worker = BackgroundProducerWorker {
provider,
@@ -65,6 +67,7 @@ impl OrderedBatchProducer {
request_batch_size: REQUEST_BATCH_SIZE,
max_batch_bytes,
index_collector,
high_watermark,
};
tokio::spawn(async move { worker.run().await });
Self { sender: tx }
@@ -90,6 +93,21 @@ impl OrderedBatchProducer {
Ok(handle)
}
/// Sends a [WorkerRequest::UpdateHighWatermark] request to the producer.
/// This is used to update the high watermark for the topic.
pub(crate) async fn update_high_watermark(&self) -> Result<()> {
if self
.sender
.send(WorkerRequest::UpdateHighWatermark)
.await
.is_err()
{
warn!("OrderedBatchProducer has already exited");
return error::OrderedBatchProducerStoppedSnafu {}.fail();
}
Ok(())
}
}
#[async_trait::async_trait]
@@ -135,7 +153,6 @@ mod tests {
use std::sync::{Arc, Mutex};
use std::time::Duration;
use chrono::{TimeZone, Utc};
use common_base::readable_size::ReadableSize;
use common_telemetry::debug;
use futures::stream::FuturesUnordered;
@@ -149,6 +166,7 @@ mod tests {
use super::*;
use crate::kafka::index::NoopCollector;
use crate::kafka::producer::OrderedBatchProducer;
use crate::kafka::test_util::record;
#[derive(Debug)]
struct MockClient {
@@ -196,15 +214,6 @@ mod tests {
}
}
fn record() -> Record {
Record {
key: Some(vec![0; 4]),
value: Some(vec![0; 6]),
headers: Default::default(),
timestamp: Utc.timestamp_millis_opt(320).unwrap(),
}
}
#[tokio::test]
async fn test_producer() {
common_telemetry::init_default_ut_logging();
@@ -224,6 +233,7 @@ mod tests {
Compression::NoCompression,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
Box::new(NoopCollector),
Arc::new(DashMap::new()),
);
let region_id = RegionId::new(1, 1);
@@ -272,6 +282,7 @@ mod tests {
Compression::NoCompression,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
Box::new(NoopCollector),
Arc::new(DashMap::new()),
);
let region_id = RegionId::new(1, 1);
@@ -324,6 +335,7 @@ mod tests {
Compression::NoCompression,
ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize,
Box::new(NoopCollector),
Arc::new(DashMap::new()),
);
let region_id = RegionId::new(1, 1);

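From the caller's perspective, update_high_watermark only enqueues a WorkerRequest::UpdateHighWatermark; it fails solely when the background worker has already exited. A hedged caller-side sketch:

    // Tolerate a stopped producer (e.g. during shutdown) instead of propagating
    // the OrderedBatchProducerStopped error.
    if let Err(e) = producer.update_high_watermark().await {
        common_telemetry::warn!("Skipping high watermark refresh: {e:?}");
    }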
View File

@@ -0,0 +1,88 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use chrono::{TimeZone, Utc};
use common_wal::config::kafka::common::KafkaConnectionConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use rskafka::client::ClientBuilder;
use rskafka::record::Record;
use crate::kafka::client_manager::ClientManager;
/// Creates `num_topics` topics, each named by the given decorator.
pub(crate) async fn create_topics<F>(
num_topics: usize,
decorator: F,
broker_endpoints: &[String],
) -> Vec<String>
where
F: Fn(usize) -> String,
{
assert!(!broker_endpoints.is_empty());
let client = ClientBuilder::new(broker_endpoints.to_vec())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();
let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
.map(|i| {
let topic = decorator(i);
let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
(topic, task)
})
.unzip();
futures::future::try_join_all(tasks).await.unwrap();
topics
}
/// Prepares for a test by creating a collection of topics and a client manager.
pub(crate) async fn prepare(
test_name: &str,
num_topics: usize,
broker_endpoints: Vec<String>,
) -> (ClientManager, Vec<String>) {
let topics = create_topics(
num_topics,
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
&broker_endpoints,
)
.await;
let config = DatanodeKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
..Default::default()
};
let high_watermark = Arc::new(DashMap::new());
let manager = ClientManager::try_new(&config, None, high_watermark)
.await
.unwrap();
(manager, topics)
}
/// Generates a record to produce.
pub fn record() -> Record {
Record {
key: Some(vec![0; 4]),
value: Some(vec![0; 6]),
headers: Default::default(),
timestamp: Utc.timestamp_millis_opt(320).unwrap(),
}
}

View File

@@ -15,10 +15,12 @@
pub(crate) mod dump_index;
pub(crate) mod flush;
pub(crate) mod produce;
pub(crate) mod update_high_watermark;
use std::sync::Arc;
use common_telemetry::debug;
use dashmap::DashMap;
use futures::future::try_join_all;
use rskafka::client::partition::Compression;
use rskafka::record::Record;
@@ -37,6 +39,7 @@ pub(crate) enum WorkerRequest {
Produce(ProduceRequest),
TruncateIndex(TruncateIndexRequest),
DumpIndex(DumpIndexRequest),
UpdateHighWatermark,
}
impl WorkerRequest {
@@ -157,6 +160,8 @@ pub(crate) struct BackgroundProducerWorker {
pub(crate) max_batch_bytes: usize,
/// Collecting ids of WAL entries.
pub(crate) index_collector: Box<dyn IndexCollector>,
/// High watermark for all topics.
pub(crate) high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
}
impl BackgroundProducerWorker {
@@ -194,6 +199,9 @@ impl BackgroundProducerWorker {
entry_id,
}) => self.index_collector.truncate(region_id, entry_id),
WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
WorkerRequest::UpdateHighWatermark => {
self.update_high_watermark().await;
}
}
}

View File

@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{debug, error};
use rskafka::client::partition::OffsetAt;
use snafu::ResultExt;
use crate::error;
use crate::kafka::worker::BackgroundProducerWorker;
impl BackgroundProducerWorker {
/// Updates the high watermark for the topic.
///
/// This function retrieves the latest offset from Kafka and updates the high watermark
/// in the shared map.
pub async fn update_high_watermark(&mut self) {
match self
.client
.get_offset(OffsetAt::Latest)
.await
.context(error::GetOffsetSnafu {
topic: &self.provider.topic,
}) {
Ok(offset) => match self.high_watermark.entry(self.provider.clone()) {
dashmap::Entry::Occupied(mut occupied_entry) => {
let offset = offset as u64;
if *occupied_entry.get() != offset {
occupied_entry.insert(offset);
debug!(
"Updated high watermark for topic {} to {}",
self.provider.topic, offset
);
}
}
dashmap::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(offset as u64);
debug!(
"Inserted high watermark for topic {} to {}",
self.provider.topic, offset
);
}
},
Err(err) => {
error!(err; "Failed to get offset for topic {}", self.provider.topic);
}
}
}
}

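The Entry-based update above writes (and logs) only when the cached value actually changes. The same pattern reduced to a plain DashMap<String, u64> for illustration:

    use dashmap::DashMap;

    /// Returns true when the stored offset was created or changed.
    fn set_if_changed(map: &DashMap<String, u64>, topic: &str, offset: u64) -> bool {
        match map.entry(topic.to_string()) {
            dashmap::Entry::Occupied(mut e) => {
                if *e.get() == offset {
                    false // unchanged: skip both the write and the debug log
                } else {
                    e.insert(offset);
                    true
                }
            }
            dashmap::Entry::Vacant(e) => {
                e.insert(offset);
                true
            }
        }
    }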
View File

@@ -483,6 +483,18 @@ impl LogStore for RaftEngineLogStore {
);
Ok(())
}
fn high_watermark(&self, provider: &Provider) -> Result<EntryId> {
let ns = provider
.as_raft_engine_provider()
.with_context(|| InvalidProviderSnafu {
expected: RaftEngineProvider::type_name(),
actual: provider.type_name(),
})?;
let namespace_id = ns.id;
let last_index = self.engine.last_index(namespace_id).unwrap_or(0);
Ok(last_index)
}
}
#[derive(Debug, Clone)]
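For the local raft-engine backend, the same trait method simply reports the engine's last index, so callers see a uniform contract across backends. A brief hypothetical illustration (the store and provider bindings are illustrative):

    // Same LogStore call, backend-specific meaning:
    let local = raft_engine_store.high_watermark(&raft_provider)?;  // engine.last_index(ns), 0 if empty
    let remote = kafka_store.high_watermark(&kafka_provider)?;      // cached Kafka offset, 0 if unknown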