refactor: introduce common-wal to aggregate wal stuff (#3171)

* refactor: aggregate wal configs

* refactor: move wal options to common-wal

* chore: slim Cargo.toml

* fix: add missing crates

* fix: format

* chore: update comments

* chore: add testing feature gate for test_util

* fix: apply suggestions from code review

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

* fix: apply suggestions from code review

* fix: compiling

---------

Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
niebayes
2024-01-18 11:49:37 +08:00
committed by GitHub
parent 3d7d2fdb4a
commit 63205907fb
62 changed files with 738 additions and 575 deletions

30
Cargo.lock generated
View File

@@ -1539,6 +1539,7 @@ dependencies = [
"common-test-util",
"common-time",
"common-version",
"common-wal",
"config",
"datanode",
"datatypes",
@@ -1632,12 +1633,8 @@ dependencies = [
"common-base",
"humantime-serde",
"num_cpus",
"rskafka",
"serde",
"serde_json",
"serde_with",
"sysinfo",
"toml 0.8.8",
]
[[package]]
@@ -1825,7 +1822,6 @@ dependencies = [
"bytes",
"chrono",
"common-catalog",
"common-config",
"common-error",
"common-grpc-expr",
"common-macro",
@@ -1834,6 +1830,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-time",
"common-wal",
"datatypes",
"derive_builder 0.12.0",
"etcd-client",
@@ -1855,7 +1852,6 @@ dependencies = [
"strum 0.25.0",
"table",
"tokio",
"toml 0.8.8",
"tonic 0.10.2",
"uuid",
]
@@ -2014,6 +2010,21 @@ dependencies = [
"build-data",
]
[[package]]
name = "common-wal"
version = "0.6.0"
dependencies = [
"common-base",
"common-telemetry",
"futures-util",
"humantime-serde",
"rskafka",
"serde",
"serde_json",
"serde_with",
"toml 0.8.8",
]
[[package]]
name = "concurrent-queue"
version = "2.4.0"
@@ -2670,6 +2681,7 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-time",
"common-wal",
"dashmap",
"datafusion",
"datafusion-common",
@@ -4519,6 +4531,7 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-time",
"common-wal",
"dashmap",
"futures",
"futures-util",
@@ -4834,6 +4847,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-time",
"common-wal",
"dashmap",
"datatypes",
"derive_builder 0.12.0",
@@ -4988,6 +5002,7 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-time",
"common-wal",
"dashmap",
"datafusion",
"datafusion-common",
@@ -9106,12 +9121,12 @@ dependencies = [
"async-trait",
"bytes",
"common-base",
"common-config",
"common-error",
"common-macro",
"common-query",
"common-recordbatch",
"common-time",
"common-wal",
"datatypes",
"derive_builder 0.12.0",
"futures",
@@ -9544,6 +9559,7 @@ dependencies = [
"common-runtime",
"common-telemetry",
"common-test-util",
"common-wal",
"datafusion",
"datafusion-expr",
"datanode",

View File

@@ -29,6 +29,7 @@ members = [
"src/common/time",
"src/common/decimal",
"src/common/version",
"src/common/wal",
"src/datanode",
"src/datatypes",
"src/file-engine",
@@ -124,6 +125,7 @@ rskafka = "0.5"
rust_decimal = "1.33"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_with = "3"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.7"
sysinfo = "0.30"
@@ -169,6 +171,7 @@ common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }
common-version = { path = "src/common/version" }
common-wal = { path = "src/common/wal" }
datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }

View File

@@ -88,7 +88,25 @@ enable = true
# - "kafka"
provider = "raft_engine"
# There're none raft-engine wal config since meta srv only involves in remote wal currently.
# Raft-engine wal options.
# WAL data directory
# dir = "/tmp/greptimedb/wal"
# WAL file size in bytes.
file_size = "256MB"
# WAL purge threshold.
purge_threshold = "4GB"
# WAL purge interval in seconds.
purge_interval = "10m"
# WAL read batch size.
read_batch_size = 128
# Whether to sync log file after every write.
sync_write = false
# Whether to reuse logically truncated log files.
enable_log_recycle = true
# Whether to pre-create log files on start up
prefill_log_files = false
# Duration for fsyncing log files.
sync_period = "1000ms"
# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
@@ -125,25 +143,6 @@ provider = "raft_engine"
# The deadline of retries.
# backoff_deadline = "5mins"
# WAL data directory
# dir = "/tmp/greptimedb/wal"
# WAL file size in bytes.
file_size = "256MB"
# WAL purge threshold.
purge_threshold = "4GB"
# WAL purge interval in seconds.
purge_interval = "10m"
# WAL read batch size.
read_batch_size = 128
# Whether to sync log file after every write.
sync_write = false
# Whether to reuse logically truncated log files.
enable_log_recycle = true
# Whether to pre-create log files on start up
prefill_log_files = false
# Duration for fsyncing log files.
sync_period = "1000ms"
# Metadata storage options.
[metadata_store]
# Kv file size in bytes.

View File

@@ -34,6 +34,7 @@ common-telemetry = { workspace = true, features = [
"deadlock_detection",
] }
common-time.workspace = true
common-wal.workspace = true
config = "0.13"
datanode.workspace = true
datatypes.workspace = true

View File

@@ -18,8 +18,8 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_config::WalConfig;
use common_telemetry::{info, logging};
use common_wal::config::DatanodeWalConfig;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
@@ -174,7 +174,7 @@ impl StartCommand {
// `wal_dir` only affects raft-engine config.
if let Some(wal_dir) = &self.wal_dir
&& let WalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
&& let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
{
if raft_engine_config
.dir
@@ -316,7 +316,7 @@ mod tests {
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!(Some(42), options.node_id);
let WalConfig::RaftEngine(raft_engine_config) = options.wal else {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
unreachable!()
};
assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
@@ -504,7 +504,7 @@ mod tests {
};
// Should be read from env, env > default values.
let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.read_batch_size, 100);

View File

@@ -14,8 +14,8 @@
use clap::ArgMatches;
use common_config::KvBackendConfig;
use common_meta::wal::WalConfig as MetaSrvWalConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_wal::config::MetaSrvWalConfig;
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
use frontend::error::{Result as FeResult, TomlFormatSnafu};
@@ -173,8 +173,8 @@ impl Options {
mod tests {
use std::io::Write;
use common_config::WalConfig;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use super::*;
@@ -281,7 +281,7 @@ mod tests {
);
// Should be the values from config file, not environment variables.
let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");

View File

@@ -18,7 +18,6 @@ use std::{fs, path};
use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::wal::StandaloneWalConfig;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
@@ -29,11 +28,12 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use common_time::timezone::set_default_timezone;
use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
@@ -497,8 +497,8 @@ mod tests {
use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
use common_config::WalConfig;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{FileConfig, GcsConfig};
use servers::Mode;
@@ -605,7 +605,7 @@ mod tests {
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);
let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());

View File

@@ -8,9 +8,5 @@ license.workspace = true
common-base.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
sysinfo.workspace = true
toml.workspace = true

View File

@@ -13,13 +13,10 @@
// limitations under the License.
pub mod utils;
pub mod wal;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
pub use crate::wal::{KafkaWalOptions, WalConfig, WalOptions, WAL_OPTIONS_KEY};
pub fn metadata_store_dir(store_dir: &str) -> String {
format!("{store_dir}/metadata")
}

View File

@@ -1,154 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod kafka;
pub mod raft_engine;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig};
pub use crate::wal::raft_engine::RaftEngineConfig;
/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
/// and inserted into the options of a `RegionCreateRequest`.
pub const WAL_OPTIONS_KEY: &str = "wal_options";
/// Wal config for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum WalConfig {
RaftEngine(RaftEngineConfig),
Kafka(KafkaConfig),
}
impl From<StandaloneWalConfig> for WalConfig {
fn from(value: StandaloneWalConfig) -> Self {
match value {
StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine(config),
StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(config.base),
}
}
}
impl Default for WalConfig {
fn default() -> Self {
WalConfig::RaftEngine(RaftEngineConfig::default())
}
}
/// Wal config for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum StandaloneWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(StandaloneKafkaConfig),
}
impl Default for StandaloneWalConfig {
fn default() -> Self {
StandaloneWalConfig::RaftEngine(RaftEngineConfig::default())
}
}
/// Wal options allocated to a region.
/// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded
/// by datanode with `serde_json::from_str`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider", rename_all = "snake_case")]
pub enum WalOptions {
#[default]
RaftEngine,
#[serde(with = "prefix_wal_kafka")]
Kafka(KafkaWalOptions),
}
with_prefix!(prefix_wal_kafka "wal.kafka.");
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression as RsKafkaCompression;
use crate::wal::kafka::KafkaBackoffConfig;
use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions};
#[test]
fn test_serde_kafka_config() {
// With all fields.
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9092"]
max_batch_size = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
"#;
let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: RsKafkaCompression::default(),
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(decoded, expected);
// With some fields missing.
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9092"]
linger = "200ms"
"#;
let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
linger: Duration::from_millis(200),
..Default::default()
};
assert_eq!(decoded, expected);
}
#[test]
fn test_serde_wal_options() {
// Test serde raft-engine wal options.
let wal_options = WalOptions::RaftEngine;
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"raft_engine"}"#;
assert_eq!(&encoded, expected);
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
// Test serde kafka wal options.
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: "test_topic".to_string(),
});
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#;
assert_eq!(&encoded, expected);
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
}
}

