mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
chore(remote_wal): remove topic alias (#3120)
chore: remove topic alias
This commit is contained in:
@@ -18,9 +18,7 @@ pub mod raft_engine;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::with_prefix;
|
||||
|
||||
pub use crate::wal::kafka::{
|
||||
KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic,
|
||||
};
|
||||
pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig};
|
||||
pub use crate::wal::raft_engine::RaftEngineConfig;
|
||||
|
||||
/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
|
||||
|
||||
@@ -19,11 +19,6 @@ use rskafka::client::partition::Compression as RsKafkaCompression;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::with_prefix;
|
||||
|
||||
/// Topic name prefix.
|
||||
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
|
||||
/// Kafka wal topic.
|
||||
pub type Topic = String;
|
||||
|
||||
/// The type of the topic selector, i.e. with which strategy to select a topic.
|
||||
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
@@ -138,5 +133,5 @@ impl Default for StandaloneKafkaConfig {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct KafkaOptions {
|
||||
/// Kafka wal topic.
|
||||
pub topic: Topic,
|
||||
pub topic: String,
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ use serde::{Deserialize, Serialize};
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
|
||||
use crate::wal::kafka::KafkaConfig;
|
||||
pub use crate::wal::kafka::Topic as KafkaWalTopic;
|
||||
pub use crate::wal::options_allocator::{
|
||||
allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef,
|
||||
};
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod test_util;
|
||||
pub mod topic;
|
||||
pub mod topic_manager;
|
||||
pub mod topic_selector;
|
||||
|
||||
@@ -23,7 +22,6 @@ use std::time::Duration;
|
||||
use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub use crate::wal::kafka::topic::Topic;
|
||||
pub use crate::wal::kafka::topic_manager::TopicManager;
|
||||
|
||||
/// Configurations for kafka wal.
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// Kafka wal topic.
|
||||
/// Publishers publish log entries to the topic while subscribers pull log entries from the topic.
|
||||
/// A topic is simply a string right now. But it may be more complex in the future.
|
||||
// TODO(niebayes): remove the Topic alias.
|
||||
pub type Topic = String;
|
||||
@@ -33,7 +33,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::rpc::store::PutRequest;
|
||||
use crate::wal::kafka::topic::Topic;
|
||||
use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, TopicSelectorRef};
|
||||
use crate::wal::kafka::KafkaConfig;
|
||||
|
||||
@@ -46,7 +45,7 @@ const DEFAULT_PARTITION: i32 = 0;
|
||||
/// Manages topic initialization and selection.
|
||||
pub struct TopicManager {
|
||||
config: KafkaConfig,
|
||||
pub(crate) topic_pool: Vec<Topic>,
|
||||
pub(crate) topic_pool: Vec<String>,
|
||||
pub(crate) topic_selector: TopicSelectorRef,
|
||||
kv_backend: KvBackendRef,
|
||||
}
|
||||
@@ -86,7 +85,7 @@ impl TopicManager {
|
||||
let created_topics = Self::restore_created_topics(&self.kv_backend)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect::<HashSet<Topic>>();
|
||||
.collect::<HashSet<String>>();
|
||||
|
||||
// Creates missing topics.
|
||||
let to_be_created = topics
|
||||
@@ -108,7 +107,7 @@ impl TopicManager {
|
||||
}
|
||||
|
||||
/// Tries to create topics specified by indexes in `to_be_created`.
|
||||
async fn try_create_topics(&self, topics: &[Topic], to_be_created: &[usize]) -> Result<()> {
|
||||
async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> {
|
||||
// Builds an kafka controller client for creating topics.
|
||||
let backoff_config = BackoffConfig {
|
||||
init_backoff: self.config.backoff.init,
|
||||
@@ -141,18 +140,18 @@ impl TopicManager {
|
||||
}
|
||||
|
||||
/// Selects one topic from the topic pool through the topic selector.
|
||||
pub fn select(&self) -> Result<&Topic> {
|
||||
pub fn select(&self) -> Result<&String> {
|
||||
self.topic_selector.select(&self.topic_pool)
|
||||
}
|
||||
|
||||
/// Selects a batch of topics from the topic pool through the topic selector.
|
||||
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&Topic>> {
|
||||
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&String>> {
|
||||
(0..num_topics)
|
||||
.map(|_| self.topic_selector.select(&self.topic_pool))
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn try_append_noop_record(&self, topic: &Topic, client: &Client) -> Result<()> {
|
||||
async fn try_append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
|
||||
let partition_client = client
|
||||
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
|
||||
.await
|
||||
@@ -177,7 +176,7 @@ impl TopicManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> {
|
||||
async fn try_create_topic(&self, topic: &String, client: &ControllerClient) -> Result<()> {
|
||||
match client
|
||||
.create_topic(
|
||||
topic.clone(),
|
||||
@@ -203,7 +202,7 @@ impl TopicManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
|
||||
async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<String>> {
|
||||
kv_backend
|
||||
.get(CREATED_TOPICS_KEY.as_bytes())
|
||||
.await?
|
||||
@@ -213,7 +212,7 @@ impl TopicManager {
|
||||
)
|
||||
}
|
||||
|
||||
async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> {
|
||||
async fn persist_created_topics(topics: &[String], kv_backend: &KvBackendRef) -> Result<()> {
|
||||
let raw_topics = serde_json::to_vec(topics).context(EncodeJsonSnafu)?;
|
||||
kv_backend
|
||||
.put(PutRequest {
|
||||
|
||||
@@ -19,12 +19,11 @@ use rand::Rng;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::error::{EmptyTopicPoolSnafu, Result};
|
||||
use crate::wal::kafka::topic::Topic;
|
||||
|
||||
/// Controls topic selection.
|
||||
pub(crate) trait TopicSelector: Send + Sync {
|
||||
/// Selects a topic from the topic pool.
|
||||
fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic>;
|
||||
fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String>;
|
||||
}
|
||||
|
||||
/// Arc wrapper of TopicSelector.
|
||||
@@ -48,7 +47,7 @@ impl RoundRobinTopicSelector {
|
||||
}
|
||||
|
||||
impl TopicSelector for RoundRobinTopicSelector {
|
||||
fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic> {
|
||||
fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String> {
|
||||
ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu);
|
||||
let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
|
||||
Ok(&topic_pool[which])
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_config::wal::KafkaWalTopic;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_macro::stack_trace_debug;
|
||||
use common_runtime::error::Error as RuntimeError;
|
||||
@@ -120,7 +119,7 @@ pub enum Error {
|
||||
error
|
||||
))]
|
||||
GetClient {
|
||||
topic: KafkaWalTopic,
|
||||
topic: String,
|
||||
location: Location,
|
||||
error: String,
|
||||
},
|
||||
@@ -141,7 +140,7 @@ pub enum Error {
|
||||
limit,
|
||||
))]
|
||||
ProduceRecord {
|
||||
topic: KafkaWalTopic,
|
||||
topic: String,
|
||||
size: usize,
|
||||
limit: usize,
|
||||
location: Location,
|
||||
|
||||
@@ -18,7 +18,6 @@ pub(crate) mod util;
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use common_meta::wal::KafkaWalTopic as Topic;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::logstore::entry::{Entry, Id as EntryId};
|
||||
use store_api::logstore::namespace::Namespace;
|
||||
@@ -29,7 +28,7 @@ use crate::error::Error;
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
|
||||
pub struct NamespaceImpl {
|
||||
pub region_id: u64,
|
||||
pub topic: Topic,
|
||||
pub topic: String,
|
||||
}
|
||||
|
||||
impl Namespace for NamespaceImpl {
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
|
||||
use common_config::wal::KafkaConfig;
|
||||
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
|
||||
use rskafka::client::producer::aggregator::RecordAggregator;
|
||||
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
|
||||
@@ -67,7 +67,7 @@ pub(crate) struct ClientManager {
|
||||
client_factory: RsKafkaClient,
|
||||
/// A pool maintaining a collection of clients.
|
||||
/// Key: a topic. Value: the associated client of the topic.
|
||||
client_pool: RwLock<HashMap<Topic, Client>>,
|
||||
client_pool: RwLock<HashMap<String, Client>>,
|
||||
}
|
||||
|
||||
impl ClientManager {
|
||||
@@ -97,7 +97,7 @@ impl ClientManager {
|
||||
|
||||
/// Gets the client associated with the topic. If the client does not exist, a new one will
|
||||
/// be created and returned.
|
||||
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
|
||||
pub(crate) async fn get_or_insert(&self, topic: &String) -> Result<Client> {
|
||||
{
|
||||
let client_pool = self.client_pool.read().await;
|
||||
if let Some(client) = client_pool.get(topic) {
|
||||
@@ -116,7 +116,7 @@ impl ClientManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
|
||||
async fn try_create_client(&self, topic: &String) -> Result<Client> {
|
||||
// Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error.
|
||||
// That's because the topic is believed to exist as the metasrv is expected to create required topics upon start.
|
||||
// The reconnecting won't stop until succeed or a different error returns.
|
||||
@@ -147,7 +147,7 @@ mod tests {
|
||||
test_name: &str,
|
||||
num_topics: usize,
|
||||
broker_endpoints: Vec<String>,
|
||||
) -> (ClientManager, Vec<Topic>) {
|
||||
) -> (ClientManager, Vec<String>) {
|
||||
let topics = create_topics(
|
||||
num_topics,
|
||||
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
|
||||
|
||||
@@ -283,7 +283,6 @@ fn check_termination(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_config::wal::KafkaWalTopic as Topic;
|
||||
use rand::seq::IteratorRandom;
|
||||
|
||||
use super::*;
|
||||
@@ -304,7 +303,7 @@ mod tests {
|
||||
test_name: &str,
|
||||
num_topics: usize,
|
||||
broker_endpoints: Vec<String>,
|
||||
) -> (KafkaLogStore, Vec<Topic>) {
|
||||
) -> (KafkaLogStore, Vec<String>) {
|
||||
let topics = create_topics(
|
||||
num_topics,
|
||||
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
|
||||
use std::sync::Mutex;
|
||||
|
||||
use common_meta::wal::KafkaWalTopic as Topic;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use rand::rngs::ThreadRng;
|
||||
use rand::{thread_rng, Rng};
|
||||
@@ -29,7 +28,7 @@ pub async fn create_topics<F>(
|
||||
num_topics: usize,
|
||||
decorator: F,
|
||||
broker_endpoints: &[String],
|
||||
) -> Vec<Topic>
|
||||
) -> Vec<String>
|
||||
where
|
||||
F: Fn(usize) -> String,
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user