feat(remote_wal): implement topic allocation (#2970)

* chore: implement wal options allocator

* chore: implement round-robin topic selector

* feat: add shuffle to round-robin topic selector

* chore: implement kafka topic manager

* test: add tests for wal options allocator

* test: add wal provider to test config files

* test: leave todos for adding tests for remote wal

* fix: resolve review conversations

* fix: typo
This commit is contained in:
niebayes
2023-12-22 16:26:48 +08:00
committed by GitHub
parent 6221e5b105
commit 830a91c548
20 changed files with 427 additions and 88 deletions

View File

@@ -175,6 +175,7 @@ jobs:
- name: Setup etcd server
working-directory: tests-integration/fixtures/etcd
run: docker compose -f docker-compose-standalone.yml up -d --wait
#TODO(niebaye) Add a step to setup kafka clusters. Maybe add a docker file for starting kafka clusters.
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
env:

2
Cargo.lock generated
View File

@@ -1835,7 +1835,9 @@ dependencies = [
"lazy_static",
"prometheus",
"prost 0.12.2",
"rand",
"regex",
"rskafka",
"serde",
"serde_json",
"serde_with",

View File

@@ -61,11 +61,21 @@ provider = "raft_engine"
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_kafka_wal"
# topic_name_prefix = "greptimedb_wal_kafka"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# Above which a topic creation operation will be cancelled.
# create_topic_timeout = "30s"
# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
# backoff_deadline = "5mins"
# Metasrv export the metrics generated by itself
# encoded to Prometheus remote-write format

View File

@@ -55,6 +55,12 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to start datanode"))]
StartDatanode {
location: Location,
@@ -270,6 +276,7 @@ impl ErrorExt for Error {
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }

View File

@@ -19,7 +19,6 @@ use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
@@ -28,7 +27,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::build_wal_options_allocator;
use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
@@ -51,9 +50,9 @@ use servers::Mode;
use snafu::ResultExt;
use crate::error::{
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result,
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{CliOptions, MixOptions, Options};
use crate::App;
@@ -180,6 +179,7 @@ pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
}
#[async_trait]
@@ -196,6 +196,11 @@ impl App for Instance {
.await
.context(StartProcedureManagerSnafu)?;
self.wal_options_allocator
.start()
.await
.context(StartWalOptionsAllocatorSnafu)?;
self.frontend.start().await.context(StartFrontendSnafu)?;
Ok(())
}
@@ -388,14 +393,13 @@ impl StartCommand {
.build(),
);
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator =
build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
common_meta::wal::WalConfig::default(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator,
wal_options_allocator.clone(),
));
let ddl_task_executor = Self::create_ddl_task_executor(
@@ -425,6 +429,7 @@ impl StartCommand {
datanode,
frontend,
procedure_manager,
wal_options_allocator,
})
}

View File

@@ -32,7 +32,9 @@ humantime-serde.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
regex.workspace = true
rskafka = "0.5"
serde.workspace = true
serde_json.workspace = true
serde_with = "3"

View File

@@ -290,12 +290,46 @@ pub enum Error {
"Failed to encode a wal options to json string, wal_options: {:?}",
wal_options
))]
EncodeWalOptionsToJson {
EncodeWalOptions {
wal_options: WalOptions,
#[snafu(source)]
error: serde_json::Error,
location: Location,
},
#[snafu(display("Invalid number of topics {}", num_topics))]
InvalidNumTopics {
num_topics: usize,
location: Location,
},
#[snafu(display(
"Failed to build a kafka client, broker endpoints: {:?}",
broker_endpoints
))]
BuildKafkaClient {
broker_endpoints: Vec<String>,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to build a kafka controller client"))]
BuildKafkaCtrlClient {
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to create a kafka wal topic"))]
CreateKafkaWalTopic {
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("The topic pool is empty"))]
EmptyTopicPool { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -331,7 +365,11 @@ impl ErrorExt for Error {
| TableRouteNotFound { .. }
| ConvertRawTableInfo { .. }
| RegionOperatingRace { .. }
| EncodeWalOptionsToJson { .. } => StatusCode::Unexpected,
| EncodeWalOptions { .. }
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. } => StatusCode::Unexpected,
SendMessage { .. }
| GetKvCache { .. }
@@ -356,6 +394,8 @@ impl ErrorExt for Error {
RetryLater { source, .. } => source.status_code(),
InvalidCatalogValue { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),
InvalidNumTopics { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -23,7 +23,9 @@ use serde_with::with_prefix;
use crate::error::Result;
use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::Topic as KafkaWalTopic;
pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator};
pub use crate::wal::options_allocator::{
allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef,
};
/// Wal config for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
@@ -38,6 +40,8 @@ pub enum WalConfig {
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::wal::kafka::topic_selector::SelectorType as KafkaTopicSelectorType;
@@ -65,19 +69,31 @@ mod tests {
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_kafka_wal"
topic_name_prefix = "greptimedb_wal_kafka"
num_partitions = 1
replication_factor = 3
create_topic_timeout = "30s"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2.0
backoff_deadline = "5mins"
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
let expected_kafka_config = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 32,
selector_type: KafkaTopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_kafka_wal".to_string(),
topic_name_prefix: "greptimedb_wal_kafka".to_string(),
num_partitions: 1,
replication_factor: 3,
create_topic_timeout: Duration::from_secs(30),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2.0,
backoff_deadline: Some(Duration::from_secs(60 * 5)),
};
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
}
// TODO(niebayes): the integrate test needs to test that the example config file can be successfully parsed.
}

View File

@@ -16,6 +16,8 @@ pub mod topic;
pub mod topic_manager;
pub mod topic_selector;
use std::time::Duration;
use serde::{Deserialize, Serialize};
pub use crate::wal::kafka::topic::Topic;
@@ -37,6 +39,21 @@ pub struct KafkaConfig {
pub num_partitions: i32,
/// The replication factor of each topic.
pub replication_factor: i16,
/// Above which a topic creation operation will be cancelled.
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
/// The initial backoff for kafka clients.
#[serde(with = "humantime_serde")]
pub backoff_init: Duration,
/// The maximum backoff for kafka clients.
#[serde(with = "humantime_serde")]
pub backoff_max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
pub backoff_base: f64,
/// Stop reconnecting if the total wait time reaches the deadline.
/// If it's None, the reconnecting won't terminate.
#[serde(with = "humantime_serde")]
pub backoff_deadline: Option<Duration>,
}
impl Default for KafkaConfig {
@@ -45,9 +62,14 @@ impl Default for KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_kafka_wal".to_string(),
topic_name_prefix: "greptimedb_wal_kafka".to_string(),
num_partitions: 1,
replication_factor: 3,
create_topic_timeout: Duration::from_secs(30),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2.0,
backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
}

View File

@@ -12,16 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use crate::error::Result;
use common_telemetry::debug;
use rskafka::client::ClientBuilder;
use rskafka::BackoffConfig;
use snafu::{ensure, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu,
EncodeJsonSnafu, InvalidNumTopicsSnafu, Result,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
use crate::wal::kafka::topic::Topic;
use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, SelectorType, TopicSelectorRef};
use crate::wal::kafka::KafkaConfig;
const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/";
/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
// TODO(niebayes): maybe add a guard to ensure all topics in the topic pool are created.
topic_pool: Vec<Topic>,
topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
@@ -29,33 +44,161 @@ pub struct TopicManager {
impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: &KafkaConfig, kv_backend: KvBackendRef) -> Self {
pub fn new(config: KafkaConfig, kv_backend: KvBackendRef) -> Self {
// Topics should be created.
let topics = (0..config.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
.collect::<Vec<_>>();
let selector = match config.selector_type {
SelectorType::RoundRobin => RoundRobinTopicSelector::new(),
SelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
};
Self {
topic_pool: Vec::new(),
config,
topic_pool: topics,
topic_selector: Arc::new(selector),
kv_backend,
}
}
/// Tries to initialize the topic pool.
/// Tries to initialize the topic manager.
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request more topics.
pub async fn try_init(&mut self) -> Result<()> {
todo!()
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
let num_topics = self.config.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
// Topics should be created.
let topics = &self.topic_pool;
// Topics already created.
// There may have extra topics created but it's okay since those topics won't break topic allocation.
let created_topics = Self::restore_created_topics(&self.kv_backend)
.await?
.into_iter()
.collect::<HashSet<Topic>>();
debug!("Restored {} topics", created_topics.len());
// Creates missing topics.
let to_be_created = topics
.iter()
.enumerate()
.filter_map(|(i, topic)| {
if created_topics.contains(topic) {
return None;
}
Some(i)
})
.collect::<Vec<_>>();
if !to_be_created.is_empty() {
self.try_create_topics(topics, &to_be_created).await?;
Self::persist_created_topics(topics, &self.kv_backend).await?;
debug!("Persisted {} topics", topics.len());
}
Ok(())
}
/// Tries to create topics specified by indexes in `to_be_created`.
async fn try_create_topics(&self, topics: &[Topic], to_be_created: &[usize]) -> Result<()> {
// Builds an kafka controller client for creating topics.
let backoff_config = BackoffConfig {
init_backoff: self.config.backoff_init,
max_backoff: self.config.backoff_max,
base: self.config.backoff_base,
deadline: self.config.backoff_deadline,
};
let client = ClientBuilder::new(self.config.broker_endpoints.clone())
.backoff_config(backoff_config)
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: self.config.broker_endpoints.clone(),
})?
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;
// Spawns tokio tasks for creating missing topics.
let tasks = to_be_created
.iter()
.map(|i| {
client.create_topic(
topics[*i].clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
)
})
.collect::<Vec<_>>();
// TODO(niebayes): Determine how rskafka handles an already-exist topic. Check if an error would be raised.
futures::future::try_join_all(tasks)
.await
.context(CreateKafkaWalTopicSnafu)
.map(|_| ())
}
/// Selects one topic from the topic pool through the topic selector.
pub fn select(&self) -> &Topic {
pub fn select(&self) -> Result<&Topic> {
self.topic_selector.select(&self.topic_pool)
}
/// Selects a batch of topics from the topic pool through the topic selector.
pub fn select_batch(&self, num_topics: usize) -> Vec<Topic> {
// TODO(niebayes): calls `select` to select a collection of topics in a batching manner.
vec!["tmp_topic".to_string(); num_topics]
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&Topic>> {
(0..num_topics)
.map(|_| self.topic_selector.select(&self.topic_pool))
.collect()
}
async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
kv_backend
.get(CREATED_TOPICS_KEY.as_bytes())
.await?
.map_or_else(
|| Ok(vec![]),
|key_value| serde_json::from_slice(&key_value.value).context(DecodeJsonSnafu),
)
}
async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> {
let raw_topics = serde_json::to_vec(topics).context(EncodeJsonSnafu)?;
kv_backend
.put(PutRequest {
key: CREATED_TOPICS_KEY.as_bytes().to_vec(),
value: raw_topics,
prev_kv: false,
})
.await
.map(|_| ())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
// Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
#[tokio::test]
async fn test_restore_persisted_topics() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_name_prefix = "greptimedb_wal_kafka";
let num_topics = 16;
// Constructs mock topics.
let topics = (0..num_topics)
.map(|topic| format!("{topic_name_prefix}{topic}"))
.collect::<Vec<_>>();
// Persists topics to kv backend.
TopicManager::persist_created_topics(&topics, &kv_backend)
.await
.unwrap();
// Restores topics from kv backend.
let restored_topics = TopicManager::restore_created_topics(&kv_backend)
.await
.unwrap();
assert_eq!(topics, restored_topics);
}
}

View File

@@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use rand::Rng;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::error::{EmptyTopicPoolSnafu, Result};
use crate::wal::kafka::topic::Topic;
/// The type of the topic selector, i.e. with which strategy to select a topic.
@@ -27,25 +31,56 @@ pub enum SelectorType {
}
/// Controls topic selection.
pub(super) trait TopicSelector: Send + Sync {
pub(crate) trait TopicSelector: Send + Sync {
/// Selects a topic from the topic pool.
fn select(&self, topic_pool: &[Topic]) -> &Topic;
fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic>;
}
pub(super) type TopicSelectorRef = Arc<dyn TopicSelector>;
/// Arc wrapper of TopicSelector.
pub(crate) type TopicSelectorRef = Arc<dyn TopicSelector>;
/// A topic selector with the round-robin strategy, i.e. selects topics in a round-robin manner.
pub(super) struct RoundRobinTopicSelector;
#[derive(Default)]
pub(crate) struct RoundRobinTopicSelector {
cursor: AtomicUsize,
}
impl RoundRobinTopicSelector {
/// Creates a new round-robin topic selector.
pub(super) fn new() -> Self {
todo!()
// The cursor in the round-robin selector is not persisted which may break the round-robin strategy cross crashes.
// Introducing a shuffling strategy may help mitigate this issue.
pub fn with_shuffle() -> Self {
let offset = rand::thread_rng().gen_range(0..64);
Self {
cursor: AtomicUsize::new(offset),
}
}
}
impl TopicSelector for RoundRobinTopicSelector {
fn select(&self, topic_pool: &[Topic]) -> &Topic {
todo!()
fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic> {
ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu);
let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
Ok(&topic_pool[which])
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_round_robin_topic_selector() {
let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect();
let selector = RoundRobinTopicSelector::default();
assert_eq!(selector.select(&topic_pool).unwrap(), "0");
assert_eq!(selector.select(&topic_pool).unwrap(), "1");
assert_eq!(selector.select(&topic_pool).unwrap(), "2");
assert_eq!(selector.select(&topic_pool).unwrap(), "0");
// Creates a round-robin selector with shuffle.
let selector = RoundRobinTopicSelector::with_shuffle();
let topic = selector.select(&topic_pool).unwrap();
assert!(topic_pool.contains(topic));
}
}

View File

@@ -19,7 +19,7 @@ use common_config::{KafkaWalOptions, WalOptions};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use crate::error::{EncodeWalOptionsToJsonSnafu, Result};
use crate::error::{EncodeWalOptionsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::wal::kafka::TopicManager as KafkaTopicManager;
use crate::wal::WalConfig;
@@ -32,60 +32,100 @@ pub enum WalOptionsAllocator {
Kafka(KafkaTopicManager),
}
/// Arc wrapper of WalOptionsAllocator.
pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
impl WalOptionsAllocator {
/// Creates a WalOptionsAllocator.
pub fn new(config: &WalConfig, kv_backend: KvBackendRef) -> Self {
todo!()
pub fn new(config: WalConfig, kv_backend: KvBackendRef) -> Self {
match config {
WalConfig::RaftEngine => Self::RaftEngine,
WalConfig::Kafka(kafka_config) => {
Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend))
}
}
}
/// Tries to initialize the allocator.
pub fn try_init(&self) -> Result<()> {
todo!()
/// Tries to start the allocator.
pub async fn start(&self) -> Result<()> {
match self {
Self::RaftEngine => Ok(()),
Self::Kafka(kafka_topic_manager) => kafka_topic_manager.start().await,
}
}
/// Allocates a wal options for a region.
pub fn alloc(&self) -> WalOptions {
todo!()
pub fn alloc(&self) -> Result<WalOptions> {
match self {
Self::RaftEngine => Ok(WalOptions::RaftEngine),
Self::Kafka(topic_manager) => {
let topic = topic_manager.select()?;
Ok(WalOptions::Kafka(KafkaWalOptions {
topic: topic.clone(),
}))
}
}
}
/// Allocates a batch of wal options where each wal options goes to a region.
pub fn alloc_batch(&self, num_regions: usize) -> Vec<WalOptions> {
pub fn alloc_batch(&self, num_regions: usize) -> Result<Vec<WalOptions>> {
match self {
WalOptionsAllocator::RaftEngine => vec![WalOptions::RaftEngine; num_regions],
WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
WalOptionsAllocator::Kafka(topic_manager) => {
let topics = topic_manager.select_batch(num_regions);
topics
let options_batch = topic_manager
.select_batch(num_regions)?
.into_iter()
.map(|topic| WalOptions::Kafka(KafkaWalOptions { topic }))
.collect()
.map(|topic| {
WalOptions::Kafka(KafkaWalOptions {
topic: topic.clone(),
})
})
.collect();
Ok(options_batch)
}
}
}
}
/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
pub fn build_region_wal_options(
pub fn allocate_region_wal_options(
regions: Vec<RegionNumber>,
wal_options_allocator: &WalOptionsAllocator,
) -> Result<HashMap<RegionNumber, String>> {
let wal_options = wal_options_allocator
.alloc_batch(regions.len())
.alloc_batch(regions.len())?
.into_iter()
.map(|wal_options| {
serde_json::to_string(&wal_options).context(EncodeWalOptionsToJsonSnafu { wal_options })
serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
})
.collect::<Result<Vec<_>>>()?;
Ok(regions.into_iter().zip(wal_options).collect())
}
/// Builds a wal options allocator.
// TODO(niebayes): implement.
pub async fn build_wal_options_allocator(
config: &WalConfig,
kv_backend: &KvBackendRef,
) -> Result<WalOptionsAllocator> {
let _ = config;
let _ = kv_backend;
Ok(WalOptionsAllocator::default())
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
// Tests the wal options allocator could successfully allocate raft-engine wal options.
// Note: tests for allocator with kafka are integration tests.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = WalConfig::RaftEngine;
let mut allocator = WalOptionsAllocator::new(wal_config, kv_backend);
allocator.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
let expected = regions
.into_iter()
.zip(vec![encoded_wal_options; num_regions as usize])
.collect();
assert_eq!(got, expected);
}
}

View File

@@ -26,8 +26,8 @@ use common_meta::peer::Peer;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::WalOptionsAllocator;
use common_meta::wal::options_allocator::allocate_region_wal_options;
use common_meta::wal::WalOptionsAllocatorRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, info, tracing};
@@ -108,11 +108,14 @@ impl Datanode for RegionInvoker {
pub struct StandaloneTableMetadataAllocator {
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocator,
wal_options_allocator: WalOptionsAllocatorRef,
}
impl StandaloneTableMetadataAllocator {
pub fn new(table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocator) -> Self {
pub fn new(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
) -> Self {
Self {
table_id_sequence,
wal_options_allocator,
@@ -185,7 +188,7 @@ impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
build_region_wal_options(region_numbers, &self.wal_options_allocator)?;
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?;
debug!(
"Allocated region wal options {:?} for table {}",

View File

@@ -26,6 +26,7 @@ use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal::options_allocator::WalOptionsAllocatorRef;
use common_meta::wal::WalConfig;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
@@ -177,6 +178,7 @@ pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
pub struct MetaStateHandler {
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
subscribe_manager: Option<SubscribeManagerRef>,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
@@ -196,6 +198,11 @@ impl MetaStateHandler {
if let Err(e) = self.procedure_manager.start().await {
error!(e; "Failed to start procedure manager");
}
if let Err(e) = self.wal_options_allocator.start().await {
error!(e; "Failed to start wal options allocator");
}
self.greptimedb_telemetry_task.should_report(true);
}
@@ -236,6 +243,7 @@ pub struct MetaSrv {
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
ddl_executor: DdlTaskExecutorRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
@@ -270,6 +278,7 @@ impl MetaSrv {
greptimedb_telemetry_task,
subscribe_manager,
procedure_manager,
wal_options_allocator: self.wal_options_allocator.clone(),
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
};

View File

@@ -19,7 +19,6 @@ use std::time::Duration;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::TableMetadataAllocatorRef;
@@ -31,14 +30,14 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal::build_wal_options_allocator;
use common_meta::wal::WalOptionsAllocator;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, OtherSnafu, Result};
use crate::error::{self, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_stats_handler::CollectStatsHandler;
@@ -206,10 +205,10 @@ impl MetaSrvBuilder {
table_id: None,
};
let wal_options_allocator = build_wal_options_allocator(&options.wal, &kv_backend)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
options.wal.clone(),
kv_backend.clone(),
));
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -221,7 +220,7 @@ impl MetaSrvBuilder {
selector_ctx.clone(),
selector.clone(),
sequence.clone(),
wal_options_allocator,
wal_options_allocator.clone(),
))
});
@@ -314,6 +313,7 @@ impl MetaSrvBuilder {
procedure_manager,
mailbox,
ddl_executor: ddl_manager,
wal_options_allocator,
table_metadata_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(metasrv_home),

View File

@@ -19,8 +19,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::WalOptionsAllocator;
use common_meta::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};
use common_telemetry::{debug, warn};
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ};
@@ -33,7 +32,7 @@ pub struct MetaSrvTableMetadataAllocator {
ctx: SelectorContext,
selector: SelectorRef,
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocator,
wal_options_allocator: WalOptionsAllocatorRef,
}
impl MetaSrvTableMetadataAllocator {
@@ -41,7 +40,7 @@ impl MetaSrvTableMetadataAllocator {
ctx: SelectorContext,
selector: SelectorRef,
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocator,
wal_options_allocator: WalOptionsAllocatorRef,
) -> Self {
Self {
ctx,
@@ -75,7 +74,7 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
build_region_wal_options(region_numbers, &self.wal_options_allocator)?;
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)?;
debug!(
"Allocated region wal options {:?} for table {}",

View File

@@ -23,7 +23,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::build_wal_options_allocator;
use common_meta::wal::WalOptionsAllocator;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use datanode::config::DatanodeOptions;
@@ -119,13 +119,13 @@ impl GreptimeDbStandaloneBuilder {
.build(),
);
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator =
build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend)
.await
.unwrap();
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
common_meta::wal::WalConfig::default(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator,
wal_options_allocator.clone(),
));
let ddl_task_executor = Arc::new(
@@ -147,6 +147,7 @@ impl GreptimeDbStandaloneBuilder {
.unwrap();
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();
test_util::prepare_another_catalog_and_schema(&instance).await;

View File

@@ -25,3 +25,5 @@ grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
// region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
sql_tests!(File);
// TODO(niebayes): add integration tests for remote wal.

View File

@@ -6,6 +6,7 @@ rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8
[wal]
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'

View File

@@ -3,6 +3,7 @@ enable_memory_catalog = false
require_lease_before_startup = true
[wal]
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'