View File

@@ -16,7 +16,6 @@ base64.workspace = true
bytes.workspace = true
chrono.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-grpc-expr.workspace = true
common-macro.workspace = true
@@ -25,6 +24,7 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
@@ -39,18 +39,18 @@ regex.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
serde_with.workspace = true
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
tonic.workspace = true
[dev-dependencies]
chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
common-wal = { workspace = true, features = ["testing"] }
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }
uuid.workspace = true

View File

@@ -48,7 +48,7 @@ use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::wal::prepare_wal_option;
use crate::wal_options_allocator::prepare_wal_options;
pub struct CreateTableProcedure {
pub context: DdlContext,
@@ -455,7 +455,7 @@ impl CreateRequestBuilder {
request.region_id = region_id.as_u64();
request.path = storage_path;
// Stores the encoded wal options into the request options.
prepare_wal_option(&mut request.options, region_id, region_wal_options);
prepare_wal_options(&mut request.options, region_id, region_wal_options);
if let Some(physical_table_id) = self.physical_table_id {
// Logical table has the same region numbers with physical table, and they have a one-to-one mapping.

View File

@@ -31,7 +31,7 @@ use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::sequence::SequenceRef;
use crate::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};
use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
#[derive(Clone)]
pub struct TableMetadataAllocator {

View File

@@ -516,7 +516,7 @@ mod tests {
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal::WalOptionsAllocator;
use crate::wal_options_allocator::WalOptionsAllocator;
/// A dummy implemented [DatanodeManager].
pub struct DummyDatanodeManager;

View File

@@ -14,10 +14,10 @@
use std::str::Utf8Error;
use common_config::wal::WalOptions;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_wal::options::WalOptions;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::{RegionId, RegionNumber};

View File

@@ -37,7 +37,7 @@ pub mod sequence;
pub mod state_store;
pub mod table_name;
pub mod util;
pub mod wal;
pub mod wal_options_allocator;
pub type ClusterId = u64;
pub type DatanodeId = u64;

View File

@@ -1,125 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod kafka;
pub mod options_allocator;
use std::collections::HashMap;
use common_config::wal::StandaloneWalConfig;
use common_config::WAL_OPTIONS_KEY;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, RegionNumber};
use crate::wal::kafka::KafkaConfig;
pub use crate::wal::options_allocator::{
allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef,
};
/// Wal config for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum WalConfig {
#[default]
RaftEngine,
Kafka(KafkaConfig),
}
impl From<StandaloneWalConfig> for WalConfig {
fn from(value: StandaloneWalConfig) -> Self {
match value {
StandaloneWalConfig::RaftEngine(_) => WalConfig::RaftEngine,
StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(KafkaConfig {
broker_endpoints: config.base.broker_endpoints,
num_topics: config.num_topics,
selector_type: config.selector_type,
topic_name_prefix: config.topic_name_prefix,
num_partitions: config.num_partitions,
replication_factor: config.replication_factor,
create_topic_timeout: config.create_topic_timeout,
backoff: config.base.backoff,
}),
}
}
}
pub fn prepare_wal_option(
options: &mut HashMap<String, String>,
region_id: RegionId,
region_wal_options: &HashMap<RegionNumber, String>,
) {
if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_config::wal::kafka::{KafkaBackoffConfig, TopicSelectorType};
use super::*;
#[test]
fn test_serde_wal_config() {
// Test serde raft-engine wal config with none other wal config.
let toml_str = r#"
provider = "raft_engine"
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(wal_config, WalConfig::RaftEngine);
// Test serde raft-engine wal config with extra other wal config.
let toml_str = r#"
provider = "raft_engine"
broker_endpoints = ["127.0.0.1:9092"]
num_topics = 32
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(wal_config, WalConfig::RaftEngine);
// Test serde kafka wal config.
let toml_str = r#"
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_topic"
replication_factor = 1
create_topic_timeout = "30s"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
let expected_kafka_config = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
}
}

View File

@@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod kafka;
use std::collections::HashMap;
use std::sync::Arc;
use common_config::{KafkaWalOptions, WalOptions};
use common_wal::config::MetaSrvWalConfig;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use store_api::storage::{RegionId, RegionNumber};
use crate::error::{EncodeWalOptionsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::wal::kafka::TopicManager as KafkaTopicManager;
use crate::wal::WalConfig;
use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager;
/// Allocates wal options in region granularity.
#[derive(Default)]
@@ -37,10 +39,10 @@ pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
impl WalOptionsAllocator {
/// Creates a WalOptionsAllocator.
pub fn new(config: WalConfig, kv_backend: KvBackendRef) -> Self {
pub fn new(config: MetaSrvWalConfig, kv_backend: KvBackendRef) -> Self {
match config {
WalConfig::RaftEngine => Self::RaftEngine,
WalConfig::Kafka(kafka_config) => {
MetaSrvWalConfig::RaftEngine => Self::RaftEngine,
MetaSrvWalConfig::Kafka(kafka_config) => {
Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend))
}
}
@@ -103,19 +105,31 @@ pub fn allocate_region_wal_options(
Ok(regions.into_iter().zip(wal_options).collect())
}
/// Inserts wal options into options.
pub fn prepare_wal_options(
options: &mut HashMap<String, String>,
region_id: RegionId,
region_wal_options: &HashMap<RegionNumber, String>,
) {
if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
}
}
#[cfg(test)]
mod tests {
use common_wal::config::kafka::MetaSrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal::kafka::test_util::run_test_with_kafka_wal;
use crate::wal::kafka::topic_selector::RoundRobinTopicSelector;
use crate::wal::kafka::KafkaConfig;
use crate::wal_options_allocator::kafka::topic_selector::RoundRobinTopicSelector;
// Tests the wal options allocator could successfully allocate raft-engine wal options.
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = WalConfig::RaftEngine;
let wal_config = MetaSrvWalConfig::RaftEngine;
let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
allocator.start().await.unwrap();
@@ -141,7 +155,7 @@ mod tests {
.collect::<Vec<_>>();
// Creates a topic manager.
let config = KafkaConfig {
let config = MetaSrvKafkaConfig {
replication_factor: broker_endpoints.len() as i16,
broker_endpoints,
..Default::default()

View File

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod topic_manager;
pub mod topic_selector;

View File

@@ -15,8 +15,9 @@
use std::collections::HashSet;
use std::sync::Arc;
use common_config::wal::kafka::TopicSelectorType;
use common_telemetry::{error, info};
use common_wal::config::kafka::MetaSrvKafkaConfig;
use common_wal::TopicSelectorType;
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -33,8 +34,9 @@ use crate::error::{
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, TopicSelectorRef};
use crate::wal::kafka::KafkaConfig;
use crate::wal_options_allocator::kafka::topic_selector::{
RoundRobinTopicSelector, TopicSelectorRef,
};
const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/";
@@ -44,7 +46,7 @@ const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
config: MetaSrvKafkaConfig,
pub(crate) topic_pool: Vec<String>,
pub(crate) topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
@@ -52,7 +54,7 @@ pub struct TopicManager {
impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: KafkaConfig, kv_backend: KvBackendRef) -> Self {
pub fn new(config: MetaSrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
// Topics should be created.
let topics = (0..config.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
@@ -237,9 +239,10 @@ impl TopicManager {
#[cfg(test)]
mod tests {
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal::kafka::test_util::run_test_with_kafka_wal;
// Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
#[tokio::test]
@@ -277,7 +280,7 @@ mod tests {
.collect::<Vec<_>>();
// Creates a topic manager.
let config = KafkaConfig {
let config = MetaSrvKafkaConfig {
replication_factor: broker_endpoints.len() as i16,
broker_endpoints,
..Default::default()

21
src/common/wal/Cargo.toml Normal file
View File

@@ -0,0 +1,21 @@
[package]
name = "common-wal"
version.workspace = true
edition.workspace = true
license.workspace = true
[features]
testing = []
[dependencies]
common-base.workspace = true
common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_with.workspace = true
[dev-dependencies]
serde_json.workspace = true
toml.workspace = true

View File

@@ -0,0 +1,228 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod kafka;
pub mod raft_engine;
use serde::{Deserialize, Serialize};
use crate::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig, StandaloneKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
/// Wal configurations for metasrv.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum MetaSrvWalConfig {
#[default]
RaftEngine,
Kafka(MetaSrvKafkaConfig),
}
/// Wal configurations for datanode.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum DatanodeWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(DatanodeKafkaConfig),
}
impl Default for DatanodeWalConfig {
fn default() -> Self {
Self::RaftEngine(RaftEngineConfig::default())
}
}
/// Wal configurations for standalone.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum StandaloneWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(StandaloneKafkaConfig),
}
impl Default for StandaloneWalConfig {
fn default() -> Self {
Self::RaftEngine(RaftEngineConfig::default())
}
}
impl From<StandaloneWalConfig> for MetaSrvWalConfig {
fn from(config: StandaloneWalConfig) -> Self {
match config {
StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine,
StandaloneWalConfig::Kafka(config) => Self::Kafka(MetaSrvKafkaConfig {
broker_endpoints: config.broker_endpoints,
num_topics: config.num_topics,
selector_type: config.selector_type,
topic_name_prefix: config.topic_name_prefix,
num_partitions: config.num_partitions,
replication_factor: config.replication_factor,
create_topic_timeout: config.create_topic_timeout,
backoff: config.backoff,
}),
}
}
}
impl From<StandaloneWalConfig> for DatanodeWalConfig {
fn from(config: StandaloneWalConfig) -> Self {
match config {
StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
compression: config.compression,
max_batch_size: config.max_batch_size,
linger: config.linger,
consumer_wait_timeout: config.consumer_wait_timeout,
backoff: config.backoff,
}),
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression;
use super::*;
use crate::config::kafka::common::BackoffConfig;
use crate::config::{DatanodeKafkaConfig, MetaSrvKafkaConfig, StandaloneKafkaConfig};
use crate::TopicSelectorType;
#[test]
fn test_toml_raft_engine() {
// With none configs.
let toml_str = r#"
provider = "raft_engine"
"#;
let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(metasrv_wal_config, MetaSrvWalConfig::RaftEngine);
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(
datanode_wal_config,
DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
);
// With useless configs.
let toml_str = r#"
provider = "raft_engine"
broker_endpoints = ["127.0.0.1:9092"]
num_topics = 32
"#;
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(
datanode_wal_config,
DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
);
// With some useful configs.
let toml_str = r#"
provider = "raft_engine"
file_size = "4MB"
purge_threshold = "1GB"
purge_interval = "5mins"
"#;
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
let expected = RaftEngineConfig {
file_size: ReadableSize::mb(4),
purge_threshold: ReadableSize::gb(1),
purge_interval: Duration::from_secs(5 * 60),
..Default::default()
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected));
}
#[test]
fn test_toml_kafka() {
let toml_str = r#"
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_topic"
replication_factor = 1
create_topic_timeout = "30s"
max_batch_size = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
"#;
// Deserialized to MetaSrvWalConfig.
let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap();
let expected = MetaSrvKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(metasrv_wal_config, MetaSrvWalConfig::Kafka(expected));
// Deserialized to DatanodeWalConfig.
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
let expected = DatanodeKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: Compression::default(),
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
// Deserialized to StandaloneWalConfig.
let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap();
let expected = StandaloneKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
compression: Compression::default(),
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected));
}
}

View File

@@ -0,0 +1,22 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod common;
pub mod datanode;
pub mod metasrv;
pub mod standalone;
pub use datanode::DatanodeKafkaConfig;
pub use metasrv::MetaSrvKafkaConfig;
pub use standalone::StandaloneKafkaConfig;

View File

@@ -0,0 +1,48 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
with_prefix!(pub backoff_prefix "backoff_");
/// Backoff configurations for kafka clients.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BackoffConfig {
/// The initial backoff delay.
#[serde(with = "humantime_serde")]
pub init: Duration,
/// The maximum backoff delay.
#[serde(with = "humantime_serde")]
pub max: Duration,
/// The exponential backoff rate, i.e. next backoff = base * current backoff.
pub base: u32,
/// The deadline of retries. `None` stands for no deadline.
#[serde(with = "humantime_serde")]
pub deadline: Option<Duration>,
}
impl Default for BackoffConfig {
fn default() -> Self {
Self {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
}

View File

@@ -0,0 +1,58 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
use crate::BROKER_ENDPOINT;
/// Kafka wal configurations for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DatanodeKafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// The compression algorithm used to compress kafka records.
#[serde(skip)]
pub compression: Compression,
/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
pub linger: Duration,
/// The consumer wait timeout.
#[serde(with = "humantime_serde")]
pub consumer_wait_timeout: Duration,
/// The backoff config.
#[serde(flatten, with = "backoff_prefix")]
pub backoff: BackoffConfig,
}
impl Default for DatanodeKafkaConfig {
fn default() -> Self {
Self {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
}
}
}

View File

@@ -12,31 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod topic_manager;
pub mod topic_selector;
use std::time::Duration;
use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
use serde::{Deserialize, Serialize};
pub use crate::wal::kafka::topic_manager::TopicManager;
use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
/// Configurations for kafka wal.
/// Kafka wal configurations for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaConfig {
pub struct MetaSrvKafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// Number of topics to be created upon start.
/// The number of topics to be created upon start.
pub num_topics: usize,
/// The type of the topic selector with which to select a topic for a region.
pub selector_type: TopicSelectorType,
/// Topic name prefix.
pub topic_name_prefix: String,
/// Number of partitions per topic.
/// The number of partitions per topic.
pub num_partitions: i32,
/// The replication factor of each topic.
pub replication_factor: i16,
@@ -44,24 +39,23 @@ pub struct KafkaConfig {
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
/// The backoff config.
#[serde(flatten, with = "kafka_backoff")]
pub backoff: KafkaBackoffConfig,
#[serde(flatten, with = "backoff_prefix")]
pub backoff: BackoffConfig,
}
impl Default for KafkaConfig {
impl Default for MetaSrvKafkaConfig {
fn default() -> Self {
let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
let broker_endpoints = vec![BROKER_ENDPOINT.to_string()];
let replication_factor = broker_endpoints.len() as i16;
Self {
broker_endpoints,
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
num_partitions: 1,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig::default(),
backoff: BackoffConfig::default(),
}
}
}

View File

@@ -15,88 +15,18 @@
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression as RsKafkaCompression;
use rskafka::client::partition::Compression;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
/// The type of the topic selector, i.e. with which strategy to select a topic.
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TopicSelectorType {
#[default]
RoundRobin,
}
/// Configurations for kafka wal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// The compression algorithm used to compress log entries.
#[serde(skip)]
pub compression: RsKafkaCompression,
/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
pub linger: Duration,
/// The consumer wait timeout.
#[serde(with = "humantime_serde")]
pub consumer_wait_timeout: Duration,
/// The backoff config.
#[serde(flatten, with = "kafka_backoff")]
pub backoff: KafkaBackoffConfig,
}
impl Default for KafkaConfig {
fn default() -> Self {
Self {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: RsKafkaCompression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig::default(),
}
}
}
with_prefix!(pub kafka_backoff "backoff_");
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaBackoffConfig {
/// The initial backoff delay.
#[serde(with = "humantime_serde")]
pub init: Duration,
/// The maximum backoff delay.
#[serde(with = "humantime_serde")]
pub max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
pub base: u32,
/// The deadline of retries. `None` stands for no deadline.
#[serde(with = "humantime_serde")]
pub deadline: Option<Duration>,
}
impl Default for KafkaBackoffConfig {
fn default() -> Self {
Self {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
}
use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
/// Kafka wal configurations for standalone.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct StandaloneKafkaConfig {
#[serde(flatten)]
pub base: KafkaConfig,
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// Number of topics to be created upon start.
pub num_topics: usize,
/// The type of the topic selector with which to select a topic for a region.
@@ -110,28 +40,40 @@ pub struct StandaloneKafkaConfig {
/// The timeout of topic creation.
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
/// The compression algorithm used to compress kafka records.
#[serde(skip)]
pub compression: Compression,
/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
pub linger: Duration,
/// The consumer wait timeout.
#[serde(with = "humantime_serde")]
pub consumer_wait_timeout: Duration,
/// The backoff config.
#[serde(flatten, with = "backoff_prefix")]
pub backoff: BackoffConfig,
}
impl Default for StandaloneKafkaConfig {
fn default() -> Self {
let base = KafkaConfig::default();
let replication_factor = base.broker_endpoints.len() as i16;
let broker_endpoints = vec![BROKER_ENDPOINT.to_string()];
let replication_factor = broker_endpoints.len() as i16;
Self {
base,
broker_endpoints,
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
num_partitions: 1,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
}
}
}
/// Kafka wal options allocated to a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaOptions {
/// Kafka wal topic.
pub topic: String,
}

View File

@@ -21,24 +21,24 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RaftEngineConfig {
// wal directory
/// Wal directory
pub dir: Option<String>,
// wal file size in bytes
/// Wal file size in bytes
pub file_size: ReadableSize,
// wal purge threshold in bytes
/// Wal purge threshold in bytes
pub purge_threshold: ReadableSize,
// purge interval in seconds
/// Purge interval in seconds
#[serde(with = "humantime_serde")]
pub purge_interval: Duration,
// read batch size
/// Read batch size
pub read_batch_size: usize,
// whether to sync log file after every write
/// Whether to sync log file after every write
pub sync_write: bool,
// whether to reuse logically truncated log files.
/// Whether to reuse logically truncated log files.
pub enable_log_recycle: bool,
// whether to pre-create log files on start up
/// Whether to pre-create log files on start up
pub prefill_log_files: bool,
// duration for fsyncing log files.
/// Duration for fsyncing log files.
#[serde(with = "humantime_serde")]
pub sync_period: Option<Duration>,
}

32
src/common/wal/src/lib.rs Normal file
View File

@@ -0,0 +1,32 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
pub mod config;
pub mod options;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub const BROKER_ENDPOINT: &str = "127.0.0.1:9092";
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
/// The type of the topic selector, i.e. with which strategy to select a topic.
// The enum is defined here to work around cyclic dependency issues.
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TopicSelectorType {
#[default]
RoundRobin,
}

View File

@@ -0,0 +1,66 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod kafka;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
pub use crate::options::kafka::KafkaWalOptions;
/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
/// and inserted into the options of a `RegionCreateRequest`.
pub const WAL_OPTIONS_KEY: &str = "wal_options";
/// Wal options allocated to a region.
/// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded
/// by datanode with `serde_json::from_str`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider", rename_all = "snake_case")]
pub enum WalOptions {
#[default]
RaftEngine,
#[serde(with = "kafka_prefix")]
Kafka(KafkaWalOptions),
}
with_prefix!(kafka_prefix "wal.kafka.");
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serde_wal_options() {
// Test serde raft-engine wal options.
let wal_options = WalOptions::RaftEngine;
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"raft_engine"}"#;
assert_eq!(&encoded, expected);
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
// Test serde kafka wal options.
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: "test_topic".to_string(),
});
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#;
assert_eq!(&encoded, expected);
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
}
}

View File

@@ -12,24 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use serde::{Deserialize, Serialize};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let Ok(endpoints) = env::var("GT_KAFKA_ENDPOINTS") else {
warn!("The endpoints is empty, skipping the test");
return;
};
let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
test(endpoints).await
/// Kafka wal options allocated to a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaWalOptions {
/// Kafka wal topic.
pub topic: String,
}

View File

@@ -35,6 +35,7 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true

View File

@@ -17,12 +17,12 @@
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_config::WalConfig;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::DatanodeWalConfig;
use file_engine::config::EngineConfig as FileEngineConfig;
use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
@@ -237,7 +237,7 @@ pub struct DatanodeOptions {
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub meta_client: Option<MetaClientOptions>,
pub wal: WalConfig,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
@@ -260,7 +260,7 @@ impl Default for DatanodeOptions {
rpc_max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
http: HttpOptions::default(),
meta_client: None,
wal: WalConfig::default(),
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),

View File

@@ -20,16 +20,17 @@ use std::sync::Arc;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_config::wal::{KafkaConfig, RaftEngineConfig};
use common_config::WalConfig;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal::prepare_wal_option;
use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::{error, info, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures::future;
use futures_util::future::try_join_all;
@@ -377,7 +378,7 @@ impl DatanodeBuilder {
config: MitoConfig,
) -> Result<MitoEngine> {
let mito_engine = match &opts.wal {
WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
@@ -386,8 +387,7 @@ impl DatanodeBuilder {
)
.await
.context(BuildMitoEngineSnafu)?,
WalConfig::Kafka(kafka_config) => MitoEngine::new(
DatanodeWalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
@@ -427,7 +427,7 @@ impl DatanodeBuilder {
}
/// Builds [KafkaLogStore].
async fn build_kafka_log_store(config: &KafkaConfig) -> Result<Arc<KafkaLogStore>> {
async fn build_kafka_log_store(config: &DatanodeKafkaConfig) -> Result<Arc<KafkaLogStore>> {
KafkaLogStore::try_new(config)
.await
.map_err(Box::new)
@@ -462,7 +462,7 @@ async fn open_all_regions(
for region_number in table_value.regions {
// Augments region options with wal options if a wal options is provided.
let mut region_options = table_value.region_info.region_options.clone();
prepare_wal_option(
prepare_wal_options(
&mut region_options,
RegionId::new(table_value.table_id, region_number),
&table_value.region_info.region_wal_options,

View File

@@ -14,7 +14,7 @@
use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal::prepare_wal_option;
use common_meta::wal_options_allocator::prepare_wal_options;
use futures_util::future::BoxFuture;
use store_api::path_utils::region_dir;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
@@ -34,7 +34,7 @@ impl HandlerContext {
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
prepare_wal_option(&mut region_options, region_id, &region_wal_options);
prepare_wal_options(&mut region_options, region_id, &region_wal_options);
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),

View File

@@ -23,6 +23,7 @@ common-meta.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
dashmap.workspace = true
futures-util.workspace = true
futures.workspace = true

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use common_config::wal::KafkaConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
@@ -45,7 +45,7 @@ pub(crate) struct Client {
impl Client {
/// Creates a Client from the raw client.
pub(crate) fn new(raw_client: Arc<PartitionClient>, config: &KafkaConfig) -> Self {
pub(crate) fn new(raw_client: Arc<PartitionClient>, config: &DatanodeKafkaConfig) -> Self {
let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize);
let batch_producer = BatchProducerBuilder::new(raw_client.clone())
.with_compression(config.compression)
@@ -62,7 +62,7 @@ impl Client {
/// Manages client construction and accesses.
#[derive(Debug)]
pub(crate) struct ClientManager {
pub(crate) config: KafkaConfig,
pub(crate) config: DatanodeKafkaConfig,
/// Top-level client in kafka. All clients are constructed by this client.
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
@@ -72,7 +72,7 @@ pub(crate) struct ClientManager {
impl ClientManager {
/// Tries to create a ClientManager.
pub(crate) async fn try_new(config: &KafkaConfig) -> Result<Self> {
pub(crate) async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let backoff_config = BackoffConfig {
init_backoff: config.backoff.init,
@@ -136,7 +136,7 @@ impl ClientManager {
#[cfg(test)]
mod tests {
use common_meta::wal::kafka::test_util::run_test_with_kafka_wal;
use common_wal::test_util::run_test_with_kafka_wal;
use tokio::sync::Barrier;
use super::*;
@@ -155,7 +155,7 @@ mod tests {
)
.await;
let config = KafkaConfig {
let config = DatanodeKafkaConfig {
broker_endpoints,
..Default::default()
};

View File

@@ -15,8 +15,9 @@
use std::collections::HashMap;
use std::sync::Arc;
use common_config::wal::{KafkaConfig, WalOptions};
use common_telemetry::{debug, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::options::WalOptions;
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
@@ -35,14 +36,14 @@ use crate::kafka::{EntryImpl, NamespaceImpl};
/// A log store backed by Kafka.
#[derive(Debug)]
pub struct KafkaLogStore {
config: KafkaConfig,
config: DatanodeKafkaConfig,
/// Manages kafka clients through which the log store contact the Kafka cluster.
client_manager: ClientManagerRef,
}
impl KafkaLogStore {
/// Tries to create a Kafka log store.
pub async fn try_new(config: &KafkaConfig) -> Result<Self> {
pub async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
Ok(Self {
client_manager: Arc::new(ClientManager::try_new(config).await?),
config: config.clone(),
@@ -315,7 +316,7 @@ mod tests {
)
.await;
let config = KafkaConfig {
let config = DatanodeKafkaConfig {
broker_endpoints,
max_batch_size: ReadableSize::kb(32),
..Default::default()

View File

@@ -14,5 +14,3 @@
pub mod offset;
pub mod record;
#[cfg(test)]
mod test_util;

View File

@@ -295,12 +295,12 @@ mod tests {
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_config::wal::KafkaConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use uuid::Uuid;
use super::*;
use crate::kafka::client_manager::ClientManager;
use crate::kafka::util::test_util::run_test_with_kafka_wal;
// Implements some utility methods for testing.
impl Default for Record {
@@ -555,7 +555,7 @@ mod tests {
};
let entry = new_test_entry([b'1'; 2000000], 0, ns.clone());
let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]);
let config = KafkaConfig {
let config = DatanodeKafkaConfig {
broker_endpoints,
max_batch_size: ReadableSize::mb(1),
..Default::default()

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_config::wal::WalOptions;
use common_wal::options::WalOptions;
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};

View File

@@ -19,9 +19,10 @@ use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use async_stream::stream;
use common_config::wal::{RaftEngineConfig, WalOptions};
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{error, info};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::options::WalOptions;
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::Id as EntryId;

View File

@@ -15,7 +15,7 @@
use std::path::Path;
use common_base::readable_size::ReadableSize;
use common_config::wal::RaftEngineConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use crate::raft_engine::log_store::RaftEngineLogStore;

View File

@@ -26,6 +26,7 @@ common-procedure.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
derive_builder.workspace = true

View File

@@ -26,12 +26,12 @@ use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal::options_allocator::WalOptionsAllocatorRef;
use common_meta::wal::WalConfig;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::{error, info, warn};
use common_wal::config::MetaSrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
@@ -74,7 +74,7 @@ pub struct MetaSrvOptions {
pub datanode: DatanodeOptions,
pub enable_telemetry: bool,
pub data_home: String,
pub wal: WalConfig,
pub wal: MetaSrvWalConfig,
pub export_metrics: ExportMetricsOption,
pub store_key_prefix: String,
}
@@ -107,7 +107,7 @@ impl Default for MetaSrvOptions {
datanode: DatanodeOptions::default(),
enable_telemetry: true,
data_home: METASRV_HOME.to_string(),
wal: WalConfig::default(),
wal: MetaSrvWalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: String::new(),
}

View File

@@ -30,7 +30,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal::WalOptionsAllocator;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;

View File

@@ -32,6 +32,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-test-util = { workspace = true, optional = true }
common-time.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
@@ -54,7 +55,7 @@ puffin.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
serde_with = "3"
serde_with.workspace = true
smallvec.workspace = true
snafu.workspace = true
store-api.workspace = true

View File

@@ -22,9 +22,9 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, RwLock};
use common_config::wal::WalOptions;
use common_telemetry::info;
use common_time::util::current_time_millis;
use common_wal::options::WalOptions;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;

View File

@@ -18,9 +18,9 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Arc;
use common_config::wal::WalOptions;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use common_wal::options::WalOptions;
use futures::StreamExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};

View File

@@ -17,8 +17,7 @@
use std::collections::HashMap;
use std::time::Duration;
use common_config::wal::WalOptions;
use common_config::WAL_OPTIONS_KEY;
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use serde::Deserialize;
use serde_json::Value;
use serde_with::{serde_as, with_prefix, DisplayFromStr};
@@ -174,8 +173,7 @@ fn options_map_to_value(options: &HashMap<String, String>) -> Value {
#[cfg(test)]
mod tests {
use common_config::wal::KafkaWalOptions;
use common_config::WAL_OPTIONS_KEY;
use common_wal::options::KafkaWalOptions;
use super::*;

View File

@@ -16,7 +16,7 @@ use std::mem;
use std::sync::Arc;
use api::v1::{Mutation, OpType, Rows, WalEntry};
use common_config::wal::WalOptions;
use common_wal::options::WalOptions;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};

View File

@@ -20,8 +20,8 @@ use std::sync::Arc;
use api::v1::WalEntry;
use async_stream::try_stream;
use common_config::wal::WalOptions;
use common_error::ext::BoxedError;
use common_wal::options::WalOptions;
use futures::stream::BoxStream;
use futures::StreamExt;
use prost::Message;

View File

@@ -10,12 +10,12 @@ aquamarine.workspace = true
async-trait.workspace = true
bytes.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
common-wal.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
futures.workspace = true

View File

@@ -16,8 +16,8 @@
use std::collections::HashMap;
use common_config::wal::WalOptions;
use common_error::ext::ErrorExt;
use common_wal::options::WalOptions;
use crate::logstore::entry::Entry;
pub use crate::logstore::entry::Id as EntryId;

View File

@@ -30,6 +30,7 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-test-util.workspace = true
common-wal.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv = "0.15"

View File

@@ -24,7 +24,6 @@ use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend};
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
use common_config::WalConfig;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
@@ -33,10 +32,10 @@ use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::wal::WalConfig as MetaWalConfig;
use common_meta::DatanodeId;
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
@@ -77,8 +76,8 @@ pub struct GreptimeDbClusterBuilder {
store_config: Option<ObjectStoreConfig>,
store_providers: Option<Vec<StorageType>>,
datanodes: Option<u32>,
wal_config: WalConfig,
meta_wal_config: MetaWalConfig,
wal_config: DatanodeWalConfig,
meta_wal_config: MetaSrvWalConfig,
shared_home_dir: Option<Arc<TempDir>>,
meta_selector: Option<SelectorRef>,
}
@@ -106,8 +105,8 @@ impl GreptimeDbClusterBuilder {
store_config: None,
store_providers: None,
datanodes: None,
wal_config: WalConfig::default(),
meta_wal_config: MetaWalConfig::default(),
wal_config: DatanodeWalConfig::default(),
meta_wal_config: MetaSrvWalConfig::default(),
shared_home_dir: None,
meta_selector: None,
}
@@ -132,13 +131,13 @@ impl GreptimeDbClusterBuilder {
}
#[must_use]
pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self {
self.wal_config = wal_config;
self
}
#[must_use]
pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self {
self.meta_wal_config = wal_meta;
self
}

View File

@@ -17,16 +17,17 @@ use std::sync::Arc;
use cmd::options::MixOptions;
use common_base::Plugins;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{KvBackendConfig, WalConfig};
use common_config::KvBackendConfig;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::{WalConfig as MetaWalConfig, WalOptionsAllocator};
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
@@ -46,8 +47,8 @@ pub struct GreptimeDbStandalone {
#[derive(Clone)]
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: WalConfig,
meta_wal_config: MetaWalConfig,
wal_config: DatanodeWalConfig,
meta_wal_config: MetaSrvWalConfig,
store_providers: Option<Vec<StorageType>>,
default_store: Option<StorageType>,
plugin: Option<Plugins>,
@@ -60,8 +61,8 @@ impl GreptimeDbStandaloneBuilder {
store_providers: None,
plugin: None,
default_store: None,
wal_config: WalConfig::default(),
meta_wal_config: MetaWalConfig::default(),
wal_config: DatanodeWalConfig::default(),
meta_wal_config: MetaSrvWalConfig::default(),
}
}
@@ -92,13 +93,13 @@ impl GreptimeDbStandaloneBuilder {
}
#[must_use]
pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self {
self.wal_config = wal_config;
self
}
#[must_use]
pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self {
self.meta_wal_config = wal_meta;
self
}

View File

@@ -21,13 +21,13 @@ use std::time::Duration;
use auth::UserProviderRef;
use axum::Router;
use catalog::kvbackend::KvBackendCatalogManager;
use common_config::WalConfig;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{
AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
StorageConfig,
@@ -302,7 +302,7 @@ pub fn create_tmp_dir_and_datanode_opts(
default_store_type: StorageType,
store_provider_types: Vec<StorageType>,
name: &str,
wal_config: WalConfig,
wal_config: DatanodeWalConfig,
) -> (DatanodeOptions, TestGuard) {
let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
@@ -336,7 +336,7 @@ pub(crate) fn create_datanode_opts(
default_store: ObjectStoreConfig,
providers: Vec<ObjectStoreConfig>,
home_dir: String,
wal_config: WalConfig,
wal_config: DatanodeWalConfig,
) -> DatanodeOptions {
DatanodeOptions {
node_id: Some(0),

View File

@@ -15,14 +15,12 @@
use std::env;
use std::sync::Arc;
use common_config::wal::KafkaConfig;
use common_config::WalConfig;
use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig;
use common_meta::wal::WalConfig as MetaWalConfig;
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::warn;
use common_test_util::find_workspace_path;
use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use frontend::instance::Instance;
use rstest_reuse::{self, template};
@@ -163,11 +161,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option<Box<dyn RebuildableMoc
.collect::<Vec<_>>();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbStandaloneBuilder::new(&test_name)
.with_wal_config(WalConfig::Kafka(KafkaConfig {
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
broker_endpoints: endpoints,
topic_name_prefix: test_name.to_string(),
num_topics: 3,
@@ -193,11 +191,11 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMo
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbClusterBuilder::new(&test_name)
.await
.with_wal_config(WalConfig::Kafka(KafkaConfig {
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
broker_endpoints: endpoints,
topic_name_prefix: test_name.to_string(),
num_topics: 3,

View File

@@ -16,16 +16,14 @@ use std::sync::Arc;
use std::time::Duration;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_config::wal::KafkaConfig;
use common_config::WalConfig;
use common_meta::key::{RegionDistribution, TableMetadataManagerRef};
use common_meta::peer::Peer;
use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig;
use common_meta::wal::WalConfig as MetaWalConfig;
use common_query::Output;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use futures::future::BoxFuture;
@@ -108,12 +106,12 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_wal_config(WalConfig::Kafka(KafkaConfig {
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
linger: Duration::from_millis(25),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -238,12 +236,12 @@ pub async fn test_region_migration_multiple_regions(
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_wal_config(WalConfig::Kafka(KafkaConfig {
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
linger: Duration::from_millis(25),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -375,12 +373,12 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_wal_config(WalConfig::Kafka(KafkaConfig {
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
linger: Duration::from_millis(25),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -507,12 +505,12 @@ pub async fn test_region_migration_incorrect_from_peer(
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_wal_config(WalConfig::Kafka(KafkaConfig {
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
linger: Duration::from_millis(25),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
@@ -582,12 +580,12 @@ pub async fn test_region_migration_incorrect_region_id(
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_wal_config(WalConfig::Kafka(KafkaConfig {
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
linger: Duration::from_millis(25),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),