diff --git a/config/config.md b/config/config.md
index 122d958119..0639e06a21 100644
--- a/config/config.md
+++ b/config/config.md
@@ -329,7 +329,7 @@
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) |
-| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
+| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
Only accepts strings that match the following regular expression pattern:
[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index b2b748c7f6..2c581834a8 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -129,6 +129,8 @@ num_topics = 64
selector_type = "round_robin"
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
+## Only accepts strings that match the following regular expression pattern:
+## [a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
-## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
+## e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
topic_name_prefix = "greptimedb_wal_topic"
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 0fc483879d..bccdb258a9 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -703,6 +703,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
+
+ #[snafu(display("Invalid topic name prefix: {}", prefix))]
+ InvalidTopicNamePrefix {
+ prefix: String,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
pub type Result = std::result::Result;
@@ -770,7 +777,8 @@ impl ErrorExt for Error {
| MismatchPrefix { .. }
| TlsConfig { .. }
| InvalidSetDatabaseOption { .. }
- | InvalidUnsetDatabaseOption { .. } => StatusCode::InvalidArguments,
+ | InvalidUnsetDatabaseOption { .. }
+ | InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,
diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index ac03941bfe..9690b885ae 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -59,6 +59,9 @@
//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
//! - The key is used to mark existing topics in kafka for WAL.
//!
+//! 13. Topic name to region map key: `__topic_region/{topic_name}/{region_id}`
+//! - The key is used to map a topic name to the ids of the regions that use it.
+//!
//! All keys have related managers. The managers take care of the serialization and deserialization
//! of keys and values, and the interaction with the underlying KV store backend.
//!
@@ -105,6 +108,7 @@ pub mod table_route;
pub mod test_utils;
mod tombstone;
pub mod topic_name;
+pub mod topic_region;
pub(crate) mod txn_helper;
pub mod view_info;
@@ -166,6 +170,7 @@ pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// The legacy topic key prefix is used to store the topic name in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
+pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
@@ -183,6 +188,10 @@ pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;
+lazy_static! {
+    /// Matches a string that is, in its entirety, a valid name per `NAME_PATTERN`.
+    ///
+    /// The pattern is anchored with `^`/`$`: `Regex::is_match` is a substring search, so
+    /// without the anchors any string that merely *contains* a valid name would pass.
+    pub static ref NAME_PATTERN_REGEX: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
+}
+
lazy_static! {
static ref TABLE_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
@@ -236,6 +245,13 @@ lazy_static! {
Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
}
+lazy_static! {
+ /// Matches a complete topic-region key `{TOPIC_REGION_PREFIX}/{topic_name}/{region_id}`.
+ /// Capture group 1 is the topic name (constrained by `NAME_PATTERN`), capture group 2
+ /// is the region id as a run of decimal digits.
+ pub static ref TOPIC_REGION_PATTERN: Regex = Regex::new(&format!(
+ "^{TOPIC_REGION_PREFIX}/({NAME_PATTERN})/([0-9]+)$"
+ ))
+ .unwrap();
+}
+
/// The key of metadata.
pub trait MetadataKey<'a, T> {
fn to_bytes(&self) -> Vec;
diff --git a/src/common/meta/src/key/topic_name.rs b/src/common/meta/src/key/topic_name.rs
index 533762d999..dc7ca17110 100644
--- a/src/common/meta/src/key/topic_name.rs
+++ b/src/common/meta/src/key/topic_name.rs
@@ -13,7 +13,6 @@
// limitations under the License.
use std::fmt::{self, Display};
-use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -22,7 +21,6 @@ use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result};
use crate::key::{
MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
};
-use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchPutRequest, RangeRequest};
@@ -102,12 +100,6 @@ pub struct TopicNameManager {
kv_backend: KvBackendRef,
}
-impl Default for TopicNameManager {
- fn default() -> Self {
- Self::new(Arc::new(MemoryKvBackend::default()))
- }
-}
-
impl TopicNameManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
@@ -138,7 +130,7 @@ impl TopicNameManager {
}
/// Range query for topics.
- /// Caution: this method returns keys as String instead of values of range query since the topics are stoired in keys.
+ /// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys.
pub async fn range(&self) -> Result> {
let prefix = TopicNameKey::range_start_key();
let raw_prefix = prefix.as_bytes();
diff --git a/src/common/meta/src/key/topic_region.rs b/src/common/meta/src/key/topic_region.rs
new file mode 100644
index 0000000000..52103ff6ba
--- /dev/null
+++ b/src/common/meta/src/key/topic_region.rs
@@ -0,0 +1,232 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::{self, Display};
+
+use serde::{Deserialize, Serialize};
+use snafu::OptionExt;
+use store_api::storage::RegionId;
+
+use crate::error::{Error, InvalidMetadataSnafu, Result};
+use crate::key::{MetadataKey, TOPIC_REGION_PATTERN, TOPIC_REGION_PREFIX};
+use crate::kv_backend::KvBackendRef;
+use crate::rpc::store::{BatchPutRequest, PutRequest, RangeRequest};
+use crate::rpc::KeyValue;
+
+/// Key of the topic-to-region mapping, serialized as
+/// `{TOPIC_REGION_PREFIX}/{topic}/{region_id}`.
+#[derive(Debug, Clone, PartialEq)]
+pub struct TopicRegionKey<'a> {
+    pub region_id: RegionId,
+    pub topic: &'a str,
+}
+
+/// Value type of the mapping. It is not used by [`TopicRegionManager`], which writes every
+/// entry with an empty value.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TopicRegionValue;
+
+impl<'a> TopicRegionKey<'a> {
+    pub fn new(region_id: RegionId, topic: &'a str) -> Self {
+        Self { region_id, topic }
+    }
+
+    /// Returns the key prefix shared by all entries of `topic`: `{prefix}/{topic}/`.
+    ///
+    /// The trailing `/` is required: without it a prefix scan for `topic_1` would also
+    /// return the entries of `topic_10`, `topic_11`, ...
+    pub fn range_topic_key(topic: &str) -> String {
+        format!("{}/{}/", TOPIC_REGION_PREFIX, topic)
+    }
+}
+
+impl<'a> MetadataKey<'a, TopicRegionKey<'a>> for TopicRegionKey<'a> {
+    fn to_bytes(&self) -> Vec<u8> {
+        self.to_string().into_bytes()
+    }
+
+    fn from_bytes(bytes: &'a [u8]) -> Result<TopicRegionKey<'a>> {
+        let key = std::str::from_utf8(bytes).map_err(|e| {
+            InvalidMetadataSnafu {
+                err_msg: format!(
+                    "TopicRegionKey '{}' is not a valid UTF8 string: {e}",
+                    String::from_utf8_lossy(bytes)
+                ),
+            }
+            .build()
+        })?;
+        TopicRegionKey::try_from(key)
+    }
+}
+
+impl Display for TopicRegionKey<'_> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // `range_topic_key` already ends with the `/` separator.
+        write!(
+            f,
+            "{}{}",
+            Self::range_topic_key(self.topic),
+            self.region_id.as_u64()
+        )
+    }
+}
+
+impl<'a> TryFrom<&'a str> for TopicRegionKey<'a> {
+    type Error = Error;
+
+    /// Parses a key of format `{prefix}/{topic}/{region_id}`.
+    fn try_from(value: &'a str) -> Result<TopicRegionKey<'a>> {
+        let captures = TOPIC_REGION_PATTERN
+            .captures(value)
+            .with_context(|| InvalidMetadataSnafu {
+                err_msg: format!("Invalid TopicRegionKey: {}", value),
+            })?;
+        // Group 1 is not optional in `TOPIC_REGION_PATTERN`, so it is present on every match.
+        // `get(1)` (rather than `captures[1]`) keeps the `'a` lifetime of `value`.
+        let topic = captures.get(1).map(|m| m.as_str()).unwrap();
+        // `[0-9]+` accepts digit runs that do not fit into a `u64`.
+        let region_id = captures[2].parse::<u64>().map_err(|_| {
+            InvalidMetadataSnafu {
+                err_msg: format!("Invalid region id in TopicRegionKey: {}", value),
+            }
+            .build()
+        })?;
+        Ok(TopicRegionKey {
+            region_id: RegionId::from_u64(region_id),
+            topic,
+        })
+    }
+}
+
+/// Decodes the key of a range-response entry; the entry's value is ignored.
+fn topic_region_decoder(value: &KeyValue) -> Result<TopicRegionKey<'_>> {
+    TopicRegionKey::from_bytes(&value.key)
+}
+
+/// Manages map of topics and regions in kvbackend.
+pub struct TopicRegionManager {
+    kv_backend: KvBackendRef,
+}
+
+impl TopicRegionManager {
+    pub fn new(kv_backend: KvBackendRef) -> Self {
+        Self { kv_backend }
+    }
+
+    /// Stores the mapping described by `key`, with an empty value.
+    pub async fn put(&self, key: TopicRegionKey<'_>) -> Result<()> {
+        let put_req = PutRequest {
+            key: key.to_bytes(),
+            value: vec![],
+            prev_kv: false,
+        };
+        self.kv_backend.put(put_req).await?;
+        Ok(())
+    }
+
+    /// Stores all mappings in `keys` with a single batch request.
+    pub async fn batch_put(&self, keys: Vec<TopicRegionKey<'_>>) -> Result<()> {
+        let req = BatchPutRequest {
+            kvs: keys
+                .into_iter()
+                .map(|key| KeyValue {
+                    key: key.to_bytes(),
+                    value: vec![],
+                })
+                .collect(),
+            prev_kv: false,
+        };
+        self.kv_backend.batch_put(req).await?;
+        Ok(())
+    }
+
+    /// Returns the list of region ids using specified topic.
+    pub async fn regions(&self, topic: &str) -> Result<Vec<RegionId>> {
+        let prefix = TopicRegionKey::range_topic_key(topic);
+        let req = RangeRequest::new().with_prefix(prefix.as_bytes());
+        let resp = self.kv_backend.range(req).await?;
+        resp.kvs
+            .iter()
+            .map(|kv| topic_region_decoder(kv).map(|key| key.region_id))
+            .collect()
+    }
+
+    /// Deletes the mapping described by `key`.
+    pub async fn delete(&self, key: TopicRegionKey<'_>) -> Result<()> {
+        let raw_key = key.to_bytes();
+        self.kv_backend.delete(&raw_key, false).await?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::kv_backend::memory::MemoryKvBackend;
+
+    #[tokio::test]
+    async fn test_topic_region_manager() {
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let manager = TopicRegionManager::new(kv_backend.clone());
+
+        let topics = (0..16).map(|i| format!("topic_{}", i)).collect::<Vec<_>>();
+        let keys = (0..64)
+            .map(|i| TopicRegionKey::new(RegionId::from_u64(i), &topics[(i % 16) as usize]))
+            .collect::<Vec<_>>();
+
+        manager.batch_put(keys.clone()).await.unwrap();
+
+        let mut key_values = manager.regions(&topics[0]).await.unwrap();
+        let expected = keys
+            .iter()
+            .filter_map(|key| {
+                if key.topic == topics[0] {
+                    Some(key.region_id)
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+        key_values.sort_by_key(|id| id.as_u64());
+        assert_eq!(key_values, expected);
+
+        // `topic_1` is a string prefix of `topic_10`..`topic_15`; their regions must not be
+        // returned for it.
+        let mut key_values = manager.regions(&topics[1]).await.unwrap();
+        let expected = [1, 17, 33, 49]
+            .into_iter()
+            .map(RegionId::from_u64)
+            .collect::<Vec<_>>();
+        key_values.sort_by_key(|id| id.as_u64());
+        assert_eq!(key_values, expected);
+
+        let key = TopicRegionKey::new(RegionId::from_u64(0), "topic_0");
+        manager.delete(key.clone()).await.unwrap();
+        let mut key_values = manager.regions(&topics[0]).await.unwrap();
+        let expected = keys
+            .iter()
+            .filter_map(|key| {
+                if key.topic == topics[0] && key.region_id != RegionId::from_u64(0) {
+                    Some(key.region_id)
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+        key_values.sort_by_key(|id| id.as_u64());
+        assert_eq!(key_values, expected);
+    }
+}
diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs
index 4f50c88bfc..9177fd0756 100644
--- a/src/common/meta/src/wal_options_allocator.rs
+++ b/src/common/meta/src/wal_options_allocator.rs
@@ -23,17 +23,18 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_wal::config::MetasrvWalConfig;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
-use snafu::ResultExt;
+use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
-use crate::error::{EncodeWalOptionsSnafu, Result};
+use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
+use crate::key::NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.
-#[derive(Default)]
+#[derive(Default, Debug)]
pub enum WalOptionsAllocator {
#[default]
RaftEngine,
@@ -113,6 +114,11 @@ pub async fn build_wal_options_allocator(
match config {
MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
MetasrvWalConfig::Kafka(kafka_config) => {
+ let prefix = &kafka_config.kafka_topic.topic_name_prefix;
+ ensure!(
+ NAME_PATTERN_REGEX.is_match(prefix),
+ InvalidTopicNamePrefixSnafu { prefix }
+ );
let topic_creator = build_kafka_topic_creator(kafka_config).await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
Ok(WalOptionsAllocator::Kafka(topic_pool))
@@ -149,11 +155,14 @@ pub fn prepare_wal_options(
#[cfg(test)]
mod tests {
+ use std::assert_matches::assert_matches;
+
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
+ use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
@@ -178,6 +187,22 @@ mod tests {
assert_eq!(got, expected);
}
+ // Tests that building the wal options allocator fails with `InvalidTopicNamePrefix`
+ // when the configured Kafka topic name prefix is invalid.
+ #[tokio::test]
+ async fn test_refuse_invalid_topic_name_prefix() {
+ let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+ let wal_config = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
+ kafka_topic: KafkaTopicConfig {
+ topic_name_prefix: "``````".to_string(),
+ ..Default::default()
+ },
+ ..Default::default()
+ });
+ let got = build_wal_options_allocator(&wal_config, kv_backend)
+ .await
+ .unwrap_err();
+ assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
+ }
+
// Tests that the wal options allocator could successfully allocate Kafka wal options.
#[tokio::test]
async fn test_allocator_with_kafka() {
diff --git a/src/common/meta/src/wal_options_allocator/topic_manager.rs b/src/common/meta/src/wal_options_allocator/topic_manager.rs
index 7b677b4242..ebe036b489 100644
--- a/src/common/meta/src/wal_options_allocator/topic_manager.rs
+++ b/src/common/meta/src/wal_options_allocator/topic_manager.rs
@@ -22,20 +22,21 @@ use crate::kv_backend::KvBackendRef;
/// Responsible for:
/// 1. Restores and persisting topics in kvbackend.
/// 2. Clears topics in legacy format and restores them in the new format.
+/// 3. Stores and fetches topic-region mapping in kvbackend.
pub struct KafkaTopicManager {
- key_manager: TopicNameManager,
+ topic_name_manager: TopicNameManager,
}
impl KafkaTopicManager {
+ /// Creates a manager whose topic-name store is backed by `kv_backend`.
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
- key_manager: TopicNameManager::new(kv_backend),
+ topic_name_manager: TopicNameManager::new(kv_backend),
}
}
async fn restore_topics(&self) -> Result> {
- self.key_manager.update_legacy_topics().await?;
- let topics = self.key_manager.range().await?;
+ self.topic_name_manager.update_legacy_topics().await?;
+ let topics = self.topic_name_manager.range().await?;
Ok(topics)
}
@@ -57,7 +58,7 @@ impl KafkaTopicManager {
/// Persists topics into the key-value backend.
pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
- self.key_manager
+ self.topic_name_manager
.batch_put(
topics
.iter()
diff --git a/src/common/meta/src/wal_options_allocator/topic_pool.rs b/src/common/meta/src/wal_options_allocator/topic_pool.rs
index aac0fb90af..d29517e6d6 100644
--- a/src/common/meta/src/wal_options_allocator/topic_pool.rs
+++ b/src/common/meta/src/wal_options_allocator/topic_pool.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::fmt::{self, Formatter};
use std::sync::Arc;
use common_wal::config::kafka::MetasrvKafkaConfig;
@@ -39,6 +40,15 @@ pub struct KafkaTopicPool {
auto_create_topics: bool,
}
+// Hand-written `Debug` that prints only the `topics` and `auto_create_topics` fields.
+// NOTE(review): if `KafkaTopicPool` has further fields that are intentionally left out,
+// `finish_non_exhaustive()` would make the omission visible in the output — confirm.
+impl fmt::Debug for KafkaTopicPool {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ f.debug_struct("KafkaTopicPool")
+ .field("topics", &self.topics)
+ .field("auto_create_topics", &self.auto_create_topics)
+ .finish()
+ }
+}
+
impl KafkaTopicPool {
pub fn new(
config: &MetasrvKafkaConfig,