feat(remote-wal): introduce TopicRegionManager (#5407)

* feat: add manager to map region to topic

* chore: add a delete

* chore: rename keys

* chore: update config file

* fix: fix unit test

* chore: change prefix

* chore: clean up

* chore: follow review comments

* chore: follow review comments

* chore: follow review comments

* chore: follow review comments

* chore: follow review comments
This commit is contained in:
Yohan Wal
2025-01-22 14:06:27 +08:00
committed by GitHub
parent 51a8d0a726
commit 3ed085459c
9 changed files with 296 additions and 19 deletions

View File

@@ -329,7 +329,7 @@
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>`[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*`<br/>e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |

View File

@@ -129,6 +129,8 @@ num_topics = 64
selector_type = "round_robin"
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## Only accepts strings that match the following regular expression pattern:
## [a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
## e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
topic_name_prefix = "greptimedb_wal_topic"

View File

@@ -703,6 +703,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid topic name prefix: {}", prefix))]
InvalidTopicNamePrefix {
prefix: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -770,7 +777,8 @@ impl ErrorExt for Error {
| MismatchPrefix { .. }
| TlsConfig { .. }
| InvalidSetDatabaseOption { .. }
| InvalidUnsetDatabaseOption { .. } => StatusCode::InvalidArguments,
| InvalidUnsetDatabaseOption { .. }
| InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,

View File

@@ -59,6 +59,9 @@
//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
//! - The key is used to mark existing topics in Kafka for WAL.
//!
//! 13. Topic name to region map key: `__topic_region/{topic_name}/{region_id}`
//! - Mapping {topic_name} to {region_id}
//!
//! All keys have related managers. The managers take care of the serialization and deserialization
//! of keys and values, and the interaction with the underlying KV store backend.
//!
@@ -105,6 +108,7 @@ pub mod table_route;
pub mod test_utils;
mod tombstone;
pub mod topic_name;
pub mod topic_region;
pub(crate) mod txn_helper;
pub mod view_info;
@@ -166,6 +170,7 @@ pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// The legacy topic key prefix is used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// Prefix of the keys that map a topic name to a region id
/// (`__topic_region/{topic_name}/{region_id}`).
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
@@ -183,6 +188,10 @@ pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
lazy_static! {
    /// Matches a string that consists *entirely* of a name accepted by `NAME_PATTERN`.
    ///
    /// `NAME_PATTERN` itself carries no anchors because it is embedded inside larger
    /// key patterns. Compiled as-is, `is_match` would succeed for any input that merely
    /// contains a valid name somewhere (for example `a/b`), so the pattern is wrapped
    /// in `^(?:...)$` to validate the whole input.
    pub static ref NAME_PATTERN_REGEX: Regex =
        Regex::new(&format!("^(?:{NAME_PATTERN})$")).unwrap();
}
lazy_static! {
static ref TABLE_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
@@ -236,6 +245,13 @@ lazy_static! {
Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
}
lazy_static! {
/// Matches a complete topic-region key of the form
/// `{TOPIC_REGION_PREFIX}/{topic_name}/{region_id}`.
///
/// Capture group 1 is the topic name (it must match `NAME_PATTERN`), capture group 2
/// is the region id written as decimal digits.
pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
"^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
))
.unwrap();
}
/// The key of metadata.
pub trait MetadataKey<'a, T> {
fn to_bytes(&self) -> Vec<u8>;

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -22,7 +21,6 @@ use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result};
use crate::key::{
MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchPutRequest, RangeRequest};
@@ -102,12 +100,6 @@ pub struct TopicNameManager {
kv_backend: KvBackendRef,
}
impl Default for TopicNameManager {
fn default() -> Self {
Self::new(Arc::new(MemoryKvBackend::default()))
}
}
impl TopicNameManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
@@ -138,7 +130,7 @@ impl TopicNameManager {
}
/// Range query for topics.
/// Caution: this method returns keys as String instead of values of range query since the topics are stoired in keys.
/// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys.
pub async fn range(&self) -> Result<Vec<String>> {
let prefix = TopicNameKey::range_start_key();
let raw_prefix = prefix.as_bytes();

View File

@@ -0,0 +1,223 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Display};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::storage::RegionId;
use crate::error::{Error, InvalidMetadataSnafu, Result};
use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchPutRequest, PutRequest, RangeRequest};
use crate::rpc::KeyValue;
/// Key that associates a region with the topic it uses.
///
/// Its string form is `{TOPIC_REGION_PREFIX}/{topic}/{region_id}`, where the region id
/// is rendered as its `u64` value.
#[derive(Debug, Clone, PartialEq)]
pub struct TopicRegionKey<'a> {
/// The region that uses `topic`.
pub region_id: RegionId,
/// The topic name, borrowed from the caller or from the raw key being decoded.
pub topic: &'a str,
}
/// Unit value type paired with [`TopicRegionKey`].
///
/// NOTE(review): nothing in this file serializes it; `TopicRegionManager` writes empty
/// values because all the information is carried by the key.
#[derive(Debug, Serialize, Deserialize)]
pub struct TopicRegionValue;
impl<'a> TopicRegionKey<'a> {
pub fn new(region_id: RegionId, topic: &'a str) -> Self {
Self { region_id, topic }
}
pub fn range_topic_key(topic: &str) -> String {
format!("{}/{}", TOPIC_REGION_PREFIX, topic)
}
}
impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
    /// Encodes the key as the UTF-8 bytes of its `Display` form.
    fn to_bytes(&self) -> Vec<u8> {
        Vec::from(self.to_string())
    }

    /// Decodes a key from raw bytes.
    ///
    /// # Errors
    /// Returns `InvalidMetadata` when `bytes` is not valid UTF-8, or when the decoded
    /// string is rejected by `TopicRegionKey::try_from`.
    fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
        match std::str::from_utf8(bytes) {
            Ok(key) => TopicRegionKey::try_from(key),
            Err(e) => InvalidMetadataSnafu {
                err_msg: format!(
                    "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(bytes)
                ),
            }
            .fail(),
        }
    }
}
impl Display for TopicRegionKey<'_> {
    /// Writes the key as `{TOPIC_REGION_PREFIX}/{topic}/{region_id}`, with the region id
    /// rendered as its `u64` value.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let topic_part = Self::range_topic_key(self.topic);
        let region_part = self.region_id.as_u64();
        write!(f, "{topic_part}/{region_part}")
    }
}
impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
    type Error = Error;

    /// Parses a string of the form `{prefix}/{topic}/{region_id}`.
    ///
    /// # Errors
    /// Returns `InvalidMetadata` when `value` does not match `TOPIC_REGION_PATTERN`, or
    /// when the region id digits do not fit into a `u64`.
    fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
        let captures = TOPIC_REGION_PATTERN
            .captures(value)
            .context(InvalidMetadataSnafu {
                err_msg: format!("Invalid TopicRegionKey: {}", value),
            })?;

        // Both groups are mandatory in the pattern, so they are present on any match.
        // `get(1)` (rather than indexing) keeps the `'a` lifetime of the input string.
        let topic = captures.get(1).unwrap().as_str();

        let Ok(raw_region_id) = captures[2].parse::<u64>() else {
            return InvalidMetadataSnafu {
                err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
            }
            .fail();
        };

        Ok(Self::new(RegionId::from_u64(raw_region_id), topic))
    }
}
/// Decodes the key half of `value` into a [`TopicRegionKey`]; the value half is ignored.
fn topic_region_decoder(value: &KeyValue) -> Result<TopicRegionKey<'_>> {
    TopicRegionKey::from_bytes(value.key.as_slice())
}
/// Manages the mapping between topics and regions stored in the kv backend.
///
/// Each mapping is stored as one key (see [`TopicRegionKey`]).
pub struct TopicRegionManager {
/// Backend that all reads and writes go through.
kv_backend: KvBackendRef,
}
impl TopicRegionManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
let put_req = PutRequest {
key: key.to_bytes(),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(put_req).await?;
Ok(())
}
pub async fn batch_put(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
let req = BatchPutRequest {
kvs: keys
.into_iter()
.map(|key| KeyValue {
key: key.to_bytes(),
value: vec![],
})
.collect(),
prev_kv: false,
};
self.kv_backend.batch_put(req).await?;
Ok(())
}
/// Returns the list of region ids using specified topic.
pub async fn regions(&self, topic: &str) -> Result<Vec<RegionId>> {
let prefix = TopicRegionKey::range_topic_key(topic);
let req = RangeRequest::new().with_prefix(prefix.as_bytes());
let resp = self.kv_backend.range(req).await?;
let region_ids = resp
.kvs
.iter()
.map(topic_region_decoder)
.collect::<Result<Vec<_>>>()?;
Ok(region_ids.iter().map(|key| key.region_id).collect())
}
pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
let raw_key = key.to_bytes();
self.kv_backend.delete(&raw_key, false).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Stores 64 regions spread over 16 topics, then checks that listing the regions of
    /// the first topic reflects both the initial batch and a later delete.
    #[tokio::test]
    async fn test_topic_region_manager() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicRegionManager::new(kv_backend.clone());

        let topics: Vec<String> = (0..16).map(|i| format!("topic_{}", i)).collect();
        let keys: Vec<_> = (0..64u64)
            .map(|i| TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]))
            .collect();
        manager.batch_put(keys.clone()).await.unwrap();

        let first_topic = topics[0].as_str();
        // Region ids expected under the first topic, optionally leaving one out.
        // `keys` is built in ascending region order, so the result is already sorted.
        let expected_regions = |removed: Option<RegionId>| -> Vec<RegionId> {
            keys.iter()
                .filter(|key| key.topic == first_topic && Some(key.region_id) != removed)
                .map(|key| key.region_id)
                .collect()
        };

        let mut found = manager.regions(first_topic).await.unwrap();
        found.sort_by_key(|id| id.as_u64());
        assert_eq!(found, expected_regions(None));

        let removed_region = RegionId::from_u64(0);
        let removed_key = TopicRegionKey::new(removed_region, "topic_0");
        manager.delete(removed_key.clone()).await.unwrap();

        let mut found = manager.regions(first_topic).await.unwrap();
        found.sort_by_key(|id| id.as_u64());
        assert_eq!(found, expected_regions(Some(removed_region)));
    }
}

