mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 04:12:55 +00:00
feat: allow skipping topic creation (#4616)
* feat: introduce `create_topics` opt * feat: allow skipping topic creation * chore: refine docs * chore: apply suggestions from CR
This commit is contained in:
@@ -67,9 +67,10 @@
|
||||
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
|
||||
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
|
||||
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
|
||||
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.<br/>**It's only used when the provider is `kafka`**. |
|
||||
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
|
||||
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
|
||||
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
|
||||
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>**It's only used when the provider is `kafka`**. |
|
||||
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.<br/>**It's only used when the provider is `kafka`**. |
|
||||
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.<br/>**It's only used when the provider is `kafka`**. |
|
||||
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.<br/>**It's only used when the provider is `kafka`**. |
|
||||
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
|
||||
@@ -287,9 +288,10 @@
|
||||
| `wal` | -- | -- | -- |
|
||||
| `wal.provider` | String | `raft_engine` | -- |
|
||||
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
|
||||
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. |
|
||||
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
|
||||
| `wal.num_topics` | Integer | `64` | Number of topics. |
|
||||
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
|
||||
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. |
|
||||
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
|
||||
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
|
||||
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
|
||||
| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |
|
||||
|
||||
@@ -99,7 +99,12 @@ provider = "raft_engine"
|
||||
## The broker endpoints of the Kafka cluster.
|
||||
broker_endpoints = ["127.0.0.1:9092"]
|
||||
|
||||
## Number of topics to be created upon start.
|
||||
## Automatically create topics for WAL.
|
||||
## Set to `true` to automatically create topics for WAL.
|
||||
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
|
||||
auto_create_topics = true
|
||||
|
||||
## Number of topics.
|
||||
num_topics = 64
|
||||
|
||||
## Topic selector type.
|
||||
@@ -108,6 +113,7 @@ num_topics = 64
|
||||
selector_type = "round_robin"
|
||||
|
||||
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
|
||||
## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
|
||||
topic_name_prefix = "greptimedb_wal_topic"
|
||||
|
||||
## Expected number of replicas of each partition.
|
||||
|
||||
@@ -171,7 +171,12 @@ sync_period = "10s"
|
||||
## **It's only used when the provider is `kafka`**.
|
||||
broker_endpoints = ["127.0.0.1:9092"]
|
||||
|
||||
## Number of topics to be created upon start.
|
||||
## Automatically create topics for WAL.
|
||||
## Set to `true` to automatically create topics for WAL.
|
||||
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
|
||||
auto_create_topics = true
|
||||
|
||||
## Number of topics.
|
||||
## **It's only used when the provider is `kafka`**.
|
||||
num_topics = 64
|
||||
|
||||
@@ -182,6 +187,7 @@ num_topics = 64
|
||||
selector_type = "round_robin"
|
||||
|
||||
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
|
||||
## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
|
||||
## **It's only used when the provider is `kafka`**.
|
||||
topic_name_prefix = "greptimedb_wal_topic"
|
||||
|
||||
|
||||
@@ -76,6 +76,10 @@ impl TopicManager {
|
||||
/// The initializer first tries to restore persisted topics from the kv backend.
|
||||
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
// Skip creating topics.
|
||||
if !self.config.auto_create_topics {
|
||||
return Ok(());
|
||||
}
|
||||
let num_topics = self.config.kafka_topic.num_topics;
|
||||
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
|
||||
|
||||
|
||||
@@ -53,6 +53,7 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
|
||||
connection: config.connection,
|
||||
backoff: config.backoff,
|
||||
kafka_topic: config.kafka_topic,
|
||||
auto_create_topics: config.auto_create_topics,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -188,6 +189,7 @@ mod tests {
|
||||
replication_factor: 1,
|
||||
create_topic_timeout: Duration::from_secs(30),
|
||||
},
|
||||
auto_create_topics: true,
|
||||
};
|
||||
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
|
||||
|
||||
|
||||
@@ -187,7 +187,7 @@ impl Default for KafkaConnectionConfig {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct KafkaTopicConfig {
|
||||
/// Number of topics to be created upon start.
|
||||
/// Number of topics.
|
||||
pub num_topics: usize,
|
||||
/// Number of partitions per topic.
|
||||
pub num_partitions: i32,
|
||||
|
||||
@@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig {
|
||||
/// The kafka topic config.
|
||||
#[serde(flatten)]
|
||||
pub kafka_topic: KafkaTopicConfig,
|
||||
// Automatically create topics for WAL.
|
||||
pub auto_create_topics: bool,
|
||||
// Create index for WAL.
|
||||
pub create_index: bool,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub dump_index_interval: Duration,
|
||||
@@ -54,6 +57,7 @@ impl Default for DatanodeKafkaConfig {
|
||||
consumer_wait_timeout: Duration::from_millis(100),
|
||||
backoff: BackoffConfig::default(),
|
||||
kafka_topic: KafkaTopicConfig::default(),
|
||||
auto_create_topics: true,
|
||||
create_index: true,
|
||||
dump_index_interval: Duration::from_secs(60),
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ use super::common::KafkaConnectionConfig;
|
||||
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
|
||||
|
||||
/// Kafka wal configurations for metasrv.
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct MetasrvKafkaConfig {
|
||||
/// The kafka connection config.
|
||||
@@ -30,4 +30,17 @@ pub struct MetasrvKafkaConfig {
|
||||
/// The kafka config.
|
||||
#[serde(flatten)]
|
||||
pub kafka_topic: KafkaTopicConfig,
|
||||
// Automatically create topics for WAL.
|
||||
pub auto_create_topics: bool,
|
||||
}
|
||||
|
||||
impl Default for MetasrvKafkaConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
connection: Default::default(),
|
||||
backoff: Default::default(),
|
||||
kafka_topic: Default::default(),
|
||||
auto_create_topics: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user