diff --git a/Cargo.lock b/Cargo.lock
index de440ec733..996469f1db 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1539,6 +1539,7 @@ dependencies = [
  "common-test-util",
  "common-time",
  "common-version",
+ "common-wal",
  "config",
  "datanode",
  "datatypes",
@@ -1632,12 +1633,8 @@ dependencies = [
  "common-base",
  "humantime-serde",
  "num_cpus",
- "rskafka",
  "serde",
- "serde_json",
- "serde_with",
  "sysinfo",
- "toml 0.8.8",
 ]
 
 [[package]]
@@ -1825,7 +1822,6 @@ dependencies = [
  "bytes",
  "chrono",
  "common-catalog",
- "common-config",
  "common-error",
  "common-grpc-expr",
  "common-macro",
@@ -1834,6 +1830,7 @@ dependencies = [
  "common-runtime",
  "common-telemetry",
  "common-time",
+ "common-wal",
  "datatypes",
  "derive_builder 0.12.0",
  "etcd-client",
@@ -1855,7 +1852,6 @@ dependencies = [
  "strum 0.25.0",
  "table",
  "tokio",
- "toml 0.8.8",
  "tonic 0.10.2",
  "uuid",
 ]
@@ -2014,6 +2010,21 @@ dependencies = [
  "build-data",
 ]
 
+[[package]]
+name = "common-wal"
+version = "0.6.0"
+dependencies = [
+ "common-base",
+ "common-telemetry",
+ "futures-util",
+ "humantime-serde",
+ "rskafka",
+ "serde",
+ "serde_json",
+ "serde_with",
+ "toml 0.8.8",
+]
+
 [[package]]
 name = "concurrent-queue"
 version = "2.4.0"
@@ -2670,6 +2681,7 @@ dependencies = [
  "common-telemetry",
  "common-test-util",
  "common-time",
+ "common-wal",
  "dashmap",
  "datafusion",
  "datafusion-common",
@@ -4519,6 +4531,7 @@ dependencies = [
  "common-telemetry",
  "common-test-util",
  "common-time",
+ "common-wal",
  "dashmap",
  "futures",
  "futures-util",
@@ -4834,6 +4847,7 @@ dependencies = [
  "common-runtime",
  "common-telemetry",
  "common-time",
+ "common-wal",
  "dashmap",
  "datatypes",
  "derive_builder 0.12.0",
@@ -4988,6 +5002,7 @@ dependencies = [
  "common-telemetry",
  "common-test-util",
  "common-time",
+ "common-wal",
  "dashmap",
  "datafusion",
  "datafusion-common",
@@ -9106,12 +9121,12 @@ dependencies = [
  "async-trait",
  "bytes",
  "common-base",
- "common-config",
  "common-error",
  "common-macro",
  "common-query",
  "common-recordbatch",
  "common-time",
+ "common-wal",
  "datatypes",
  "derive_builder 0.12.0",
  "futures",
@@ -9544,6 +9559,7 @@ dependencies = [
  "common-runtime",
  "common-telemetry",
  "common-test-util",
+ "common-wal",
  "datafusion",
  "datafusion-expr",
  "datanode",
diff --git a/Cargo.toml b/Cargo.toml
index b5e5331073..dfa339b4b8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ members = [
     "src/common/time",
     "src/common/decimal",
     "src/common/version",
+    "src/common/wal",
     "src/datanode",
     "src/datatypes",
     "src/file-engine",
@@ -124,6 +125,7 @@ rskafka = "0.5"
 rust_decimal = "1.33"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+serde_with = "3"
 smallvec = { version = "1", features = ["serde"] }
 snafu = "0.7"
 sysinfo = "0.30"
@@ -169,6 +171,7 @@ common-telemetry = { path = "src/common/telemetry" }
 common-test-util = { path = "src/common/test-util" }
 common-time = { path = "src/common/time" }
 common-version = { path = "src/common/version" }
+common-wal = { path = "src/common/wal" }
 datanode = { path = "src/datanode" }
 datatypes = { path = "src/datatypes" }
 file-engine = { path = "src/file-engine" }
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index a49ffa835e..f3bb54aa2a 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -88,7 +88,25 @@ enable = true
 # - "kafka"
 provider = "raft_engine"
 
-# There're none raft-engine wal config since meta srv only involves in remote wal currently.
+# Raft-engine wal options.
+# WAL data directory.
+# dir = "/tmp/greptimedb/wal"
+# WAL file size in bytes.
+file_size = "256MB"
+# WAL purge threshold.
+purge_threshold = "4GB"
+# WAL purge interval.
+purge_interval = "10m"
+# WAL read batch size.
+read_batch_size = 128
+# Whether to sync the log file after every write.
+sync_write = false
+# Whether to reuse logically truncated log files.
+enable_log_recycle = true
+# Whether to pre-create log files on startup.
+prefill_log_files = false
+# Duration for fsyncing log files.
+sync_period = "1000ms"
 
 # Kafka wal options.
 # The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
@@ -125,25 +143,6 @@ provider = "raft_engine"
 # The deadline of retries.
 # backoff_deadline = "5mins"
 
-# WAL data directory
-# dir = "/tmp/greptimedb/wal"
-# WAL file size in bytes.
-file_size = "256MB"
-# WAL purge threshold.
-purge_threshold = "4GB"
-# WAL purge interval in seconds.
-purge_interval = "10m"
-# WAL read batch size.
-read_batch_size = 128
-# Whether to sync log file after every write.
-sync_write = false
-# Whether to reuse logically truncated log files.
-enable_log_recycle = true
-# Whether to pre-create log files on start up
-prefill_log_files = false
-# Duration for fsyncing log files.
-sync_period = "1000ms"
-
 # Metadata storage options.
 [metadata_store]
 # Kv file size in bytes.
diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml
index 61571bbfcc..01fa554cb5 100644
--- a/src/cmd/Cargo.toml
+++ b/src/cmd/Cargo.toml
@@ -34,6 +34,7 @@ common-telemetry = { workspace = true, features = [
     "deadlock_detection",
 ] }
 common-time.workspace = true
+common-wal.workspace = true
 config = "0.13"
 datanode.workspace = true
 datatypes.workspace = true
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 24f70862e2..1da23c801f 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -18,8 +18,8 @@ use std::time::Duration;
 use async_trait::async_trait;
 use catalog::kvbackend::MetaKvBackend;
 use clap::Parser;
-use common_config::WalConfig;
 use common_telemetry::{info, logging};
+use common_wal::config::DatanodeWalConfig;
 use datanode::config::DatanodeOptions;
 use datanode::datanode::{Datanode, DatanodeBuilder};
 use datanode::service::DatanodeServiceBuilder;
@@ -174,7 +174,7 @@ impl StartCommand {
         // `wal_dir` only affects raft-engine config.
         if let Some(wal_dir) = &self.wal_dir
-            && let WalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
+            && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
         {
             if raft_engine_config
                 .dir
@@ -316,7 +316,7 @@ mod tests {
         assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
         assert_eq!(Some(42), options.node_id);
 
-        let WalConfig::RaftEngine(raft_engine_config) = options.wal else {
+        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
             unreachable!()
         };
         assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
@@ -504,7 +504,7 @@ mod tests {
         };
 
         // Should be read from env, env > default values.
-        let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
+        let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
             unreachable!()
         };
         assert_eq!(raft_engine_config.read_batch_size, 100);
diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs
index 9270d8d52a..9bc652d44a 100644
--- a/src/cmd/src/options.rs
+++ b/src/cmd/src/options.rs
@@ -14,8 +14,8 @@
 use clap::ArgMatches;
 use common_config::KvBackendConfig;
-use common_meta::wal::WalConfig as MetaSrvWalConfig;
 use common_telemetry::logging::{LoggingOptions, TracingOptions};
+use common_wal::config::MetaSrvWalConfig;
 use config::{Config, Environment, File, FileFormat};
 use datanode::config::{DatanodeOptions, ProcedureConfig};
 use frontend::error::{Result as FeResult, TomlFormatSnafu};
@@ -173,8 +173,8 @@ impl Options {
 mod tests {
     use std::io::Write;
 
-    use common_config::WalConfig;
     use common_test_util::temp_dir::create_named_temp_file;
+    use common_wal::config::DatanodeWalConfig;
     use datanode::config::{DatanodeOptions, ObjectStoreConfig};
 
     use super::*;
@@ -281,7 +281,7 @@ mod tests {
         );
 
         // Should be the values from config file, not environment variables.
-        let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
+        let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
             unreachable!()
         };
         assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 8df0158f54..f9a49e75bd 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -18,7 +18,6 @@ use std::{fs, path};
 use async_trait::async_trait;
 use clap::Parser;
 use common_catalog::consts::MIN_USER_TABLE_ID;
-use common_config::wal::StandaloneWalConfig;
 use common_config::{metadata_store_dir, KvBackendConfig};
 use common_meta::cache_invalidator::DummyCacheInvalidator;
 use common_meta::datanode_manager::DatanodeManagerRef;
@@ -29,11 +28,12 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::region_keeper::MemoryRegionKeeper;
 use common_meta::sequence::SequenceBuilder;
-use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
+use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
 use common_procedure::ProcedureManagerRef;
 use common_telemetry::info;
 use common_telemetry::logging::LoggingOptions;
 use common_time::timezone::set_default_timezone;
+use common_wal::config::StandaloneWalConfig;
 use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
 use datanode::datanode::{Datanode, DatanodeBuilder};
 use file_engine::config::EngineConfig as FileEngineConfig;
@@ -497,8 +497,8 @@ mod tests {
     use auth::{Identity, Password, UserProviderRef};
     use common_base::readable_size::ReadableSize;
-    use common_config::WalConfig;
     use common_test_util::temp_dir::create_named_temp_file;
+    use common_wal::config::DatanodeWalConfig;
     use datanode::config::{FileConfig, GcsConfig};
     use servers::Mode;
 
@@ -605,7 +605,7 @@ mod tests {
         assert_eq!(None, fe_opts.mysql.reject_no_database);
         assert!(fe_opts.influxdb.enable);
 
-        let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
+        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
             unreachable!()
         };
         assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml
index 12029609e1..829c2ee53b 100644
--- a/src/common/config/Cargo.toml
+++ b/src/common/config/Cargo.toml
@@ -8,9 +8,5 @@ license.workspace = true
 common-base.workspace = true
 humantime-serde.workspace = true
 num_cpus.workspace = true
-rskafka.workspace = true
 serde.workspace = true
-serde_json.workspace = true
-serde_with = "3"
 sysinfo.workspace = true
-toml.workspace = true
diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs
index 6ad928dc3c..59a68ba768 100644
--- a/src/common/config/src/lib.rs
+++ b/src/common/config/src/lib.rs
@@ -13,13 +13,10 @@
 // limitations under the License.
 
 pub mod utils;
-pub mod wal;
 
 use common_base::readable_size::ReadableSize;
 use serde::{Deserialize, Serialize};
 
-pub use crate::wal::{KafkaWalOptions, WalConfig, WalOptions, WAL_OPTIONS_KEY};
-
 pub fn metadata_store_dir(store_dir: &str) -> String {
     format!("{store_dir}/metadata")
 }
diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs
deleted file mode 100644
index f2d3d2c286..0000000000
--- a/src/common/config/src/wal.rs
+++ /dev/null
@@ -1,154 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-pub mod kafka;
-pub mod raft_engine;
-
-use serde::{Deserialize, Serialize};
-use serde_with::with_prefix;
-
-pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig};
-pub use crate::wal::raft_engine::RaftEngineConfig;
-
-/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
-/// and inserted into the options of a `RegionCreateRequest`.
-pub const WAL_OPTIONS_KEY: &str = "wal_options";
-
-/// Wal config for datanode.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(tag = "provider", rename_all = "snake_case")]
-pub enum WalConfig {
-    RaftEngine(RaftEngineConfig),
-    Kafka(KafkaConfig),
-}
-
-impl From<StandaloneWalConfig> for WalConfig {
-    fn from(value: StandaloneWalConfig) -> Self {
-        match value {
-            StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine(config),
-            StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(config.base),
-        }
-    }
-}
-
-impl Default for WalConfig {
-    fn default() -> Self {
-        WalConfig::RaftEngine(RaftEngineConfig::default())
-    }
-}
-
-/// Wal config for datanode.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(tag = "provider", rename_all = "snake_case")]
-pub enum StandaloneWalConfig {
-    RaftEngine(RaftEngineConfig),
-    Kafka(StandaloneKafkaConfig),
-}
-
-impl Default for StandaloneWalConfig {
-    fn default() -> Self {
-        StandaloneWalConfig::RaftEngine(RaftEngineConfig::default())
-    }
-}
-
-/// Wal options allocated to a region.
-/// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded
-/// by datanode with `serde_json::from_str`.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
-#[serde(tag = "wal.provider", rename_all = "snake_case")]
-pub enum WalOptions {
-    #[default]
-    RaftEngine,
-    #[serde(with = "prefix_wal_kafka")]
-    Kafka(KafkaWalOptions),
-}
-
-with_prefix!(prefix_wal_kafka "wal.kafka.");
-
-#[cfg(test)]
-mod tests {
-    use std::time::Duration;
-
-    use common_base::readable_size::ReadableSize;
-    use rskafka::client::partition::Compression as RsKafkaCompression;
-
-    use crate::wal::kafka::KafkaBackoffConfig;
-    use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions};
-
-    #[test]
-    fn test_serde_kafka_config() {
-        // With all fields.
-        let toml_str = r#"
-            broker_endpoints = ["127.0.0.1:9092"]
-            max_batch_size = "1MB"
-            linger = "200ms"
-            consumer_wait_timeout = "100ms"
-            backoff_init = "500ms"
-            backoff_max = "10s"
-            backoff_base = 2
-            backoff_deadline = "5mins"
-        "#;
-        let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
-        let expected = KafkaConfig {
-            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-            compression: RsKafkaCompression::default(),
-            max_batch_size: ReadableSize::mb(1),
-            linger: Duration::from_millis(200),
-            consumer_wait_timeout: Duration::from_millis(100),
-            backoff: KafkaBackoffConfig {
-                init: Duration::from_millis(500),
-                max: Duration::from_secs(10),
-                base: 2,
-                deadline: Some(Duration::from_secs(60 * 5)),
-            },
-        };
-        assert_eq!(decoded, expected);
-
-        // With some fields missing.
-        let toml_str = r#"
-            broker_endpoints = ["127.0.0.1:9092"]
-            linger = "200ms"
-        "#;
-        let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
-        let expected = KafkaConfig {
-            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-            linger: Duration::from_millis(200),
-            ..Default::default()
-        };
-        assert_eq!(decoded, expected);
-    }
-
-    #[test]
-    fn test_serde_wal_options() {
-        // Test serde raft-engine wal options.
-        let wal_options = WalOptions::RaftEngine;
-        let encoded = serde_json::to_string(&wal_options).unwrap();
-        let expected = r#"{"wal.provider":"raft_engine"}"#;
-        assert_eq!(&encoded, expected);
-
-        let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
-        assert_eq!(decoded, wal_options);
-
-        // Test serde kafka wal options.
-        let wal_options = WalOptions::Kafka(KafkaWalOptions {
-            topic: "test_topic".to_string(),
-        });
-        let encoded = serde_json::to_string(&wal_options).unwrap();
-        let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#;
-        assert_eq!(&encoded, expected);
-
-        let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
-        assert_eq!(decoded, wal_options);
-    }
-}
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index 609fe259d2..e5985197de 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -16,7 +16,6 @@ base64.workspace = true
 bytes.workspace = true
 chrono.workspace = true
 common-catalog.workspace = true
-common-config.workspace = true
 common-error.workspace = true
 common-grpc-expr.workspace = true
 common-macro.workspace = true
@@ -25,6 +24,7 @@ common-recordbatch.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
+common-wal.workspace = true
 datatypes.workspace = true
 derive_builder.workspace = true
 etcd-client.workspace = true
@@ -39,18 +39,18 @@ regex.workspace = true
 rskafka.workspace = true
 serde.workspace = true
 serde_json.workspace = true
-serde_with = "3"
+serde_with.workspace = true
 snafu.workspace = true
 store-api.workspace = true
 strum.workspace = true
 table.workspace = true
 tokio.workspace = true
-toml.workspace = true
 tonic.workspace = true
 
 [dev-dependencies]
 chrono.workspace = true
 common-procedure = { workspace = true, features = ["testing"] }
+common-wal = { workspace = true, features = ["testing"] }
 datatypes.workspace = true
 hyper = { version = "0.14", features = ["full"] }
 uuid.workspace = true
diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index a846ee4618..110ccbeb65 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -48,7 +48,7 @@ use crate::rpc::ddl::CreateTableTask;
 use crate::rpc::router::{
     find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
 };
-use crate::wal::prepare_wal_option;
+use crate::wal_options_allocator::prepare_wal_options;
 
 pub struct CreateTableProcedure {
     pub context: DdlContext,
@@ -455,7 +455,7 @@ impl CreateRequestBuilder {
         request.region_id = region_id.as_u64();
         request.path = storage_path;
         // Stores the encoded wal options into the request options.
-        prepare_wal_option(&mut request.options, region_id, region_wal_options);
+        prepare_wal_options(&mut request.options, region_id, region_wal_options);
 
         if let Some(physical_table_id) = self.physical_table_id {
             // Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
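A note on the rename above: `prepare_wal_options` is the glue between metasrv and datanode. Metasrv JSON-encodes a per-region `WalOptions` and stores it under `WAL_OPTIONS_KEY` in the region request's options map; the datanode later decodes it when opening the region. The following is a minimal sketch of that round trip using the `common-wal` types introduced in this PR; the helper function itself is illustrative, not part of the change.

use std::collections::HashMap;

use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};

// Illustrative round trip: metasrv encodes, datanode decodes.
fn wal_options_round_trip() -> WalOptions {
    let wal_options = WalOptions::Kafka(KafkaWalOptions {
        topic: "greptimedb_wal_topic_0".to_string(),
    });

    // Metasrv side: encode and stash into the region's options map. The encoded
    // form matches the test fixture, e.g.
    // {"wal.provider":"kafka","wal.kafka.topic":"greptimedb_wal_topic_0"}.
    let mut options: HashMap<String, String> = HashMap::new();
    options.insert(
        WAL_OPTIONS_KEY.to_string(),
        serde_json::to_string(&wal_options).unwrap(),
    );

    // Datanode side: look the entry up and decode it back.
    serde_json::from_str(options.get(WAL_OPTIONS_KEY).unwrap()).unwrap()
}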
diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs
index 0b7ba059ad..ed256f85a5 100644
--- a/src/common/meta/src/ddl/table_meta.rs
+++ b/src/common/meta/src/ddl/table_meta.rs
@@ -31,7 +31,7 @@ use crate::peer::Peer;
 use crate::rpc::ddl::CreateTableTask;
 use crate::rpc::router::{Region, RegionRoute};
 use crate::sequence::SequenceRef;
-use crate::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};
+use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
 
 #[derive(Clone)]
 pub struct TableMetadataAllocator {
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index f06f3583c2..655d66126b 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -516,7 +516,7 @@ mod tests {
     use crate::region_keeper::MemoryRegionKeeper;
     use crate::sequence::SequenceBuilder;
     use crate::state_store::KvStateStore;
-    use crate::wal::WalOptionsAllocator;
+    use crate::wal_options_allocator::WalOptionsAllocator;
 
     /// A dummy implemented [DatanodeManager].
     pub struct DummyDatanodeManager;
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 2a0db2abbb..9da624fab9 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -14,10 +14,10 @@
 
 use std::str::Utf8Error;
 
-use common_config::wal::WalOptions;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
+use common_wal::options::WalOptions;
 use serde_json::error::Error as JsonError;
 use snafu::{Location, Snafu};
 use store_api::storage::{RegionId, RegionNumber};
diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs
index 2ee47e8975..b990f45621 100644
--- a/src/common/meta/src/lib.rs
+++ b/src/common/meta/src/lib.rs
@@ -37,7 +37,7 @@ pub mod sequence;
 pub mod state_store;
 pub mod table_name;
 pub mod util;
-pub mod wal;
+pub mod wal_options_allocator;
 
 pub type ClusterId = u64;
 pub type DatanodeId = u64;
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
deleted file mode 100644
index 4d02395dab..0000000000
--- a/src/common/meta/src/wal.rs
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-pub mod kafka;
-pub mod options_allocator;
-
-use std::collections::HashMap;
-
-use common_config::wal::StandaloneWalConfig;
-use common_config::WAL_OPTIONS_KEY;
-use serde::{Deserialize, Serialize};
-use store_api::storage::{RegionId, RegionNumber};
-
-use crate::wal::kafka::KafkaConfig;
-pub use crate::wal::options_allocator::{
-    allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef,
-};
-
-/// Wal config for metasrv.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
-#[serde(tag = "provider", rename_all = "snake_case")]
-pub enum WalConfig {
-    #[default]
-    RaftEngine,
-    Kafka(KafkaConfig),
-}
-
-impl From<StandaloneWalConfig> for WalConfig {
-    fn from(value: StandaloneWalConfig) -> Self {
-        match value {
-            StandaloneWalConfig::RaftEngine(_) => WalConfig::RaftEngine,
-            StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(KafkaConfig {
-                broker_endpoints: config.base.broker_endpoints,
-                num_topics: config.num_topics,
-                selector_type: config.selector_type,
-                topic_name_prefix: config.topic_name_prefix,
-                num_partitions: config.num_partitions,
-                replication_factor: config.replication_factor,
-                create_topic_timeout: config.create_topic_timeout,
-                backoff: config.base.backoff,
-            }),
-        }
-    }
-}
-
-pub fn prepare_wal_option(
-    options: &mut HashMap<String, String>,
-    region_id: RegionId,
-    region_wal_options: &HashMap<RegionNumber, String>,
-) {
-    if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
-        options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::time::Duration;
-
-    use common_config::wal::kafka::{KafkaBackoffConfig, TopicSelectorType};
-
-    use super::*;
-
-    #[test]
-    fn test_serde_wal_config() {
-        // Test serde raft-engine wal config with none other wal config.
-        let toml_str = r#"
-            provider = "raft_engine"
-        "#;
-        let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
-        assert_eq!(wal_config, WalConfig::RaftEngine);
-
-        // Test serde raft-engine wal config with extra other wal config.
-        let toml_str = r#"
-            provider = "raft_engine"
-            broker_endpoints = ["127.0.0.1:9092"]
-            num_topics = 32
-        "#;
-        let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
-        assert_eq!(wal_config, WalConfig::RaftEngine);
-
-        // Test serde kafka wal config.
-        let toml_str = r#"
-            provider = "kafka"
-            broker_endpoints = ["127.0.0.1:9092"]
-            num_topics = 32
-            selector_type = "round_robin"
-            topic_name_prefix = "greptimedb_wal_topic"
-            replication_factor = 1
-            create_topic_timeout = "30s"
-            backoff_init = "500ms"
-            backoff_max = "10s"
-            backoff_base = 2
-            backoff_deadline = "5mins"
-        "#;
-        let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
-        let expected_kafka_config = KafkaConfig {
-            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-            num_topics: 32,
-            selector_type: TopicSelectorType::RoundRobin,
-            topic_name_prefix: "greptimedb_wal_topic".to_string(),
-            num_partitions: 1,
-            replication_factor: 1,
-            create_topic_timeout: Duration::from_secs(30),
-            backoff: KafkaBackoffConfig {
-                init: Duration::from_millis(500),
-                max: Duration::from_secs(10),
-                base: 2,
-                deadline: Some(Duration::from_secs(60 * 5)),
-            },
-        };
-        assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
-    }
-}
diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs
similarity index 83%
rename from src/common/meta/src/wal/options_allocator.rs
rename to src/common/meta/src/wal_options_allocator.rs
index 58724b6457..8116773415 100644
--- a/src/common/meta/src/wal/options_allocator.rs
+++ b/src/common/meta/src/wal_options_allocator.rs
@@ -12,17 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod kafka;
+
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use common_config::{KafkaWalOptions, WalOptions};
+use common_wal::config::MetaSrvWalConfig;
+use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
 use snafu::ResultExt;
-use store_api::storage::RegionNumber;
+use store_api::storage::{RegionId, RegionNumber};
 
 use crate::error::{EncodeWalOptionsSnafu, Result};
 use crate::kv_backend::KvBackendRef;
-use crate::wal::kafka::TopicManager as KafkaTopicManager;
-use crate::wal::WalConfig;
+use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager;
 
 /// Allocates wal options in region granularity.
 #[derive(Default)]
@@ -37,10 +39,10 @@ pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
 
 impl WalOptionsAllocator {
     /// Creates a WalOptionsAllocator.
-    pub fn new(config: WalConfig, kv_backend: KvBackendRef) -> Self {
+    pub fn new(config: MetaSrvWalConfig, kv_backend: KvBackendRef) -> Self {
         match config {
-            WalConfig::RaftEngine => Self::RaftEngine,
-            WalConfig::Kafka(kafka_config) => {
+            MetaSrvWalConfig::RaftEngine => Self::RaftEngine,
+            MetaSrvWalConfig::Kafka(kafka_config) => {
                 Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend))
             }
         }
@@ -103,19 +105,31 @@ pub fn allocate_region_wal_options(
     Ok(regions.into_iter().zip(wal_options).collect())
 }
 
+/// Inserts wal options into options.
+pub fn prepare_wal_options(
+    options: &mut HashMap<String, String>,
+    region_id: RegionId,
+    region_wal_options: &HashMap<RegionNumber, String>,
+) {
+    if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
+        options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use common_wal::config::kafka::MetaSrvKafkaConfig;
+    use common_wal::test_util::run_test_with_kafka_wal;
+
     use super::*;
     use crate::kv_backend::memory::MemoryKvBackend;
-    use crate::wal::kafka::test_util::run_test_with_kafka_wal;
-    use crate::wal::kafka::topic_selector::RoundRobinTopicSelector;
-    use crate::wal::kafka::KafkaConfig;
+    use crate::wal_options_allocator::kafka::topic_selector::RoundRobinTopicSelector;
 
-    // Tests the wal options allocator could successfully allocate raft-engine wal options.
+    // Tests that the wal options allocator could successfully allocate raft-engine wal options.
     #[tokio::test]
     async fn test_allocator_with_raft_engine() {
         let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
-        let wal_config = WalConfig::RaftEngine;
+        let wal_config = MetaSrvWalConfig::RaftEngine;
         let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
         allocator.start().await.unwrap();
 
@@ -141,7 +155,7 @@ mod tests {
             .collect::<Vec<_>>();
 
         // Creates a topic manager.
-        let config = KafkaConfig {
+        let config = MetaSrvKafkaConfig {
             replication_factor: broker_endpoints.len() as i16,
             broker_endpoints,
             ..Default::default()
diff --git a/src/common/meta/src/wal_options_allocator/kafka.rs b/src/common/meta/src/wal_options_allocator/kafka.rs
new file mode 100644
index 0000000000..cc454a3c7d
--- /dev/null
+++ b/src/common/meta/src/wal_options_allocator/kafka.rs
@@ -0,0 +1,16 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod topic_manager;
+pub mod topic_selector;
diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
similarity index 96%
rename from src/common/meta/src/wal/kafka/topic_manager.rs
rename to src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
index eba47f119d..63944be13c 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
@@ -15,8 +15,9 @@
 use std::collections::HashSet;
 use std::sync::Arc;
 
-use common_config::wal::kafka::TopicSelectorType;
 use common_telemetry::{error, info};
+use common_wal::config::kafka::MetaSrvKafkaConfig;
+use common_wal::TopicSelectorType;
 use rskafka::client::controller::ControllerClient;
 use rskafka::client::error::Error as RsKafkaError;
 use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -33,8 +34,9 @@ use crate::error::{
 };
 use crate::kv_backend::KvBackendRef;
 use crate::rpc::store::PutRequest;
-use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, TopicSelectorRef};
-use crate::wal::kafka::KafkaConfig;
+use crate::wal_options_allocator::kafka::topic_selector::{
+    RoundRobinTopicSelector, TopicSelectorRef,
+};
 
 const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/";
 
@@ -44,7 +46,7 @@ const DEFAULT_PARTITION: i32 = 0;
 /// Manages topic initialization and selection.
 pub struct TopicManager {
-    config: KafkaConfig,
+    config: MetaSrvKafkaConfig,
     pub(crate) topic_pool: Vec<String>,
     pub(crate) topic_selector: TopicSelectorRef,
     kv_backend: KvBackendRef,
 }
 
@@ -52,7 +54,7 @@ impl TopicManager {
     /// Creates a new topic manager.
-    pub fn new(config: KafkaConfig, kv_backend: KvBackendRef) -> Self {
+    pub fn new(config: MetaSrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
         // Topics should be created.
         let topics = (0..config.num_topics)
             .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
@@ -237,9 +239,10 @@ impl TopicManager {
 
 #[cfg(test)]
 mod tests {
+    use common_wal::test_util::run_test_with_kafka_wal;
+
     use super::*;
     use crate::kv_backend::memory::MemoryKvBackend;
-    use crate::wal::kafka::test_util::run_test_with_kafka_wal;
 
     // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
     #[tokio::test]
@@ -277,7 +280,7 @@ mod tests {
            .collect::<Vec<_>>();
 
         // Creates a topic manager.
-        let config = KafkaConfig {
+        let config = MetaSrvKafkaConfig {
             replication_factor: broker_endpoints.len() as i16,
             broker_endpoints,
             ..Default::default()
diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_selector.rs
similarity index 100%
rename from src/common/meta/src/wal/kafka/topic_selector.rs
rename to src/common/meta/src/wal_options_allocator/kafka/topic_selector.rs
diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml
new file mode 100644
index 0000000000..b70ef3740e
--- /dev/null
+++ b/src/common/wal/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "common-wal"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[features]
+testing = []
+
+[dependencies]
+common-base.workspace = true
+common-telemetry.workspace = true
+futures-util.workspace = true
+humantime-serde.workspace = true
+rskafka.workspace = true
+serde.workspace = true
+serde_with.workspace = true
+
+[dev-dependencies]
+serde_json.workspace = true
+toml.workspace = true
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
new file mode 100644
index 0000000000..a51335c199
--- /dev/null
+++ b/src/common/wal/src/config.rs
@@ -0,0 +1,228 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod kafka;
+pub mod raft_engine;
+
+use serde::{Deserialize, Serialize};
+
+use crate::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig, StandaloneKafkaConfig};
+use crate::config::raft_engine::RaftEngineConfig;
+
+/// Wal configurations for metasrv.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
+#[serde(tag = "provider", rename_all = "snake_case")]
+pub enum MetaSrvWalConfig {
+    #[default]
+    RaftEngine,
+    Kafka(MetaSrvKafkaConfig),
+}
+
+/// Wal configurations for datanode.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+#[serde(tag = "provider", rename_all = "snake_case")]
+pub enum DatanodeWalConfig {
+    RaftEngine(RaftEngineConfig),
+    Kafka(DatanodeKafkaConfig),
+}
+
+impl Default for DatanodeWalConfig {
+    fn default() -> Self {
+        Self::RaftEngine(RaftEngineConfig::default())
+    }
+}
+
+/// Wal configurations for standalone.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+#[serde(tag = "provider", rename_all = "snake_case")]
+pub enum StandaloneWalConfig {
+    RaftEngine(RaftEngineConfig),
+    Kafka(StandaloneKafkaConfig),
+}
+
+impl Default for StandaloneWalConfig {
+    fn default() -> Self {
+        Self::RaftEngine(RaftEngineConfig::default())
+    }
+}
+
+impl From<StandaloneWalConfig> for MetaSrvWalConfig {
+    fn from(config: StandaloneWalConfig) -> Self {
+        match config {
+            StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine,
+            StandaloneWalConfig::Kafka(config) => Self::Kafka(MetaSrvKafkaConfig {
+                broker_endpoints: config.broker_endpoints,
+                num_topics: config.num_topics,
+                selector_type: config.selector_type,
+                topic_name_prefix: config.topic_name_prefix,
+                num_partitions: config.num_partitions,
+                replication_factor: config.replication_factor,
+                create_topic_timeout: config.create_topic_timeout,
+                backoff: config.backoff,
+            }),
+        }
+    }
+}
+
+impl From<StandaloneWalConfig> for DatanodeWalConfig {
+    fn from(config: StandaloneWalConfig) -> Self {
+        match config {
+            StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
+            StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
+                broker_endpoints: config.broker_endpoints,
+                compression: config.compression,
+                max_batch_size: config.max_batch_size,
+                linger: config.linger,
+                consumer_wait_timeout: config.consumer_wait_timeout,
+                backoff: config.backoff,
+            }),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use common_base::readable_size::ReadableSize;
+    use rskafka::client::partition::Compression;
+
+    use super::*;
+    use crate::config::kafka::common::BackoffConfig;
+    use crate::config::{DatanodeKafkaConfig, MetaSrvKafkaConfig, StandaloneKafkaConfig};
+    use crate::TopicSelectorType;
+
+    #[test]
+    fn test_toml_raft_engine() {
+        // With none configs.
+        let toml_str = r#"
+            provider = "raft_engine"
+        "#;
+        let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap();
+        assert_eq!(metasrv_wal_config, MetaSrvWalConfig::RaftEngine);
+
+        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
+        assert_eq!(
+            datanode_wal_config,
+            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
+        );
+
+        // With useless configs.
+        let toml_str = r#"
+            provider = "raft_engine"
+            broker_endpoints = ["127.0.0.1:9092"]
+            num_topics = 32
+        "#;
+        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
+        assert_eq!(
+            datanode_wal_config,
+            DatanodeWalConfig::RaftEngine(RaftEngineConfig::default())
+        );
+
+        // With some useful configs.
+        let toml_str = r#"
+            provider = "raft_engine"
+            file_size = "4MB"
+            purge_threshold = "1GB"
+            purge_interval = "5mins"
+        "#;
+        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
+        let expected = RaftEngineConfig {
+            file_size: ReadableSize::mb(4),
+            purge_threshold: ReadableSize::gb(1),
+            purge_interval: Duration::from_secs(5 * 60),
+            ..Default::default()
+        };
+        assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected));
+    }
+
+    #[test]
+    fn test_toml_kafka() {
+        let toml_str = r#"
+            provider = "kafka"
+            broker_endpoints = ["127.0.0.1:9092"]
+            num_topics = 32
+            selector_type = "round_robin"
+            topic_name_prefix = "greptimedb_wal_topic"
+            replication_factor = 1
+            create_topic_timeout = "30s"
+            max_batch_size = "1MB"
+            linger = "200ms"
+            consumer_wait_timeout = "100ms"
+            backoff_init = "500ms"
+            backoff_max = "10s"
+            backoff_base = 2
+            backoff_deadline = "5mins"
+        "#;
+
+        // Deserialized to MetaSrvWalConfig.
+        let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap();
+        let expected = MetaSrvKafkaConfig {
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+            num_topics: 32,
+            selector_type: TopicSelectorType::RoundRobin,
+            topic_name_prefix: "greptimedb_wal_topic".to_string(),
+            num_partitions: 1,
+            replication_factor: 1,
+            create_topic_timeout: Duration::from_secs(30),
+            backoff: BackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
+        };
+        assert_eq!(metasrv_wal_config, MetaSrvWalConfig::Kafka(expected));
+
+        // Deserialized to DatanodeWalConfig.
+        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
+        let expected = DatanodeKafkaConfig {
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+            compression: Compression::default(),
+            max_batch_size: ReadableSize::mb(1),
+            linger: Duration::from_millis(200),
+            consumer_wait_timeout: Duration::from_millis(100),
+            backoff: BackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
+        };
+        assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
+
+        // Deserialized to StandaloneWalConfig.
+        let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap();
+        let expected = StandaloneKafkaConfig {
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+            num_topics: 32,
+            selector_type: TopicSelectorType::RoundRobin,
+            topic_name_prefix: "greptimedb_wal_topic".to_string(),
+            num_partitions: 1,
+            replication_factor: 1,
+            create_topic_timeout: Duration::from_secs(30),
+            compression: Compression::default(),
+            max_batch_size: ReadableSize::mb(1),
+            linger: Duration::from_millis(200),
+            consumer_wait_timeout: Duration::from_millis(100),
+            backoff: BackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
+        };
+        assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected));
+    }
+}
diff --git a/src/common/wal/src/config/kafka.rs b/src/common/wal/src/config/kafka.rs
new file mode 100644
index 0000000000..586d2182a0
--- /dev/null
+++ b/src/common/wal/src/config/kafka.rs
@@ -0,0 +1,22 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod common;
+pub mod datanode;
+pub mod metasrv;
+pub mod standalone;
+
+pub use datanode::DatanodeKafkaConfig;
+pub use metasrv::MetaSrvKafkaConfig;
+pub use standalone::StandaloneKafkaConfig;
diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs
new file mode 100644
index 0000000000..ea708d9615
--- /dev/null
+++ b/src/common/wal/src/config/kafka/common.rs
@@ -0,0 +1,48 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use serde_with::with_prefix;
+
+with_prefix!(pub backoff_prefix "backoff_");
+
+/// Backoff configurations for kafka clients.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct BackoffConfig {
+    /// The initial backoff delay.
+    #[serde(with = "humantime_serde")]
+    pub init: Duration,
+    /// The maximum backoff delay.
+    #[serde(with = "humantime_serde")]
+    pub max: Duration,
+    /// The exponential backoff rate, i.e. next backoff = base * current backoff.
+    pub base: u32,
+    /// The deadline of retries. `None` stands for no deadline.
+    #[serde(with = "humantime_serde")]
+    pub deadline: Option<Duration>,
+}
+
+impl Default for BackoffConfig {
+    fn default() -> Self {
+        Self {
+            init: Duration::from_millis(500),
+            max: Duration::from_secs(10),
+            base: 2,
+            deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
+        }
+    }
+}
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
new file mode 100644
index 0000000000..b15d13dffc
--- /dev/null
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -0,0 +1,58 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use common_base::readable_size::ReadableSize;
+use rskafka::client::partition::Compression;
+use serde::{Deserialize, Serialize};
+
+use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
+use crate::BROKER_ENDPOINT;
+
+/// Kafka wal configurations for datanode.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct DatanodeKafkaConfig {
+    /// The broker endpoints of the Kafka cluster.
+    pub broker_endpoints: Vec<String>,
+    /// The compression algorithm used to compress kafka records.
+    #[serde(skip)]
+    pub compression: Compression,
+    /// The max size of a single producer batch.
+    pub max_batch_size: ReadableSize,
+    /// The linger duration of a kafka batch producer.
+    #[serde(with = "humantime_serde")]
+    pub linger: Duration,
+    /// The consumer wait timeout.
+    #[serde(with = "humantime_serde")]
+    pub consumer_wait_timeout: Duration,
+    /// The backoff config.
+    #[serde(flatten, with = "backoff_prefix")]
+    pub backoff: BackoffConfig,
+}
+
+impl Default for DatanodeKafkaConfig {
+    fn default() -> Self {
+        Self {
+            broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
+            compression: Compression::NoCompression,
+            // Warning: Kafka has a default limit of 1MB per message in a topic.
+            max_batch_size: ReadableSize::mb(1),
+            linger: Duration::from_millis(200),
+            consumer_wait_timeout: Duration::from_millis(100),
+            backoff: BackoffConfig::default(),
+        }
+    }
+}
diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/wal/src/config/kafka/metasrv.rs
similarity index 70%
rename from src/common/meta/src/wal/kafka.rs
rename to src/common/wal/src/config/kafka/metasrv.rs
index 921f2942f8..a8989275f4 100644
--- a/src/common/meta/src/wal/kafka.rs
+++ b/src/common/wal/src/config/kafka/metasrv.rs
@@ -12,31 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#[cfg(any(test, feature = "testing"))]
-pub mod test_util;
-pub mod topic_manager;
-pub mod topic_selector;
-
 use std::time::Duration;
 
-use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
 use serde::{Deserialize, Serialize};
 
-pub use crate::wal::kafka::topic_manager::TopicManager;
+use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
+use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
 
-/// Configurations for kafka wal.
+/// Kafka wal configurations for metasrv.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(default)]
-pub struct KafkaConfig {
+pub struct MetaSrvKafkaConfig {
     /// The broker endpoints of the Kafka cluster.
     pub broker_endpoints: Vec<String>,
-    /// Number of topics to be created upon start.
+    /// The number of topics to be created upon start.
     pub num_topics: usize,
     /// The type of the topic selector with which to select a topic for a region.
     pub selector_type: TopicSelectorType,
     /// Topic name prefix.
     pub topic_name_prefix: String,
-    /// Number of partitions per topic.
+    /// The number of partitions per topic.
     pub num_partitions: i32,
     /// The replication factor of each topic.
     pub replication_factor: i16,
@@ -44,24 +39,23 @@ pub struct KafkaConfig {
     #[serde(with = "humantime_serde")]
     pub create_topic_timeout: Duration,
     /// The backoff config.
-    #[serde(flatten, with = "kafka_backoff")]
-    pub backoff: KafkaBackoffConfig,
+    #[serde(flatten, with = "backoff_prefix")]
+    pub backoff: BackoffConfig,
 }
 
-impl Default for KafkaConfig {
+impl Default for MetaSrvKafkaConfig {
     fn default() -> Self {
-        let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
+        let broker_endpoints = vec![BROKER_ENDPOINT.to_string()];
         let replication_factor = broker_endpoints.len() as i16;
-
         Self {
             broker_endpoints,
             num_topics: 64,
             selector_type: TopicSelectorType::RoundRobin,
-            topic_name_prefix: "greptimedb_wal_topic".to_string(),
+            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
             num_partitions: 1,
             replication_factor,
             create_topic_timeout: Duration::from_secs(30),
-            backoff: KafkaBackoffConfig::default(),
+            backoff: BackoffConfig::default(),
         }
     }
 }
diff --git a/src/common/config/src/wal/kafka.rs b/src/common/wal/src/config/kafka/standalone.rs
similarity index 51%
rename from src/common/config/src/wal/kafka.rs
rename to src/common/wal/src/config/kafka/standalone.rs
index 998c895db5..3da8fa4980 100644
--- a/src/common/config/src/wal/kafka.rs
+++ b/src/common/wal/src/config/kafka/standalone.rs
@@ -15,88 +15,18 @@
 use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
-use rskafka::client::partition::Compression as RsKafkaCompression;
+use rskafka::client::partition::Compression;
 use serde::{Deserialize, Serialize};
-use serde_with::with_prefix;
 
-/// The type of the topic selector, i.e. with which strategy to select a topic.
-#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(rename_all = "snake_case")]
-pub enum TopicSelectorType {
-    #[default]
-    RoundRobin,
-}
-
-/// Configurations for kafka wal.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(default)]
-pub struct KafkaConfig {
-    /// The broker endpoints of the Kafka cluster.
-    pub broker_endpoints: Vec<String>,
-    /// The compression algorithm used to compress log entries.
-    #[serde(skip)]
-    pub compression: RsKafkaCompression,
-    /// The max size of a single producer batch.
-    pub max_batch_size: ReadableSize,
-    /// The linger duration of a kafka batch producer.
-    #[serde(with = "humantime_serde")]
-    pub linger: Duration,
-    /// The consumer wait timeout.
-    #[serde(with = "humantime_serde")]
-    pub consumer_wait_timeout: Duration,
-    /// The backoff config.
-    #[serde(flatten, with = "kafka_backoff")]
-    pub backoff: KafkaBackoffConfig,
-}
-
-impl Default for KafkaConfig {
-    fn default() -> Self {
-        Self {
-            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-            compression: RsKafkaCompression::NoCompression,
-            // Warning: Kafka has a default limit of 1MB per message in a topic.
-            max_batch_size: ReadableSize::mb(1),
-            linger: Duration::from_millis(200),
-            consumer_wait_timeout: Duration::from_millis(100),
-            backoff: KafkaBackoffConfig::default(),
-        }
-    }
-}
-
-with_prefix!(pub kafka_backoff "backoff_");
-
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(default)]
-pub struct KafkaBackoffConfig {
-    /// The initial backoff delay.
-    #[serde(with = "humantime_serde")]
-    pub init: Duration,
-    /// The maximum backoff delay.
-    #[serde(with = "humantime_serde")]
-    pub max: Duration,
-    /// Exponential backoff rate, i.e. next backoff = base * current backoff.
-    pub base: u32,
-    /// The deadline of retries. `None` stands for no deadline.
-    #[serde(with = "humantime_serde")]
-    pub deadline: Option<Duration>,
-}
-
-impl Default for KafkaBackoffConfig {
-    fn default() -> Self {
-        Self {
-            init: Duration::from_millis(500),
-            max: Duration::from_secs(10),
-            base: 2,
-            deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
-        }
-    }
-}
+use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
+use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
 
+/// Kafka wal configurations for standalone.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(default)]
 pub struct StandaloneKafkaConfig {
-    #[serde(flatten)]
-    pub base: KafkaConfig,
+    /// The broker endpoints of the Kafka cluster.
+    pub broker_endpoints: Vec<String>,
     /// Number of topics to be created upon start.
     pub num_topics: usize,
     /// The type of the topic selector with which to select a topic for a region.
@@ -110,28 +40,40 @@ pub struct StandaloneKafkaConfig {
     /// The timeout of topic creation.
     #[serde(with = "humantime_serde")]
     pub create_topic_timeout: Duration,
+    /// The compression algorithm used to compress kafka records.
+    #[serde(skip)]
+    pub compression: Compression,
+    /// The max size of a single producer batch.
+    pub max_batch_size: ReadableSize,
+    /// The linger duration of a kafka batch producer.
+    #[serde(with = "humantime_serde")]
+    pub linger: Duration,
+    /// The consumer wait timeout.
+    #[serde(with = "humantime_serde")]
+    pub consumer_wait_timeout: Duration,
+    /// The backoff config.
+    #[serde(flatten, with = "backoff_prefix")]
+    pub backoff: BackoffConfig,
 }
 
 impl Default for StandaloneKafkaConfig {
     fn default() -> Self {
-        let base = KafkaConfig::default();
-        let replication_factor = base.broker_endpoints.len() as i16;
-
+        let broker_endpoints = vec![BROKER_ENDPOINT.to_string()];
+        let replication_factor = broker_endpoints.len() as i16;
         Self {
-            base,
+            broker_endpoints,
             num_topics: 64,
             selector_type: TopicSelectorType::RoundRobin,
-            topic_name_prefix: "greptimedb_wal_topic".to_string(),
+            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
             num_partitions: 1,
             replication_factor,
             create_topic_timeout: Duration::from_secs(30),
+            compression: Compression::NoCompression,
+            // Warning: Kafka has a default limit of 1MB per message in a topic.
+            max_batch_size: ReadableSize::mb(1),
+            linger: Duration::from_millis(200),
+            consumer_wait_timeout: Duration::from_millis(100),
+            backoff: BackoffConfig::default(),
         }
     }
 }
-
-/// Kafka wal options allocated to a region.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub struct KafkaOptions {
-    /// Kafka wal topic.
-    pub topic: String,
-}
diff --git a/src/common/config/src/wal/raft_engine.rs b/src/common/wal/src/config/raft_engine.rs
similarity index 82%
rename from src/common/config/src/wal/raft_engine.rs
rename to src/common/wal/src/config/raft_engine.rs
index 9ce8ebec40..f54e3f1ba5 100644
--- a/src/common/config/src/wal/raft_engine.rs
+++ b/src/common/wal/src/config/raft_engine.rs
@@ -21,24 +21,24 @@ use serde::{Deserialize, Serialize};
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(default)]
 pub struct RaftEngineConfig {
-    // wal directory
+    /// Wal directory
     pub dir: Option<String>,
-    // wal file size in bytes
+    /// Wal file size in bytes
     pub file_size: ReadableSize,
-    // wal purge threshold in bytes
+    /// Wal purge threshold in bytes
     pub purge_threshold: ReadableSize,
-    // purge interval in seconds
+    /// Purge interval
     #[serde(with = "humantime_serde")]
     pub purge_interval: Duration,
-    // read batch size
+    /// Read batch size
     pub read_batch_size: usize,
-    // whether to sync log file after every write
+    /// Whether to sync log file after every write
     pub sync_write: bool,
-    // whether to reuse logically truncated log files.
+    /// Whether to reuse logically truncated log files.
     pub enable_log_recycle: bool,
-    // whether to pre-create log files on start up
+    /// Whether to pre-create log files on startup
     pub prefill_log_files: bool,
-    // duration for fsyncing log files.
+    /// Duration for fsyncing log files.
     #[serde(with = "humantime_serde")]
     pub sync_period: Option<Duration>,
 }
diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs
new file mode 100644
index 0000000000..88d67ee3e0
--- /dev/null
+++ b/src/common/wal/src/lib.rs
@@ -0,0 +1,32 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+
+pub mod config;
+pub mod options;
+#[cfg(any(test, feature = "testing"))]
+pub mod test_util;
+
+pub const BROKER_ENDPOINT: &str = "127.0.0.1:9092";
+pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
+
+/// The type of the topic selector, i.e. with which strategy to select a topic.
+// The enum is defined here to work around cyclic dependency issues.
+#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum TopicSelectorType {
+    #[default]
+    RoundRobin,
+}
diff --git a/src/common/wal/src/options.rs b/src/common/wal/src/options.rs
new file mode 100644
index 0000000000..2662e8675b
--- /dev/null
+++ b/src/common/wal/src/options.rs
@@ -0,0 +1,66 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod kafka;
+
+use serde::{Deserialize, Serialize};
+use serde_with::with_prefix;
+
+pub use crate::options::kafka::KafkaWalOptions;
+
+/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
+/// and inserted into the options of a `RegionCreateRequest`.
+pub const WAL_OPTIONS_KEY: &str = "wal_options";
+
+/// Wal options allocated to a region.
+/// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded
+/// by datanode with `serde_json::from_str`.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
+#[serde(tag = "wal.provider", rename_all = "snake_case")]
+pub enum WalOptions {
+    #[default]
+    RaftEngine,
+    #[serde(with = "kafka_prefix")]
+    Kafka(KafkaWalOptions),
+}
+
+with_prefix!(kafka_prefix "wal.kafka.");
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_serde_wal_options() {
+        // Test serde raft-engine wal options.
+        let wal_options = WalOptions::RaftEngine;
+        let encoded = serde_json::to_string(&wal_options).unwrap();
+        let expected = r#"{"wal.provider":"raft_engine"}"#;
+        assert_eq!(&encoded, expected);
+
+        let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
+        assert_eq!(decoded, wal_options);
+
+        // Test serde kafka wal options.
+        let wal_options = WalOptions::Kafka(KafkaWalOptions {
+            topic: "test_topic".to_string(),
+        });
+        let encoded = serde_json::to_string(&wal_options).unwrap();
+        let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#;
+        assert_eq!(&encoded, expected);
+
+        let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
+        assert_eq!(decoded, wal_options);
+    }
+}
diff --git a/src/log-store/src/kafka/util/test_util.rs b/src/common/wal/src/options/kafka.rs
similarity index 55%
rename from src/log-store/src/kafka/util/test_util.rs
rename to src/common/wal/src/options/kafka.rs
index b7dc3f7512..a931808826 100644
--- a/src/log-store/src/kafka/util/test_util.rs
+++ b/src/common/wal/src/options/kafka.rs
@@ -12,24 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
diff --git a/src/log-store/src/kafka/util/test_util.rs b/src/common/wal/src/options/kafka.rs
similarity index 55%
rename from src/log-store/src/kafka/util/test_util.rs
rename to src/common/wal/src/options/kafka.rs
index b7dc3f7512..a931808826 100644
--- a/src/log-store/src/kafka/util/test_util.rs
+++ b/src/common/wal/src/options/kafka.rs
@@ -12,24 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::env;
+use serde::{Deserialize, Serialize};
 
-use common_telemetry::warn;
-use futures_util::future::BoxFuture;
-
-pub async fn run_test_with_kafka_wal<F>(test: F)
-where
-    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
-{
-    let Ok(endpoints) = env::var("GT_KAFKA_ENDPOINTS") else {
-        warn!("The endpoints is empty, skipping the test");
-        return;
-    };
-
-    let endpoints = endpoints
-        .split(',')
-        .map(|s| s.trim().to_string())
-        .collect::<Vec<_>>();
-
-    test(endpoints).await
+/// Kafka wal options allocated to a region.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct KafkaWalOptions {
+    /// Kafka wal topic.
+    pub topic: String,
 }
diff --git a/src/common/meta/src/wal/kafka/test_util.rs b/src/common/wal/src/test_util.rs
similarity index 100%
rename from src/common/meta/src/wal/kafka/test_util.rs
rename to src/common/wal/src/test_util.rs
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index 3bed5bf9c5..b06f5d8d36 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -35,6 +35,7 @@ common-recordbatch.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
+common-wal.workspace = true
 dashmap.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
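The surviving copy of `run_test_with_kafka_wal` (renamed above from common-meta with 100% similarity) presumably matches the removed log-store copy, so a caller would look roughly like this; the test body is a placeholder:

```rust
use common_wal::test_util::run_test_with_kafka_wal;

#[tokio::test]
async fn test_with_kafka_wal() {
    // Runs only when GT_KAFKA_ENDPOINTS is set, e.g.
    //   GT_KAFKA_ENDPOINTS=127.0.0.1:9092 cargo test
    // and is silently skipped otherwise.
    run_test_with_kafka_wal(|broker_endpoints| {
        Box::pin(async move {
            assert!(!broker_endpoints.is_empty());
            // ... exercise the Kafka-backed wal with `broker_endpoints` ...
        })
    })
    .await;
}
```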
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index ac8d3a48b6..7b107d7a61 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -17,12 +17,12 @@ use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
-use common_config::WalConfig;
 use common_grpc::channel_manager::{
     DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
 };
 pub use common_procedure::options::ProcedureConfig;
 use common_telemetry::logging::LoggingOptions;
+use common_wal::config::DatanodeWalConfig;
 use file_engine::config::EngineConfig as FileEngineConfig;
 use meta_client::MetaClientOptions;
 use mito2::config::MitoConfig;
@@ -237,7 +237,7 @@ pub struct DatanodeOptions {
     pub heartbeat: HeartbeatOptions,
     pub http: HttpOptions,
     pub meta_client: Option<MetaClientOptions>,
-    pub wal: WalConfig,
+    pub wal: DatanodeWalConfig,
     pub storage: StorageConfig,
     /// Options for different store engines.
     pub region_engine: Vec<RegionEngineConfig>,
@@ -260,7 +260,7 @@ impl Default for DatanodeOptions {
             rpc_max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
             http: HttpOptions::default(),
             meta_client: None,
-            wal: WalConfig::default(),
+            wal: DatanodeWalConfig::default(),
             storage: StorageConfig::default(),
             region_engine: vec![
                 RegionEngineConfig::Mito(MitoConfig::default()),
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 6444601309..42496b8458 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -20,16 +20,17 @@ use std::sync::Arc;
 
 use catalog::memory::MemoryCatalogManager;
 use common_base::Plugins;
-use common_config::wal::{KafkaConfig, RaftEngineConfig};
-use common_config::WalConfig;
 use common_error::ext::BoxedError;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
 use common_meta::kv_backend::KvBackendRef;
-use common_meta::wal::prepare_wal_option;
+use common_meta::wal_options_allocator::prepare_wal_options;
 pub use common_procedure::options::ProcedureConfig;
 use common_runtime::Runtime;
 use common_telemetry::{error, info, warn};
+use common_wal::config::kafka::DatanodeKafkaConfig;
+use common_wal::config::raft_engine::RaftEngineConfig;
+use common_wal::config::DatanodeWalConfig;
 use file_engine::engine::FileRegionEngine;
 use futures::future;
 use futures_util::future::try_join_all;
@@ -377,7 +378,7 @@ impl DatanodeBuilder {
         config: MitoConfig,
     ) -> Result<MitoEngine> {
         let mito_engine = match &opts.wal {
-            WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
+            DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
                 &opts.storage.data_home,
                 config,
                 Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                     .await?,
             )
             .await
             .context(BuildMitoEngineSnafu)?,
-
-            WalConfig::Kafka(kafka_config) => MitoEngine::new(
+            DatanodeWalConfig::Kafka(kafka_config) => MitoEngine::new(
                 &opts.storage.data_home,
                 config,
                 Self::build_kafka_log_store(kafka_config).await?,
@@ -427,7 +427,7 @@ impl DatanodeBuilder {
     }
 
     /// Builds [KafkaLogStore].
-    async fn build_kafka_log_store(config: &KafkaConfig) -> Result<Arc<KafkaLogStore>> {
+    async fn build_kafka_log_store(config: &DatanodeKafkaConfig) -> Result<Arc<KafkaLogStore>> {
         KafkaLogStore::try_new(config)
             .await
             .map_err(Box::new)
@@ -462,7 +462,7 @@ async fn open_all_regions(
     for region_number in table_value.regions {
         // Augments region options with wal options if a wal options is provided.
         let mut region_options = table_value.region_info.region_options.clone();
-        prepare_wal_option(
+        prepare_wal_options(
             &mut region_options,
             RegionId::new(table_value.table_id, region_number),
             &table_value.region_info.region_wal_options,
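A sketch of the `prepare_wal_options` call pattern used at both call sites in this change. Only the call expressions appear in the diff, so the map types here (a string options map, and encoded options keyed by region number) are assumptions:

```rust
use std::collections::HashMap;

use common_meta::wal_options_allocator::prepare_wal_options;
use store_api::storage::RegionId;

fn main() {
    let mut region_options: HashMap<String, String> = HashMap::new();
    let region_id = RegionId::new(1024, 0);
    // Encoded wal options keyed by region number, as allocated by metasrv;
    // both the key type and the topic name are assumed for illustration.
    let region_wal_options: HashMap<u32, String> = HashMap::from([(
        0u32,
        r#"{"wal.provider":"kafka","wal.kafka.topic":"greptimedb_wal_topic_0"}"#.to_string(),
    )]);

    // Copies this region's encoded wal options (if any) into its options map.
    prepare_wal_options(&mut region_options, region_id, &region_wal_options);
}
```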
diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs
index f4823d8d57..a19ae955c6 100644
--- a/src/datanode/src/heartbeat/handler/open_region.rs
+++ b/src/datanode/src/heartbeat/handler/open_region.rs
@@ -14,7 +14,7 @@
 
 use common_error::ext::ErrorExt;
 use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
-use common_meta::wal::prepare_wal_option;
+use common_meta::wal_options_allocator::prepare_wal_options;
 use futures_util::future::BoxFuture;
 use store_api::path_utils::region_dir;
 use store_api::region_request::{RegionOpenRequest, RegionRequest};
@@ -34,7 +34,7 @@ impl HandlerContext {
     ) -> BoxFuture<'static, InstructionReply> {
         Box::pin(async move {
             let region_id = Self::region_ident_to_region_id(&region_ident);
-            prepare_wal_option(&mut region_options, region_id, &region_wal_options);
+            prepare_wal_options(&mut region_options, region_id, &region_wal_options);
             let request = RegionRequest::Open(RegionOpenRequest {
                 engine: region_ident.engine,
                 region_dir: region_dir(&region_storage_path, region_id),
diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml
index 370fbc8696..4f35fa19ef 100644
--- a/src/log-store/Cargo.toml
+++ b/src/log-store/Cargo.toml
@@ -23,6 +23,7 @@ common-meta.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
+common-wal.workspace = true
 dashmap.workspace = true
 futures-util.workspace = true
 futures.workspace = true
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 1d89fe3f43..9e082decb8 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -15,7 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use common_config::wal::KafkaConfig;
+use common_wal::config::kafka::DatanodeKafkaConfig;
 use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
 use rskafka::client::producer::aggregator::RecordAggregator;
 use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
@@ -45,7 +45,7 @@ pub(crate) struct Client {
 
 impl Client {
     /// Creates a Client from the raw client.
-    pub(crate) fn new(raw_client: Arc<PartitionClient>, config: &KafkaConfig) -> Self {
+    pub(crate) fn new(raw_client: Arc<PartitionClient>, config: &DatanodeKafkaConfig) -> Self {
         let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize);
         let batch_producer = BatchProducerBuilder::new(raw_client.clone())
             .with_compression(config.compression)
@@ -62,7 +62,7 @@ impl Client {
 /// Manages client construction and accesses.
 #[derive(Debug)]
 pub(crate) struct ClientManager {
-    pub(crate) config: KafkaConfig,
+    pub(crate) config: DatanodeKafkaConfig,
     /// Top-level client in kafka. All clients are constructed by this client.
     client_factory: RsKafkaClient,
     /// A pool maintaining a collection of clients.
@@ -72,7 +72,7 @@ pub(crate) struct ClientManager {
 
 impl ClientManager {
     /// Tries to create a ClientManager.
-    pub(crate) async fn try_new(config: &KafkaConfig) -> Result<Self> {
+    pub(crate) async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
         // Sets backoff config for the top-level kafka client and all clients constructed by it.
         let backoff_config = BackoffConfig {
             init_backoff: config.backoff.init,
@@ -136,7 +136,7 @@ impl ClientManager {
 
 #[cfg(test)]
 mod tests {
-    use common_meta::wal::kafka::test_util::run_test_with_kafka_wal;
+    use common_wal::test_util::run_test_with_kafka_wal;
     use tokio::sync::Barrier;
 
     use super::*;
@@ -155,7 +155,7 @@ mod tests {
             )
             .await;
 
-            let config = KafkaConfig {
+            let config = DatanodeKafkaConfig {
                 broker_endpoints,
                 ..Default::default()
             };
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 80f57ea13c..49b8deb279 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -15,8 +15,9 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use common_config::wal::{KafkaConfig, WalOptions};
 use common_telemetry::{debug, warn};
+use common_wal::config::kafka::DatanodeKafkaConfig;
+use common_wal::options::WalOptions;
 use futures_util::StreamExt;
 use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
 use rskafka::client::partition::OffsetAt;
@@ -35,14 +36,14 @@ use crate::kafka::{EntryImpl, NamespaceImpl};
 /// A log store backed by Kafka.
 #[derive(Debug)]
 pub struct KafkaLogStore {
-    config: KafkaConfig,
+    config: DatanodeKafkaConfig,
     /// Manages kafka clients through which the log store contact the Kafka cluster.
     client_manager: ClientManagerRef,
 }
 
 impl KafkaLogStore {
     /// Tries to create a Kafka log store.
-    pub async fn try_new(config: &KafkaConfig) -> Result<Self> {
+    pub async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
         Ok(Self {
             client_manager: Arc::new(ClientManager::try_new(config).await?),
             config: config.clone(),
@@ -315,7 +316,7 @@ mod tests {
             )
             .await;
 
-            let config = KafkaConfig {
+            let config = DatanodeKafkaConfig {
                 broker_endpoints,
                 max_batch_size: ReadableSize::kb(32),
                 ..Default::default()
diff --git a/src/log-store/src/kafka/util.rs b/src/log-store/src/kafka/util.rs
index 86243e38d6..61059b1645 100644
--- a/src/log-store/src/kafka/util.rs
+++ b/src/log-store/src/kafka/util.rs
@@ -14,5 +14,3 @@
 
 pub mod offset;
 pub mod record;
-#[cfg(test)]
-mod test_util;
diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs
index 9ff9206454..9bc97557ad 100644
--- a/src/log-store/src/kafka/util/record.rs
+++ b/src/log-store/src/kafka/util/record.rs
@@ -295,12 +295,12 @@ mod tests {
     use std::sync::Arc;
 
     use common_base::readable_size::ReadableSize;
-    use common_config::wal::KafkaConfig;
+    use common_wal::config::kafka::DatanodeKafkaConfig;
+    use common_wal::test_util::run_test_with_kafka_wal;
     use uuid::Uuid;
 
     use super::*;
     use crate::kafka::client_manager::ClientManager;
-    use crate::kafka::util::test_util::run_test_with_kafka_wal;
 
     // Implements some utility methods for testing.
     impl Default for Record {
@@ -555,7 +555,7 @@ mod tests {
         };
         let entry = new_test_entry([b'1'; 2000000], 0, ns.clone());
         let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]);
-        let config = KafkaConfig {
+        let config = DatanodeKafkaConfig {
             broker_endpoints,
             max_batch_size: ReadableSize::mb(1),
             ..Default::default()
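Putting the renamed config type together with the log store, roughly as the tests above do. The import path for `KafkaLogStore` is assumed from the file layout, and the broker endpoint is the testing default:

```rust
use common_wal::config::kafka::DatanodeKafkaConfig;
use log_store::kafka::log_store::KafkaLogStore;

#[tokio::main]
async fn main() {
    // Every other knob (compression, linger, backoff, max_batch_size, ...)
    // falls back to its default, mirroring the tests above.
    let config = DatanodeKafkaConfig {
        broker_endpoints: vec!["127.0.0.1:9092".to_string()],
        ..Default::default()
    };
    let _log_store = KafkaLogStore::try_new(&config).await.unwrap();
    // Hand the log store to the engine, e.g. via build_kafka_log_store above.
}
```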
diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs
index 48439333ac..1ffada095e 100644
--- a/src/log-store/src/noop.rs
+++ b/src/log-store/src/noop.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_config::wal::WalOptions;
+use common_wal::options::WalOptions;
 use store_api::logstore::entry::{Entry, Id as EntryId};
 use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
 use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
index 515ce6645f..13a32e6fe7 100644
--- a/src/log-store/src/raft_engine/log_store.rs
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -19,9 +19,10 @@ use std::sync::atomic::{AtomicI64, Ordering};
 use std::sync::Arc;
 
 use async_stream::stream;
-use common_config::wal::{RaftEngineConfig, WalOptions};
 use common_runtime::{RepeatedTask, TaskFunction};
 use common_telemetry::{error, info};
+use common_wal::config::raft_engine::RaftEngineConfig;
+use common_wal::options::WalOptions;
 use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
 use snafu::{ensure, ResultExt};
 use store_api::logstore::entry::Id as EntryId;
diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs
index d6836c5c8c..c25b411665 100644
--- a/src/log-store/src/test_util/log_store_util.rs
+++ b/src/log-store/src/test_util/log_store_util.rs
@@ -15,7 +15,7 @@
 use std::path::Path;
 
 use common_base::readable_size::ReadableSize;
-use common_config::wal::RaftEngineConfig;
+use common_wal::config::raft_engine::RaftEngineConfig;
 
 use crate::raft_engine::log_store::RaftEngineLogStore;
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index 62da072a10..dd1db0bf8b 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -26,6 +26,7 @@ common-procedure.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
+common-wal.workspace = true
 dashmap.workspace = true
 datatypes.workspace = true
 derive_builder.workspace = true
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index de8278df1a..70135f34c1 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -26,12 +26,12 @@ use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
 use common_meta::peer::Peer;
 use common_meta::region_keeper::MemoryRegionKeeperRef;
-use common_meta::wal::options_allocator::WalOptionsAllocatorRef;
-use common_meta::wal::WalConfig;
+use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
 use common_procedure::options::ProcedureConfig;
 use common_procedure::ProcedureManagerRef;
 use common_telemetry::logging::LoggingOptions;
 use common_telemetry::{error, info, warn};
+use common_wal::config::MetaSrvWalConfig;
 use serde::{Deserialize, Serialize};
 use servers::export_metrics::ExportMetricsOption;
 use servers::http::HttpOptions;
@@ -74,7 +74,7 @@ pub struct MetaSrvOptions {
     pub datanode: DatanodeOptions,
     pub enable_telemetry: bool,
     pub data_home: String,
-    pub wal: WalConfig,
+    pub wal: MetaSrvWalConfig,
     pub export_metrics: ExportMetricsOption,
     pub store_key_prefix: String,
 }
@@ -107,7 +107,7 @@ impl Default for MetaSrvOptions {
             datanode: DatanodeOptions::default(),
             enable_telemetry: true,
             data_home: METASRV_HOME.to_string(),
-            wal: WalConfig::default(),
+            wal: MetaSrvWalConfig::default(),
             export_metrics: ExportMetricsOption::default(),
             store_key_prefix: String::new(),
         }
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 7bd498d927..ed727132ea 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -30,7 +30,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
 use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
 use common_meta::sequence::SequenceBuilder;
 use common_meta::state_store::KvStateStore;
-use common_meta::wal::WalOptionsAllocator;
+use common_meta::wal_options_allocator::WalOptionsAllocator;
 use common_procedure::local::{LocalManager, ManagerConfig};
 use common_procedure::ProcedureManagerRef;
 use snafu::ResultExt;
diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml
index f33511cf0b..78a659876d 100644
--- a/src/mito2/Cargo.toml
+++ b/src/mito2/Cargo.toml
@@ -32,6 +32,7 @@ common-runtime.workspace = true
 common-telemetry.workspace = true
 common-test-util = { workspace = true, optional = true }
 common-time.workspace = true
+common-wal.workspace = true
 dashmap.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
@@ -54,7 +55,7 @@ puffin.workspace = true
 regex = "1.5"
 serde = { version = "1.0", features = ["derive"] }
 serde_json.workspace = true
-serde_with = "3"
+serde_with.workspace = true
 smallvec.workspace = true
 snafu.workspace = true
 store-api.workspace = true
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index 2b54fa3650..2306c663cd 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -22,9 +22,9 @@ use std::collections::HashMap;
 use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
 use std::sync::{Arc, RwLock};
 
-use common_config::wal::WalOptions;
 use common_telemetry::info;
 use common_time::util::current_time_millis;
+use common_wal::options::WalOptions;
 use snafu::{ensure, OptionExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 9e53f52b8b..e4d52244ef 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -18,9 +18,9 @@ use std::collections::HashMap;
 use std::sync::atomic::{AtomicBool, AtomicI64};
 use std::sync::Arc;
 
-use common_config::wal::WalOptions;
 use common_telemetry::{debug, error, info, warn};
 use common_time::util::current_time_millis;
+use common_wal::options::WalOptions;
 use futures::StreamExt;
 use object_store::manager::ObjectStoreManagerRef;
 use object_store::util::{join_dir, normalize_dir};
diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs
index 8fb640399e..e8a7f66319 100644
--- a/src/mito2/src/region/options.rs
+++ b/src/mito2/src/region/options.rs
@@ -17,8 +17,7 @@ use std::collections::HashMap;
 use std::time::Duration;
 
-use common_config::wal::WalOptions;
-use common_config::WAL_OPTIONS_KEY;
+use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
 use serde::Deserialize;
 use serde_json::Value;
 use serde_with::{serde_as, with_prefix, DisplayFromStr};
@@ -174,8 +173,7 @@ fn options_map_to_value(options: &HashMap<String, String>) -> Value {
 #[cfg(test)]
 mod tests {
-    use common_config::wal::KafkaWalOptions;
-    use common_config::WAL_OPTIONS_KEY;
+    use common_wal::options::KafkaWalOptions;
 
     use super::*;
diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs
index 5cb081d2a5..36b1a0fac6 100644
--- a/src/mito2/src/region_write_ctx.rs
+++ b/src/mito2/src/region_write_ctx.rs
@@ -16,7 +16,7 @@ use std::mem;
 use std::sync::Arc;
 
 use api::v1::{Mutation, OpType, Rows, WalEntry};
-use common_config::wal::WalOptions;
+use common_wal::options::WalOptions;
 use snafu::ResultExt;
 use store_api::logstore::LogStore;
 use store_api::storage::{RegionId, SequenceNumber};
diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs
index ac17d3df54..49b6dd4931 100644
--- a/src/mito2/src/wal.rs
+++ b/src/mito2/src/wal.rs
@@ -20,8 +20,8 @@ use std::sync::Arc;
 
 use api::v1::WalEntry;
 use async_stream::try_stream;
-use common_config::wal::WalOptions;
 use common_error::ext::BoxedError;
+use common_wal::options::WalOptions;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use prost::Message;
diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml
index 31b140ef7b..3bc01aa6b5 100644
--- a/src/store-api/Cargo.toml
+++ b/src/store-api/Cargo.toml
@@ -10,12 +10,12 @@ aquamarine.workspace = true
 async-trait.workspace = true
 bytes.workspace = true
 common-base.workspace = true
-common-config.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
 common-query.workspace = true
 common-recordbatch.workspace = true
 common-time.workspace = true
+common-wal.workspace = true
 datatypes.workspace = true
 derive_builder.workspace = true
 futures.workspace = true
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index 903de87ee4..574d016263 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -16,8 +16,8 @@
 
 use std::collections::HashMap;
 
-use common_config::wal::WalOptions;
 use common_error::ext::ErrorExt;
+use common_wal::options::WalOptions;
 
 use crate::logstore::entry::Entry;
 pub use crate::logstore::entry::Id as EntryId;
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 7abbc495ac..4b49d0c6a5 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -30,6 +30,7 @@ common-recordbatch.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
 common-test-util.workspace = true
+common-wal.workspace = true
 datanode = { workspace = true }
 datatypes.workspace = true
 dotenv = "0.15"
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index 40a98dd040..ff2854a559 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -24,7 +24,6 @@ use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend};
 use client::client_manager::DatanodeClients;
 use client::Client;
 use common_base::Plugins;
-use common_config::WalConfig;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
 use common_meta::heartbeat::handler::HandlerGroupExecutor;
@@ -33,10 +32,10 @@ use common_meta::kv_backend::etcd::EtcdStore;
 use common_meta::kv_backend::memory::MemoryKvBackend;
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::peer::Peer;
-use common_meta::wal::WalConfig as MetaWalConfig;
 use common_meta::DatanodeId;
 use common_runtime::Builder as RuntimeBuilder;
 use common_test_util::temp_dir::create_temp_dir;
+use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
 use datanode::config::{DatanodeOptions, ObjectStoreConfig};
 use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
 use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
@@ -77,8 +76,8 @@ pub struct GreptimeDbClusterBuilder {
     store_config: Option<ObjectStoreConfig>,
     store_providers: Option<Vec<StorageType>>,
     datanodes: Option<u32>,
-    wal_config: WalConfig,
-    meta_wal_config: MetaWalConfig,
+    wal_config: DatanodeWalConfig,
+    meta_wal_config: MetaSrvWalConfig,
     shared_home_dir: Option<Arc<TempDir>>,
     meta_selector: Option<SelectorRef>,
 }
@@ -106,8 +105,8 @@ impl GreptimeDbClusterBuilder {
             store_config: None,
             store_providers: None,
             datanodes: None,
-            wal_config: WalConfig::default(),
-            meta_wal_config: MetaWalConfig::default(),
+            wal_config: DatanodeWalConfig::default(),
+            meta_wal_config: MetaSrvWalConfig::default(),
             shared_home_dir: None,
             meta_selector: None,
         }
@@ -132,13 +131,13 @@ impl GreptimeDbClusterBuilder {
     }
 
     #[must_use]
-    pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
+    pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self {
         self.wal_config = wal_config;
         self
     }
 
     #[must_use]
-    pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
+    pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self {
         self.meta_wal_config = wal_meta;
         self
     }
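A sketch of driving the renamed builder setters for a Kafka-backed cluster test. The harness calls that construct and build the cluster sit outside this diff, and `MetaSrvKafkaConfig: Default` is assumed:

```rust
use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use tests_integration::cluster::GreptimeDbClusterBuilder;

// Points both the datanodes and the metasrv at the same brokers; everything
// else keeps its default.
fn with_kafka_wal(
    builder: GreptimeDbClusterBuilder,
    endpoints: Vec<String>,
) -> GreptimeDbClusterBuilder {
    builder
        .with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
            broker_endpoints: endpoints.clone(),
            ..Default::default()
        }))
        .with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
            broker_endpoints: endpoints,
            ..Default::default()
        }))
}
```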
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 2769ec0916..2d6d00ef2e 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -17,16 +17,17 @@ use std::sync::Arc;
 
 use cmd::options::MixOptions;
 use common_base::Plugins;
 use common_catalog::consts::MIN_USER_TABLE_ID;
-use common_config::{KvBackendConfig, WalConfig};
+use common_config::KvBackendConfig;
 use common_meta::cache_invalidator::DummyCacheInvalidator;
 use common_meta::ddl::table_meta::TableMetadataAllocator;
 use common_meta::ddl_manager::DdlManager;
 use common_meta::key::TableMetadataManager;
 use common_meta::region_keeper::MemoryRegionKeeper;
 use common_meta::sequence::SequenceBuilder;
-use common_meta::wal::{WalConfig as MetaWalConfig, WalOptionsAllocator};
+use common_meta::wal_options_allocator::WalOptionsAllocator;
 use common_procedure::options::ProcedureConfig;
 use common_telemetry::logging::LoggingOptions;
+use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
 use datanode::config::DatanodeOptions;
 use datanode::datanode::DatanodeBuilder;
 use frontend::frontend::FrontendOptions;
@@ -46,8 +47,8 @@ pub struct GreptimeDbStandalone {
 #[derive(Clone)]
 pub struct GreptimeDbStandaloneBuilder {
     instance_name: String,
-    wal_config: WalConfig,
-    meta_wal_config: MetaWalConfig,
+    wal_config: DatanodeWalConfig,
+    meta_wal_config: MetaSrvWalConfig,
     store_providers: Option<Vec<StorageType>>,
     default_store: Option<StorageType>,
     plugin: Option<Plugins>,
@@ -60,8 +61,8 @@ impl GreptimeDbStandaloneBuilder {
             store_providers: None,
             plugin: None,
             default_store: None,
-            wal_config: WalConfig::default(),
-            meta_wal_config: MetaWalConfig::default(),
+            wal_config: DatanodeWalConfig::default(),
+            meta_wal_config: MetaSrvWalConfig::default(),
         }
     }
@@ -92,13 +93,13 @@ impl GreptimeDbStandaloneBuilder {
     }
 
     #[must_use]
-    pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
+    pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self {
         self.wal_config = wal_config;
         self
     }
 
     #[must_use]
-    pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
+    pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self {
         self.meta_wal_config = wal_meta;
         self
     }
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 43eae9b475..623ffc27ca 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -21,13 +21,13 @@ use std::time::Duration;
 
 use auth::UserProviderRef;
 use axum::Router;
 use catalog::kvbackend::KvBackendCatalogManager;
-use common_config::WalConfig;
 use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
 use common_runtime::Builder as RuntimeBuilder;
 use common_telemetry::warn;
 use common_test_util::ports;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
+use common_wal::config::DatanodeWalConfig;
 use datanode::config::{
     AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
     StorageConfig,
@@ -302,7 +302,7 @@ pub fn create_tmp_dir_and_datanode_opts(
     default_store_type: StorageType,
     store_provider_types: Vec<StorageType>,
     name: &str,
-    wal_config: WalConfig,
+    wal_config: DatanodeWalConfig,
 ) -> (DatanodeOptions, TestGuard) {
     let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
     let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
@@ -336,7 +336,7 @@ pub(crate) fn create_datanode_opts(
     default_store: ObjectStoreConfig,
     providers: Vec<StorageType>,
     home_dir: String,
-    wal_config: WalConfig,
+    wal_config: DatanodeWalConfig,
 ) -> DatanodeOptions {
     DatanodeOptions {
         node_id: Some(0),
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index 5a2f41316b..0e50bfc061 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -15,14 +15,12 @@
 use std::env;
 use std::sync::Arc;
 
-use common_config::wal::KafkaConfig;
-use common_config::WalConfig;
-use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig;
-use common_meta::wal::WalConfig as MetaWalConfig;
 use common_query::Output;
 use common_recordbatch::util;
 use common_telemetry::warn;
 use common_test_util::find_workspace_path;
+use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
+use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
 use frontend::instance::Instance;
 use rstest_reuse::{self, template};
@@ -163,11 +161,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option<Arc<dyn MockInstance>> {
         .collect::<Vec<_>>();
     let test_name = uuid::Uuid::new_v4().to_string();
     let builder = GreptimeDbStandaloneBuilder::new(&test_name)
-        .with_wal_config(WalConfig::Kafka(KafkaConfig {
+        .with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
             broker_endpoints: endpoints.clone(),
             ..Default::default()
         }))
-        .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
+        .with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
             broker_endpoints: endpoints,
             topic_name_prefix: test_name.to_string(),
             num_topics: 3,
@@ -193,11 +191,11 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option