View File

@@ -23,17 +23,18 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_wal::config::MetasrvWalConfig;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use crate::error::{EncodeWalOptionsSnafu, Result};
use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.
#[derive(Default)]
#[derive(Default, Debug)]
pub enum WalOptionsAllocator {
#[default]
RaftEngine,
@@ -113,6 +114,11 @@ pub async fn build_wal_options_allocator(
match config {
MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
MetasrvWalConfig::Kafka(kafka_config) => {
let prefix = &kafka_config.kafka_topic.topic_name_prefix;
ensure!(
NAME_PATTERN_REGEX.is_match(prefix),
InvalidTopicNamePrefixSnafu { prefix }
);
let topic_creator = build_kafka_topic_creator(kafka_config).await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
Ok(WalOptionsAllocator::Kafka(topic_pool))
@@ -149,11 +155,14 @@ pub fn prepare_wal_options(
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
@@ -178,6 +187,22 @@ mod tests {
assert_eq!(got, expected);
}
#[tokio::test]
async fn test_refuse_invalid_topic_name_prefix() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
kafka_topic: KafkaTopicConfig {
topic_name_prefix: "``````".to_string(),
..Default::default()
},
..Default::default()
});
let got = build_wal_options_allocator(&wal_config, kv_backend)
.await
.unwrap_err();
assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
}
// Tests that the wal options allocator could successfully allocate Kafka wal options.
#[tokio::test]
async fn test_allocator_with_kafka() {

View File

@@ -22,20 +22,21 @@ use crate::kv_backend::KvBackendRef;
/// Responsible for:
/// 1. Restores and persists topics in kvbackend.
/// 2. Clears topics in legacy format and restores them in the new format.
/// 3. Stores and fetches topic-region mapping in kvbackend.
pub struct KafkaTopicManager {
key_manager: TopicNameManager,
topic_name_manager: TopicNameManager,
}
impl KafkaTopicManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
key_manager: TopicNameManager::new(kv_backend),
topic_name_manager: TopicNameManager::new(kv_backend.clone()),
}
}
async fn restore_topics(&self) -> Result<Vec<String>> {
self.key_manager.update_legacy_topics().await?;
let topics = self.key_manager.range().await?;
self.topic_name_manager.update_legacy_topics().await?;
let topics = self.topic_name_manager.range().await?;
Ok(topics)
}
@@ -57,7 +58,7 @@ impl KafkaTopicManager {
/// Persists topics into the key-value backend.
pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
self.key_manager
self.topic_name_manager
.batch_put(
topics
.iter()

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Formatter};
use std::sync::Arc;
use common_wal::config::kafka::MetasrvKafkaConfig;
@@ -39,6 +40,15 @@ pub struct KafkaTopicPool {
auto_create_topics: bool,
}
impl fmt::Debug for KafkaTopicPool {
    /// Formats the pool as a struct named `KafkaTopicPool` showing only the `topics`
    /// and `auto_create_topics` fields.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("KafkaTopicPool");
        out.field("topics", &self.topics);
        out.field("auto_create_topics", &self.auto_create_topics);
        out.finish()
    }
}
impl KafkaTopicPool {
pub fn new(
config: &MetasrvKafkaConfig,